Chapter 6
Reading from a continuously changing data source
If you want to see the code for this chapter of the guide, head over to the GitHub repository.

Import data from a dynamic data source

Until now, we've been reading from a static data importer step because we were at the experimentation phase of the ML workflow. Now, as we head towards production, we want to switch over to a dynamic data importer step. This could be anything like:
  • A database/data warehouse that updates regularly (SQL databases, BigQuery, Snowflake; see the sketch after this list)
  • A data lake (S3 Buckets/Azure Blob Storage/GCP Storage)
  • An API which allows you to query the latest data
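
For instance, a dynamic importer step for the first case could query a SQL table directly. The following is a minimal sketch, not part of this chapter's code: the connection string, table, and column names are placeholders, and it assumes your ZenML version can materialize a pd.DataFrame output.

import pandas as pd
import sqlalchemy

from zenml.steps import step


@step
def sql_importer() -> pd.DataFrame:
    """Hypothetical importer that pulls the latest rows from a SQL table."""
    # Placeholder connection string; point this at your own database.
    engine = sqlalchemy.create_engine("postgresql://user:password@host:5432/mydb")
    # Each pipeline run re-executes the query, so newly added rows are picked up.
    return pd.read_sql("SELECT * FROM training_data ORDER BY created_at DESC", engine)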

Read from a dynamic data source

Let's also slightly change our pipeline to add our new step. For this guide, we have set up a mock API that simulates a real-world setting of reading from an external API. The data behind the API is just MNIST data, but new data is added every day, and we query for the new data each time the pipeline runs.
import numpy as np
import pandas as pd
import requests

from zenml.steps import step
from zenml.steps.base_step_config import BaseStepConfig
from zenml.steps.step_output import Output


class ImporterConfig(BaseStepConfig):
    """Step configuration: how many days of data to query."""

    n_days: int = 1


def get_X_y_from_api(n_days: int = 1, is_train: bool = True):
    """Queries images and labels from the mock API as numpy arrays."""
    # Pick the train or test endpoint of the mock API.
    url = (
        "https://storage.googleapis.com/zenml-public-bucket/mnist"
        "/mnist_handwritten_train.json"
        if is_train
        else "https://storage.googleapis.com/zenml-public-bucket/mnist"
        "/mnist_handwritten_test.json"
    )
    df = pd.DataFrame(requests.get(url).json())
    X = df["image"].map(lambda x: np.array(x)).values
    X = np.array([x.reshape(28, 28) for x in X])
    y = df["label"].map(lambda y: np.array(y)).values
    return X, y


@step
def dynamic_importer(
    config: ImporterConfig,
) -> Output(
    X_train=np.ndarray, y_train=np.ndarray, X_test=np.ndarray, y_test=np.ndarray
):
    """Downloads the latest data from a mock API."""
    X_train, y_train = get_X_y_from_api(n_days=config.n_days, is_train=True)
    X_test, y_test = get_X_y_from_api(n_days=config.n_days, is_train=False)
    return X_train, y_train, X_test, y_test
And then change the pipeline run as follows:
scikit_p = mnist_pipeline(
    importer=dynamic_importer(),
    normalizer=normalize_mnist(),
    trainer=sklearn_trainer(),
    evaluator=sklearn_evaluator(),
)
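`n_days` defaults to 1, as defined in `ImporterConfig`. To query more days at once, you can pass the config explicitly when constructing the step, following the `BaseStepConfig` pattern shown above (the value 3 is just an illustration):

scikit_p = mnist_pipeline(
    importer=dynamic_importer(config=ImporterConfig(n_days=3)),
    normalizer=normalize_mnist(),
    trainer=sklearn_trainer(),
    evaluator=sklearn_evaluator(),
)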

Run

You can run this as follows:
python chapter_6.py

Inspect

Even though our data originally lives in an external API, it was downloaded and versioned locally when we ran this pipeline, so we can fetch and inspect it:
from zenml.core.repo import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="mnist_pipeline")
print(f"Pipeline `mnist_pipeline` has {len(p.runs)} run(s)")

# Grab the evaluator step of the latest run and read its output artifact.
eval_step = p.runs[-1].get_step("evaluator")
val = eval_step.output.read()
print(f"We scored an accuracy of {val} on the latest run!")
You will get the following output:
Pipeline `mnist_pipeline` has 1 run(s)
We scored an accuracy of 0.72 on the latest run!
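The importer's outputs are versioned in the same way, so you can read the downloaded data back as well. A short sketch, assuming the step is registered under the name `importer` and that multi-output steps expose their artifacts through an `outputs` mapping:

importer_step = p.runs[-1].get_step("importer")
X_train = importer_step.outputs["X_train"].read()
print(f"The latest run imported {len(X_train)} training images.")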
Now we are loading data dynamically from a continuously changing data source!
In the near future, ZenML will help you automatically detect drift and schema changes across pipeline runs to make your pipelines even more robust. Keep an eye on this space and on future releases!