Train & evaluate
Train some models.
If you want to see the code for this chapter of the guide, head over to the GitHub repository.
Finally, we can train and evaluate our model. For this, we will use two base classes: the BaseTrainerStep and the BaseEvaluatorStep.

Trainer

We can now start thinking about training our model. We will use the BaseTrainerStep to design a simple step that trains a parameterized, fully connected network using TensorFlow (Keras).
```python
from typing import List, Tuple

import pandas as pd
import tensorflow as tf

from zenml.steps.step_interfaces.base_trainer_step import (
    BaseTrainerConfig,
    BaseTrainerStep,
)


class TensorflowBinaryClassifierConfig(BaseTrainerConfig):
    target_column: str
    layers: List[int] = [256, 64, 1]
    input_shape: Tuple[int] = (8,)
    learning_rate: float = 0.001
    metrics: List[str] = ["accuracy"]
    epochs: int = 50
    batch_size: int = 8


class TensorflowBinaryClassifier(BaseTrainerStep):
    def entrypoint(
        self,
        train_dataset: pd.DataFrame,
        validation_dataset: pd.DataFrame,
        config: TensorflowBinaryClassifierConfig,
    ) -> tf.keras.Model:
        # Build the network: hidden ReLU layers followed by a sigmoid output
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.InputLayer(input_shape=config.input_shape))
        model.add(tf.keras.layers.Flatten())

        last_layer = config.layers.pop()
        for layer in config.layers:
            model.add(tf.keras.layers.Dense(layer, activation="relu"))
        model.add(tf.keras.layers.Dense(last_layer, activation="sigmoid"))

        model.compile(
            optimizer=tf.keras.optimizers.Adam(config.learning_rate),
            loss=tf.keras.losses.BinaryCrossentropy(),
            metrics=config.metrics,
        )

        # Separate the target column from the features, then train
        train_target = train_dataset.pop(config.target_column)
        validation_target = validation_dataset.pop(config.target_column)
        model.fit(
            x=train_dataset,
            y=train_target,
            validation_data=(validation_dataset, validation_target),
            batch_size=config.batch_size,
            epochs=config.epochs,
        )
        model.summary()

        return model
```
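Because every hyperparameter lives in TensorflowBinaryClassifierConfig, you can reshape the network without touching the step code itself. A minimal sketch, with illustrative (untuned) values:

```python
# Illustrative values only -- a deeper network trained with a larger batch size
config = TensorflowBinaryClassifierConfig(
    target_column="has_diabetes",
    layers=[512, 256, 64, 1],  # three hidden ReLU layers plus the sigmoid output
    epochs=100,
    batch_size=32,
)
trainer = TensorflowBinaryClassifier(config)
```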

Evaluator

We can also add a simple evaluator using the BaseEvaluatorStep:
```python
from typing import Any, Dict

import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report

from zenml.steps.step_interfaces.base_evaluator_step import (
    BaseEvaluatorConfig,
    BaseEvaluatorStep,
)


class SklearnEvaluatorConfig(BaseEvaluatorConfig):
    label_class_column: str


class SklearnEvaluator(BaseEvaluatorStep):
    def entrypoint(
        self,
        dataset: pd.DataFrame,
        model: tf.keras.Model,
        config: SklearnEvaluatorConfig,
    ) -> Dict[str, Any]:
        # Separate the labels from the features
        labels = dataset.pop(config.label_class_column)

        # Threshold the sigmoid outputs at 0.5 to get class predictions
        predictions = model.predict(dataset)
        predicted_classes = [1 if v > 0.5 else 0 for v in predictions]

        report = classification_report(labels, predicted_classes, output_dict=True)

        return report
```
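Since classification_report is called with output_dict=True, the step returns a nested dictionary of per-class metrics rather than a printable string. A small self-contained sketch of that structure, using toy labels that are purely illustrative:

```python
from sklearn.metrics import classification_report

# Toy labels, purely illustrative -- not output from the pipeline above
labels = [0, 1, 1, 0, 1, 0]
predicted_classes = [0, 1, 0, 0, 1, 1]

report = classification_report(labels, predicted_classes, output_dict=True)

# Per-class metrics are keyed by the class label as a string,
# alongside "accuracy", "macro avg" and "weighted avg" aggregates
print(report["accuracy"])
print(report["1"]["f1-score"])
```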
Important things to note:
  • the trainer returns a tf.keras.Model, which ZenML takes care of storing in the artifact store. We will talk about how to 'take over' this storing via Materializers in a later chapter; see the sketch right below for how to read the stored model back.
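Because the model lands in the artifact store, you can read it back after a run through the post-execution API (the same one used in the Inspect section below). A minimal sketch, assuming the pipeline has already been run once and that the step is registered under the trainer keyword from the pipeline's connect method:

```python
from zenml.repository import Repository

# Assumes at least one completed run of the pipeline
repo = Repository()
run = repo.get_pipeline(pipeline_name="TrainingPipeline").runs[-1]

# Read the stored tf.keras.Model back from the artifact store
model = run.get_step("trainer").output.read()
model.summary()
```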

Pipeline

The final pipeline called TrainingPipeline is actually a built-in pipeline in ZenML, and it looks like this:
```python
class TrainingPipeline(BasePipeline):
    """The built-in ZenML training pipeline."""

    def connect(
        self,
        datasource: BaseDatasourceStep,
        splitter: BaseSplitStep,
        analyzer: BaseAnalyzerStep,
        preprocessor: BasePreprocessorStep,
        trainer: BaseTrainerStep,
        evaluator: BaseEvaluatorStep,
    ) -> None:
        # Ingesting the datasource
        dataset = datasource()

        # Splitting the data
        train, test, validation = splitter(dataset=dataset)

        # Analyzing the train dataset
        statistics, schema = analyzer(dataset=train)

        # Preprocessing the splits
        train_t, test_t, validation_t = preprocessor(
            train_dataset=train,
            test_dataset=test,
            validation_dataset=validation,
            statistics=statistics,
            schema=schema,
        )

        # Training the model
        model = trainer(train_dataset=train_t, validation_dataset=validation_t)

        # Evaluating the trained model
        evaluator(model=model, dataset=test_t)
```
You can add your steps to it and run your pipeline as follows:
```python
import os

# Create the pipeline and run it
pipeline_instance = TrainingPipeline(
    datasource=PandasDatasource(PandasDatasourceConfig(path=os.getenv("data"))),
    splitter=SklearnSplitter(
        SklearnSplitterConfig(
            ratios={"train": 0.7, "test": 0.15, "validation": 0.15}
        )
    ),
    analyzer=PandasAnalyzer(
        PandasAnalyzerConfig(percentiles=[0.2, 0.4, 0.6, 0.8, 1.0])
    ),
    preprocessor=SklearnStandardScaler(
        SklearnStandardScalerConfig(ignore_columns=["has_diabetes"])
    ),
    trainer=TensorflowBinaryClassifier(
        TensorflowBinaryClassifierConfig(target_column="has_diabetes")
    ),
    evaluator=SklearnEvaluator(
        SklearnEvaluatorConfig(label_class_column="has_diabetes")
    ),
)

pipeline_instance.run()
```
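Note that the datasource reads its path from an environment variable named data, so it has to be set before the pipeline is created. A minimal sketch (the CSV path is illustrative):

```python
import os

# The path is illustrative -- point it at your own CSV file
os.environ["data"] = "/path/to/diabetes.csv"
```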
Beautiful, now the pipeline is truly doing something. Let's run it!

Run

You can run this as follows:
```bash
python chapter_3.py
```
The output will look as follows (note: the logs are filtered to highlight the most important lines):
```
Creating pipeline: TrainingPipeline
Cache enabled for pipeline `TrainingPipeline`
Using orchestrator `local_orchestrator` for pipeline `TrainingPipeline`. Running pipeline..
Step `PandasDatasource` has started.
Step `PandasDatasource` has finished in 0.017s.
Step `SklearnSplitter` has started.
Step `SklearnSplitter` has finished in 0.013s.
Step `PandasAnalyzer` has started.
Step `PandasAnalyzer` has finished in 0.013s.
Step `SklearnStandardScaler` has started.
Step `SklearnStandardScaler` has finished in 0.021s.
Step `TensorflowBinaryClassifier` has started.
67/67 [==============================] - 0s 2ms/step - loss: 0.5448 - accuracy: 0.7444 - val_loss: 0.4539 - val_accuracy: 0.7500
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
flatten (Flatten)            (None, 8)                 0
_________________________________________________________________
dense (Dense)                (None, 256)               2304
_________________________________________________________________
dense_1 (Dense)              (None, 64)                16448
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 65
=================================================================
Total params: 18,817
Trainable params: 18,817
Non-trainable params: 0
_________________________________________________________________
Step `TensorflowBinaryClassifier` has finished in 1.232s.
Step `SklearnEvaluator` has started.
Step `SklearnEvaluator` has finished in 0.289s.
```

Inspect

If you add the following code to fetch the pipeline:
```python
from zenml.repository import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="TrainingPipeline")
runs = p.runs
print(f"Pipeline `TrainingPipeline` has {len(runs)} run(s)")
run = runs[-1]
print(f"The run you just made has {len(run.steps)} step(s).")
step = run.get_step("evaluator")
print(f"The `evaluator` step returned: {step.output.read()}")
```
You get the following output:
```
Pipeline `TrainingPipeline` has 1 run(s)
The run you just made has 6 step(s).
```