In this case, ZenML has an integration with sklearn so you can use the ZenML CLI to install the right version directly.
Steps with multiple outputs
Sometimes a step has multiple outputs. To define such a step, use a Tuple type annotation for the return value. You can additionally wrap each element in Annotated to assign custom output names. Here we load an open-source dataset and split it into a train and a test dataset.
import logging
from typing import Tuple

import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from typing_extensions import Annotated

from zenml import step


@step
def training_data_loader() -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[pd.Series, "y_train"],
    Annotated[pd.Series, "y_test"],
]:
    """Load the iris dataset as a tuple of Pandas DataFrame / Series."""
    logging.info("Loading iris...")
    iris = load_iris(as_frame=True)
    logging.info("Splitting train and test...")
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, shuffle=True, random_state=42
    )
    return X_train, X_test, y_train, y_test
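To see why Annotated is enough to carry output names, the following minimal sketch reads the names back from a return annotation using only the standard library. It is not ZenML's implementation; `load_numbers` and `output_names` are hypothetical names made up for this illustration, and `typing.Annotated` assumes Python 3.9 or newer.

```python
from typing import Annotated, Tuple, get_args, get_type_hints


def load_numbers() -> Tuple[
    Annotated[list, "train_values"],
    Annotated[list, "test_values"],
]:
    """Hypothetical stand-in for a step with two named outputs."""
    values = [1, 2, 3, 4, 5]
    return values[:4], values[4:]


def output_names(func) -> list:
    """Collect the string metadata attached to each element of the return Tuple."""
    # include_extras=True keeps the Annotated wrappers instead of stripping them.
    return_hint = get_type_hints(func, include_extras=True)["return"]
    names = []
    for element in get_args(return_hint):
        # For Annotated[T, "name"], __metadata__ is the tuple ("name",).
        metadata = getattr(element, "__metadata__", ())
        names.append(metadata[0] if metadata else None)
    return names


print(output_names(load_numbers))  # prints ['train_values', 'test_values']
```

The order of the names matches the order of the returned values, which is why the return statement and the Tuple annotation should list the outputs in the same sequence.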
ZenML records the output of the root Python logging handler in the artifact store as a side effect of running a step. When writing steps, therefore, use the logging module to record logs so that they show up in the ZenML dashboard.
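The difference between logging and print can be illustrated with a small standard-library sketch. The `ListHandler` class below is a hypothetical stand-in for whatever handler ZenML attaches to the root logger; it only shows that messages sent through the logging module reach root-level handlers, while print output does not.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted records in memory."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


root = logging.getLogger()
root.setLevel(logging.INFO)
capture = ListHandler()
root.addHandler(capture)


def load_data():
    logging.info("Loading iris...")  # routed through the root logger
    print("Loading iris...")  # written to stdout only, never seen by the handler
    return [1, 2, 3]


load_data()
root.removeHandler(capture)
print(capture.lines)  # prints ['Loading iris...']
```

Only one entry is captured even though the message was emitted twice, because the print call bypasses the logging system entirely.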
Parametrizing a step
Here we are creating a training step for a support vector machine classifier with sklearn. As we might want to adjust the hyperparameter gamma later on, we define it as an input value to the step as well.
To run the step function outside the context of a ZenML pipeline, call it directly like a regular Python function. For example:
svc_trainer(X_train=..., y_train=...)
Next, we will combine our two steps into a pipeline and run it. As you can see, the parameter gamma is configurable as a pipeline input.
Best Practice: Always nest the actual execution of the pipeline inside an if __name__ == "__main__" condition. This ensures that loading the pipeline from elsewhere does not also run it.
if __name__ == "__main__":
    first_pipeline()
Running python run.py should look somewhat like this in the terminal:
By default, the name of a pipeline run is generated automatically from the current date and time. To change the name for a run, pass run_name as a parameter to the with_options() method.
Pipeline run names must be unique, so if you plan to run your pipelines multiple times or run them on a schedule, make sure to either compute the run name dynamically or include one of the following placeholders that ZenML will replace:
{{date}} will resolve to the current date, e.g. 2023_02_19
{{time}} will resolve to the current time, e.g. 11_07_09_326492
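As a rough illustration of how these placeholders keep run names unique, the sketch below substitutes them by hand. The helper `resolve_run_name` is hypothetical and not part of ZenML; the format strings are an assumption inferred from the two examples above.

```python
from datetime import datetime


def resolve_run_name(template: str, now: datetime) -> str:
    """Hypothetical helper: substitute {{date}} and {{time}} in a run name."""
    # Formats inferred from the examples 2023_02_19 and 11_07_09_326492.
    return (
        template
        .replace("{{date}}", now.strftime("%Y_%m_%d"))
        .replace("{{time}}", now.strftime("%H_%M_%S_%f"))
    )


moment = datetime(2023, 2, 19, 11, 7, 9, 326492)
print(resolve_run_name("first_pipeline-{{date}}-{{time}}", moment))
# prints first_pipeline-2023_02_19-11_07_09_326492
```

Because the time component includes microseconds, two runs started from the same template are very unlikely to resolve to the same name. In ZenML itself you would pass the template string as run_name and let ZenML perform the substitution.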
The following example shows caching in action with the code example from the previous section.
Code Example of this Section
from typing import Tuple

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from typing_extensions import Annotated

from zenml import pipeline, step


@step
def training_data_loader() -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[pd.Series, "y_train"],
    Annotated[pd.Series, "y_test"],
]:
    """Load the iris dataset as a tuple of Pandas DataFrame / Series."""
    iris = load_iris(as_frame=True)
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, shuffle=True, random_state=42
    )
    return X_train, X_test, y_train, y_test


@step(enable_cache=False)
def svc_trainer(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    gamma: float = 0.001,
) -> Tuple[
    Annotated[ClassifierMixin, "trained_model"],
    Annotated[float, "training_acc"],
]:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=gamma)
    model.fit(X_train.to_numpy(), y_train.to_numpy())
    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")
    return model, train_acc


@pipeline
def first_pipeline(gamma: float = 0.002):
    X_train, X_test, y_train, y_test = training_data_loader()
    svc_trainer(gamma=gamma, X_train=X_train, y_train=y_train)


if __name__ == "__main__":
    first_pipeline()

    # Step one will use cache, step two will rerun due to caching
    # being disabled on the @step decorator. Even if caching were
    # enabled though, ZenML would detect a different value for the
    # `gamma` input of the second step and disable caching
    first_pipeline(gamma=0.0001)
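The behavior described in the closing comment can be pictured with a toy cache keyed on a step's name and input values. This is a conceptual sketch only: `StepCache`, `load`, and `train` are hypothetical names, and ZenML's real caching logic takes more into account than this model does.

```python
import hashlib
import json


class StepCache:
    """Toy cache keyed on step name and input values (not ZenML's implementation)."""

    def __init__(self):
        self._store = {}
        self.executions = []  # names of steps that actually ran

    def run(self, name, func, enable_cache=True, **inputs):
        # Identical step name and inputs produce an identical key.
        payload = json.dumps({"step": name, "inputs": inputs}, sort_keys=True)
        key = hashlib.sha256(payload.encode()).hexdigest()
        if enable_cache and key in self._store:
            return self._store[key]
        self.executions.append(name)
        result = func(**inputs)
        self._store[key] = result
        return result


def load():
    return [1, 2, 3]


def train(data, gamma):
    return sum(data) * gamma


cache = StepCache()
for gamma in (0.002, 0.002, 0.0001):
    data = cache.run("loader", load)
    cache.run("trainer", train, data=data, gamma=gamma)

print(cache.executions)  # prints ['loader', 'trainer', 'trainer']
```

The loader runs once and is reused afterwards, while the trainer runs again only when gamma changes. Passing enable_cache=False to run() forces execution every time, which mirrors the effect of @step(enable_cache=False) on svc_trainer in the example above.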