Preprocess data
Split and preprocess your dataset
If you want to see the code for this chapter of the guide, head over to the GitHub repository.

Split, analyze and preprocess your dataset

Before writing any trainers, we can split our dataset into train, test, and validation splits, analyze the train split, and preprocess all splits accordingly to get better results. To do this, let's add three more steps and make the pipeline a bit more complex.

Splitter

First, we can inherit from the BaseSplitStep, which takes a single data artifact as input and splits it into three separate output data artifacts, one per split, based on a given configuration:
from typing import Dict

import pandas as pd
from sklearn.model_selection import train_test_split

from zenml.steps.step_interfaces.base_split_step import (
    BaseSplitStep,
    BaseSplitStepConfig,
)
from zenml.steps import Output


class SklearnSplitterConfig(BaseSplitStepConfig):
    ratios: Dict[str, float]


class SklearnSplitter(BaseSplitStep):
    def entrypoint(self,
                   dataset: pd.DataFrame,
                   config: SklearnSplitterConfig
                   ) -> Output(train=pd.DataFrame,
                               test=pd.DataFrame,
                               validation=pd.DataFrame):

        train_dataset, test_dataset = train_test_split(
            dataset, test_size=config.ratios["test"]
        )

        test_size = config.ratios["validation"] / (config.ratios["validation"] + config.ratios["train"])
        train_dataset, val_dataset = train_test_split(
            train_dataset, test_size=test_size
        )

        return train_dataset, test_dataset, val_dataset
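Note how the two-stage split works: the test fraction is carved out first, and the validation fraction is then taken relative to what remains. Here is a minimal, standalone sketch (plain pandas and scikit-learn, no ZenML required; the column name and dataset size are made up) to sanity-check the arithmetic for the 70/15/15 ratios used later in this chapter:

import pandas as pd
from sklearn.model_selection import train_test_split

ratios = {"train": 0.7, "test": 0.15, "validation": 0.15}
df = pd.DataFrame({"x": range(1000)})  # dummy data, stand-in for the real dataset

# First split: carve out the test set (15% of all rows)
train_val, test = train_test_split(df, test_size=ratios["test"])

# Second split: take the validation share out of the remaining 85%
val_fraction = ratios["validation"] / (ratios["validation"] + ratios["train"])  # 0.15 / 0.85 ≈ 0.176
train, validation = train_test_split(train_val, test_size=val_fraction)

print(len(train), len(test), len(validation))  # roughly 700 / 150 / 150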

Analyzer

Next, we can think of a way to analyze the dataset to generate statistics and a schema. For this purpose, we will inherit from the BaseAnalyzerStep and override the entrypoint method:
from typing import Any, List, Optional, Type, Union

import pandas as pd

from zenml.artifacts import SchemaArtifact, StatisticsArtifact
from zenml.steps import Output
from zenml.steps.step_interfaces.base_analyzer_step import (
    BaseAnalyzerConfig,
    BaseAnalyzerStep,
)


class PandasAnalyzerConfig(BaseAnalyzerConfig):
    percentiles: List[float] = [0.25, 0.5, 0.75]
    include: Optional[Union[str, List[Type[Any]]]] = None
    exclude: Optional[Union[str, List[Type[Any]]]] = None


class PandasAnalyzer(BaseAnalyzerStep):
    OUTPUT_SPEC = {"statistics": StatisticsArtifact, "schema": SchemaArtifact}

    def entrypoint(self,
                   dataset: pd.DataFrame,
                   config: PandasAnalyzerConfig,
                   ) -> Output(statistics=pd.DataFrame,
                               schema=pd.DataFrame):

        statistics = dataset.describe(
            percentiles=config.percentiles,
            include=config.include,
            exclude=config.exclude,
        ).T
        schema = dataset.dtypes.to_frame().T.astype(str)
        return statistics, schema
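To get a feel for what the two outputs look like, here is a small standalone sketch of the same pandas calls on a toy DataFrame (the column names and values are made up for illustration):

import pandas as pd

df = pd.DataFrame({
    "age": [25, 32, 47],
    "bmi": [21.5, 30.1, 26.4],
    "smoker": ["no", "yes", "no"],
})

# Same calls as in PandasAnalyzer.entrypoint
statistics = df.describe(percentiles=[0.25, 0.5, 0.75]).T
schema = df.dtypes.to_frame().T.astype(str)

print(statistics)  # one row per numeric column: count, mean, std, min, percentiles, max
print(schema)      # a single row with the dtype of every column, e.g. int64 / float64 / object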

Preprocessor

Finally, we can write a step that preprocesses all the splits based on the statistics extracted from the train split. Let's use the BasePreprocessorStep to achieve that:
from typing import List

import pandas as pd
from sklearn.preprocessing import StandardScaler

from zenml.steps import Output
from zenml.steps.step_interfaces.base_preprocessor_step import (
    BasePreprocessorConfig,
    BasePreprocessorStep,
)


class SklearnStandardScalerConfig(BasePreprocessorConfig):
    ignore_columns: List[str] = []
    exclude_columns: List[str] = []


class SklearnStandardScaler(BasePreprocessorStep):
    def entrypoint(self,
                   train_dataset: pd.DataFrame,
                   test_dataset: pd.DataFrame,
                   validation_dataset: pd.DataFrame,
                   statistics: pd.DataFrame,
                   schema: pd.DataFrame,
                   config: SklearnStandardScalerConfig,
                   ) -> Output(train_transformed=pd.DataFrame,
                               test_transformed=pd.DataFrame,
                               validation_transformed=pd.DataFrame):

        schema_dict = {k: v[0] for k, v in schema.to_dict().items()}

        # Keep only the numeric columns that are not explicitly excluded
        feature_set = set(train_dataset.columns) - set(config.exclude_columns)
        for feature, feature_type in schema_dict.items():
            if feature_type != "int64" and feature_type != "float64":
                feature_set.discard(feature)

        # Columns in ignore_columns stay in the output but are not scaled;
        # pandas indexing expects a list, not a set
        transform_features = list(feature_set - set(config.ignore_columns))

        # Parameterize the scaler with the statistics of the train split
        scaler = StandardScaler()
        scaler.mean_ = statistics["mean"][transform_features]
        scaler.scale_ = statistics["std"][transform_features]

        train_dataset[transform_features] = scaler.transform(train_dataset[transform_features])
        test_dataset[transform_features] = scaler.transform(test_dataset[transform_features])
        validation_dataset[transform_features] = scaler.transform(validation_dataset[transform_features])

        return train_dataset, test_dataset, validation_dataset
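The trick in this step is that the scaler is never fitted: its mean_ and scale_ attributes are filled in from the statistics computed on the train split, so every split is transformed with the same parameters. Here is a minimal standalone sketch of that idea (plain pandas and scikit-learn, hypothetical column names and values):

import pandas as pd
from sklearn.preprocessing import StandardScaler

train = pd.DataFrame({"age": [25.0, 32.0, 47.0], "bmi": [21.5, 30.1, 26.4]})
test = pd.DataFrame({"age": [29.0, 51.0], "bmi": [24.0, 28.2]})

stats = train.describe().T  # same shape as the analyzer's statistics output

scaler = StandardScaler()
scaler.mean_ = stats["mean"]   # parameters come from the train split only ...
scaler.scale_ = stats["std"]

# ... and are applied to every split, so no test data leaks into the scaling
print(scaler.transform(test))  # equivalent to (test - train mean) / train std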
There are some important things to note:
  • As we are dealing with three splits, all of these steps have multiple outputs. That is why we need to use the zenml.steps.Output class to name each output; if there were only one output, this would not be necessary (see the short sketch after this list).
  • In the PandasAnalyzer, the outputs are annotated as pd.DataFrames, and by default pd.DataFrames are interpreted as DataArtifacts. As this is not what we want in this step, we override the OUTPUT_SPEC of the class to point our step to the correct artifact types, namely StatisticsArtifact and SchemaArtifact.
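For contrast, here is a minimal sketch of the two annotation styles, written with the functional @step decorator for brevity (the step names and bodies are made up for illustration):

import pandas as pd
from zenml.steps import Output, step


@step
def single_output_step() -> pd.DataFrame:
    # a single output: a plain return annotation is enough
    return pd.DataFrame()


@step
def multi_output_step() -> Output(raw=pd.DataFrame, clean=pd.DataFrame):
    # several outputs: Output(...) gives each one a name
    return pd.DataFrame(), pd.DataFrame()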

Pipeline

And that's it. Now we can make these steps part of our pipeline, which then looks like this:
from zenml.pipelines import BasePipeline
from zenml.steps import step_interfaces


class Chapter2Pipeline(BasePipeline):
    """Class for Chapter 2 of the class-based API"""

    def connect(self,
                datasource: step_interfaces.BaseDatasourceStep,
                splitter: step_interfaces.BaseSplitStep,
                analyzer: step_interfaces.BaseAnalyzerStep,
                preprocessor: step_interfaces.BasePreprocessorStep
                ) -> None:

        # Ingesting the datasource
        dataset = datasource()

        # Splitting the data
        train, test, validation = splitter(dataset=dataset)

        # Analyzing the train dataset
        statistics, schema = analyzer(dataset=train)

        # Preprocessing the splits
        train_t, test_t, validation_t = preprocessor(
            train_dataset=train,
            test_dataset=test,
            validation_dataset=validation,
            statistics=statistics,
            schema=schema,
        )


# Create the pipeline and run it
import os

pipeline_instance = Chapter2Pipeline(
    datasource=PandasDatasource(PandasDatasourceConfig(path=os.getenv("data"))),
    splitter=SklearnSplitter(SklearnSplitterConfig(ratios={"train": 0.7, "test": 0.15, "validation": 0.15})),
    analyzer=PandasAnalyzer(PandasAnalyzerConfig(percentiles=[0.2, 0.4, 0.6, 0.8, 1.0])),
    preprocessor=SklearnStandardScaler(SklearnStandardScalerConfig(ignore_columns=["has_diabetes"]))
)

pipeline_instance.run()
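Note that the datasource gets its input path from the data environment variable (os.getenv("data") in the snippet above). One way to point it at your dataset is to set the variable at the top of chapter_2.py before the pipeline instance is created, or export it in your shell before running; the CSV path below is just a placeholder:

import os

# hypothetical path, replace with the location of your dataset
os.environ["data"] = "/path/to/diabetes.csv"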

Run

You can run this as follows:
python chapter_2.py
The output will look as follows (note: this is filtered to highlight the most important logs):
Creating pipeline: Chapter2Pipeline
Cache enabled for pipeline `Chapter2Pipeline`
Using orchestrator `local_orchestrator` for pipeline `Chapter2Pipeline`. Running pipeline..
Step `PandasDatasource` has started.
Step `PandasDatasource` has finished in 0.045s.
Step `SklearnSplitter` has started.
Step `SklearnSplitter` has finished in 0.432s.
Step `PandasAnalyzer` has started.
Step `PandasAnalyzer` has finished in 0.092s.
Step `SklearnStandardScaler` has started.
Step `SklearnStandardScaler` has finished in 0.151s.

Inspect

You can add the following code to fetch the pipeline:
from zenml.repository import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="Chapter2Pipeline")
runs = p.runs
print(f"Pipeline `Chapter2Pipeline` has {len(runs)} run(s)")
run = runs[-1]
print(f"The run you just made has {len(run.steps)} step(s).")
You will get the following output:
Pipeline `Chapter2Pipeline` has 1 run(s)
The run you just made has 4 step(s).
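From here you can drill further into the run. The sketch below is a rough illustration, assuming the post-execution API of this ZenML version exposes step views with an outputs dictionary and artifact views with a read() method, and reusing the step and output names from this chapter:

# continuing from the snippet above
preprocessor_step = run.steps[-1]        # the SklearnStandardScaler step ran last
print(preprocessor_step.outputs.keys())  # train_transformed, test_transformed, validation_transformed

train_transformed = preprocessor_step.outputs["train_transformed"].read()
print(train_transformed.head())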