from zenml.artifacts import DataArtifact
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.steps import step, Output, BaseStepConfig
from zenml.pipelines import pipeline
def __init__(self, name: str):
class MyMaterializer(BaseMaterializer):
ASSOCIATED_TYPES = (MyObj,)
ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)
def handle_input(self, data_type: Type[MyObj]) -> MyObj:
"""Read from artifact store"""
super().handle_input(data_type)
with fileio.open(os.path.join(self.artifact.uri, 'data.txt'),
def handle_return(self, my_obj: MyObj) -> None:
"""Write to artifact store"""
super().handle_return(my_obj)
with fileio.open(os.path.join(self.artifact.uri, 'data.txt'),
def my_first_step() -> Output(output_int=int, output_float=float):
"""Step that returns a pre-defined integer and float"""
class SecondStepConfig(BaseStepConfig):
def my_second_step(config: SecondStepConfig, input_int: int,
) -> Output(output_int=int,
"""Step that multiply the inputs"""
return (config.multiplier * input_int,
config.multiplier * input_float,
@pipeline(enable_cache=False)
output_1, output_2 = step_1()
step_2(output_1, output_2)