Create a step
Create your first step.
If you want to see the code for this chapter of the guide, head over to GitHub.

Create an importer step to load data

The first thing to do is to load our data. We create a step that can load data from an external source (in this case, a csv file representing the Pima Indians Diabetes Database). This can be done by inheriting from the BaseDatasourceStep definition and overriding the entrypoint method.
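To make the loading parameters concrete before wrapping them in a step, here is a minimal standalone sketch using plain pandas (no ZenML), with a tiny in-memory csv; the column names are assumptions chosen for illustration:

```python
import io

import pandas as pd

# In-memory stand-in for a headerless csv such as the Pima Indians
# Diabetes file (the column names below are illustrative assumptions).
raw = io.StringIO("6,148,72,33.6,1\n1,85,66,26.6,0\n")

# header=None tells pandas the first row is data rather than labels,
# and `names` supplies the column labels explicitly.
df = pd.read_csv(
    raw,
    sep=",",
    header=None,
    names=["pregnancies", "glucose", "blood_pressure", "bmi", "outcome"],
)
print(df.shape)  # (2, 5)
```

These are the same `sep`, `header`, and `names` arguments the config object below exposes as fields.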

Datasource

```python
from typing import List, Optional, Union

import pandas as pd

from zenml.steps.step_interfaces.base_datasource_step import (
    BaseDatasourceConfig,
    BaseDatasourceStep,
)


class PandasDatasourceConfig(BaseDatasourceConfig):
    path: str
    sep: str = ","
    header: Union[int, List[int], str] = "infer"
    names: Optional[List[str]] = None
    index_col: Optional[Union[int, str, List[Union[int, str]], bool]] = None


class PandasDatasource(BaseDatasourceStep):
    def entrypoint(
        self,
        config: PandasDatasourceConfig,
    ) -> pd.DataFrame:
        return pd.read_csv(
            filepath_or_buffer=config.path,
            sep=config.sep,
            header=config.header,
            names=config.names,
            index_col=config.index_col,
        )
```
Important things to note:
  • The type annotations in the signature of the entrypoint method of the BaseDatasourceStep are overridden with the actual data types. This is required for the step to work.
  • The step uses a specific config object designed with this step in mind.
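Such config objects behave like typed settings containers with defaults. As a rough stdlib sketch (a plain dataclass, not the actual BaseDatasourceConfig implementation), the fields of the config above work like this:

```python
from dataclasses import dataclass
from typing import List, Optional, Union


# Illustrative stand-in mirroring the fields of PandasDatasourceConfig;
# the real class inherits from BaseDatasourceConfig instead.
@dataclass
class DatasourceConfigSketch:
    path: str  # required, no default
    sep: str = ","  # defaults apply when a value is omitted
    header: Union[int, List[int], str] = "infer"
    names: Optional[List[str]] = None
    index_col: Optional[Union[int, str, List[Union[int, str]], bool]] = None


# Only `path` must be supplied; `sep` is overridden, the rest fall back.
cfg = DatasourceConfigSketch(path="diabetes.csv", sep=";")
print(cfg.sep, cfg.header)  # ; infer
```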

Pipeline

Now we can go ahead and create a pipeline by inheriting from BasePipeline and adding a single step that ingests from our datasource:
```python
import os

from zenml.pipelines import BasePipeline
from zenml.steps import step_interfaces


class Chapter1Pipeline(BasePipeline):
    """Class for Chapter 1 of the class-based API"""

    def connect(self, datasource: step_interfaces.BaseDatasourceStep) -> None:
        datasource()


pipeline_instance = Chapter1Pipeline(
    datasource=PandasDatasource(PandasDatasourceConfig(path=os.getenv("data")))
)

pipeline_instance.run()
```
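Note that the pipeline reads the csv location from the `data` environment variable via `os.getenv("data")`, so set it before running. The path below is a placeholder; point it at your own copy of the file:

```shell
# Point the `data` environment variable at your local copy of the csv.
export data="path/to/diabetes.csv"  # placeholder path; adjust to your file
echo "$data"
```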

Run

You can run this as follows:
```shell
python chapter_1.py
```
The output will look as follows (note: this is filtered to highlight the most important logs):

```
Creating pipeline: Chapter1Pipeline
Cache enabled for pipeline `Chapter1Pipeline`
Using orchestrator `local_orchestrator` for pipeline `Chapter1Pipeline`. Running pipeline..
Step `PandasDatasource` has started.
Step `PandasDatasource` has finished in 0.016s.
```

Inspect

You can add the following code to fetch the pipeline:
```python
from zenml.repository import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="Chapter1Pipeline")
runs = p.runs
print(f"Pipeline `Chapter1Pipeline` has {len(runs)} run(s)")
run = runs[-1]
print(f"The run you just made has {len(run.steps)} step(s).")
step = run.get_step("datasource")
print(f"That step has {len(step.outputs)} output artifacts.")
```
You will get the following output:
```
Pipeline `Chapter1Pipeline` has 3 run(s)
The run you just made has 1 step(s).
That step has 1 output artifacts.
```