Develop a Custom Log Store

Learning how to develop a custom log store.

Before diving into the specifics of this component type, it is beneficial to familiarize yourself with our general guide to writing custom component flavors in ZenML. This guide provides an essential understanding of ZenML's component flavor concepts.

ZenML comes equipped with Log Store implementations that you can use to store logs in your artifact store, export to OpenTelemetry-compatible backends, or send to Datadog. However, if you need to use a different logging backend, you can extend ZenML to provide your own custom Log Store implementation.

Base Abstraction

The log store is responsible for collecting, storing, and retrieving logs during pipeline execution. Let's take a deeper dive into the fundamentals behind its abstraction, namely the BaseLogStore class:

  1. Origins: A BaseLogStoreOrigin represents the source of log records (e.g., a step execution). When logging starts, you register an origin with the log store, then emit logs through the log store referencing that origin. When logging ends, you deregister the origin to release resources.

  2. Core methods: The base class defines four abstract methods that must be implemented:

    • emit(): Process and export a log record for a given origin

    • _release_origin(): Called when logging for an origin is complete (cleanup resources)

    • flush(): Ensure all pending logs are exported

    • fetch(): Retrieve stored logs for display

  3. Thread safety: The base implementation includes locking mechanisms to ensure thread-safe operation.

Here's a simplified view of the base implementation:

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
import logging
import threading

from zenml.enums import StackComponentType
from zenml.models import LogsResponse
from zenml.stack import Flavor, StackComponent, StackComponentConfig
from zenml.utils.logging_utils import LogEntry


class BaseLogStoreConfig(StackComponentConfig):
    """Base configuration for all log stores."""
    pass


class BaseLogStoreOrigin:
    """Represents the source of log records (e.g., a step execution)."""

    def __init__(
        self,
        name: str,
        log_store: "BaseLogStore",
        log_model: LogsResponse,
        metadata: Dict[str, Any],
    ) -> None:
        self._name = name
        self._log_store = log_store
        self._log_model = log_model
        self._metadata = metadata

    @property
    def name(self) -> str:
        """The name of the origin."""
        return self._name

    @property
    def log_model(self) -> LogsResponse:
        """The log model associated with the origin."""
        return self._log_model

    @property
    def metadata(self) -> Dict[str, Any]:
        """The metadata associated with the origin."""
        return self._metadata

    def deregister(self) -> None:
        """Deregister the origin from the log store."""
        self._log_store.deregister_origin(self)


class BaseLogStore(StackComponent, ABC):
    """Base class for all ZenML log stores."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._origins: Dict[str, BaseLogStoreOrigin] = {}
        self._lock = threading.RLock()

    @property
    def origin_class(self) -> Type[BaseLogStoreOrigin]:
        """Class of the origin used with this log store."""
        return BaseLogStoreOrigin

    def register_origin(
        self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
    ) -> BaseLogStoreOrigin:
        """Register an origin for a logging context."""
        with self._lock:
            origin = self.origin_class(name, self, log_model, metadata)
            self._origins[name] = origin
            return origin

    def deregister_origin(self, origin: BaseLogStoreOrigin) -> None:
        """Deregister an origin and finalize its logs."""
        with self._lock:
            if origin.name not in self._origins:
                return
            self._release_origin(origin)
            del self._origins[origin.name]
            if len(self._origins) == 0:
                self.flush(blocking=False)

    @abstractmethod
    def emit(
        self,
        origin: BaseLogStoreOrigin,
        record: logging.LogRecord,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Process a log record for the given origin."""

    @abstractmethod
    def _release_origin(self, origin: BaseLogStoreOrigin) -> None:
        """Finalize logging for an origin and release resources."""

    @abstractmethod
    def flush(self, blocking: bool = True) -> None:
        """Flush all pending logs."""

    @abstractmethod
    def fetch(
        self,
        logs_model: LogsResponse,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 20000,
    ) -> List[LogEntry]:
        """Fetch stored logs."""


class BaseLogStoreFlavor(Flavor):
    """Base class for all ZenML log store flavors."""

    @property
    def type(self) -> StackComponentType:
        return StackComponentType.LOG_STORE

    @property
    def config_class(self) -> Type[BaseLogStoreConfig]:
        return BaseLogStoreConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseLogStore"]:
        """Implementation class for this flavor."""

This is a slimmed-down version of the base implementation. For the full implementation with complete docstrings, check the SDK docs.
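To see how these pieces interact, here is a framework-free toy that mirrors the origin lifecycle (register, emit, deregister, fetch) using plain Python stand-ins for the ZenML classes. The `InMemoryLogStore` name and its simplifications are ours for illustration only, not part of ZenML:

```python
import logging
import threading
from typing import Dict, List, Set


class InMemoryLogStore:
    """Toy stand-in for BaseLogStore: keeps records per origin in memory."""

    def __init__(self) -> None:
        self._records: Dict[str, List[str]] = {}
        self._active: Set[str] = set()
        self._lock = threading.RLock()  # same thread-safety pattern as the base class

    def register_origin(self, name: str) -> str:
        """Mirror of register_origin(): start accepting records for this origin."""
        with self._lock:
            self._active.add(name)
            self._records.setdefault(name, [])
            return name

    def emit(self, origin: str, record: logging.LogRecord) -> None:
        """Mirror of emit(): store the formatted message for the origin."""
        with self._lock:
            if origin in self._active:
                self._records[origin].append(record.getMessage())

    def deregister_origin(self, origin: str) -> None:
        """Mirror of _release_origin(): stop accepting records, keep stored ones."""
        with self._lock:
            self._active.discard(origin)

    def fetch(self, origin: str) -> List[str]:
        """Mirror of fetch(): return the stored log lines for display."""
        with self._lock:
            return list(self._records.get(origin, []))


store = InMemoryLogStore()
origin = store.register_origin("step-1")
record = logging.LogRecord(
    "zenml", logging.INFO, __file__, 0, "hello %s", ("world",), None
)
store.emit(origin, record)
store.deregister_origin(origin)
print(store.fetch(origin))  # → ['hello world']
```

In the real abstraction, ZenML drives this same cycle for you: an origin is registered when a step starts logging, `emit()` is called for each log record, and the origin is deregistered (releasing resources and flushing) when the step finishes.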

Extending the OTEL Log Store

For most custom implementations, you'll want to extend OtelLogStore rather than BaseLogStore directly. The OTEL Log Store provides:

  • OpenTelemetry infrastructure (LoggerProvider, BatchLogRecordProcessor)

  • Automatic log batching and retry logic

  • Standard OTEL log format conversion

To create a custom OTEL-based log store, you only need to implement:

  1. get_exporter(): Return your custom log exporter

  2. fetch(): Retrieve logs from your backend (optional; implement it to raise NotImplementedError if your backend does not support retrieval)
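Under these constraints, a custom OTEL-based log store could look roughly like the sketch below. Note that the `zenml.log_stores` import path and the `OtelLogStoreConfig` name are assumptions about the module layout, so check the SDK docs for the actual locations:

```python
from datetime import datetime
from typing import List, Optional, cast

from opentelemetry.sdk._logs.export import ConsoleLogExporter, LogExporter

# Assumed import path -- check the SDK docs for the actual module location.
from zenml.log_stores import OtelLogStore, OtelLogStoreConfig
from zenml.models import LogsResponse
from zenml.utils.logging_utils import LogEntry


class MyLogStoreConfig(OtelLogStoreConfig):
    """Configuration for the custom log store."""

    endpoint: str  # hypothetical backend URL


class MyLogStore(OtelLogStore):
    """OTEL-based log store that only swaps in a custom exporter."""

    @property
    def config(self) -> MyLogStoreConfig:
        """The typed configuration of this log store."""
        return cast(MyLogStoreConfig, self._config)

    def get_exporter(self) -> LogExporter:
        """Exporter used by the OTEL logging pipeline.

        ConsoleLogExporter is only a stand-in here; return your own
        LogExporter implementation targeting self.config.endpoint.
        """
        return ConsoleLogExporter()

    def fetch(
        self,
        logs_model: LogsResponse,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 20000,
    ) -> List[LogEntry]:
        """Log retrieval is optional; raise if the backend can't do it."""
        raise NotImplementedError("This backend does not support log retrieval.")
```

Batching, retries, and the conversion of Python log records into OTEL format are all inherited from OtelLogStore, so the subclass stays small.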

Creating a Custom Log Exporter

If you're using a custom backend, you'll need to implement a log exporter. The exporter receives batches of OpenTelemetry log records and sends them to your backend:

Implementing Log Fetching

If your backend supports log retrieval, implement the fetch() method to enable log viewing in the ZenML dashboard:
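As a sketch, assuming a backend with a JSON-over-HTTP query API (the URL, query parameters, response shape, and the `LogEntry` field names below are all hypothetical), a fetch() implementation might look like:

```python
import json
import urllib.parse
import urllib.request
from datetime import datetime
from typing import List, Optional

# Assumed import path -- check the SDK docs for the actual module location.
from zenml.log_stores import OtelLogStore
from zenml.models import LogsResponse
from zenml.utils.logging_utils import LogEntry


class MyLogStore(OtelLogStore):
    # Abbreviated: only fetch() is shown here.

    def fetch(
        self,
        logs_model: LogsResponse,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 20000,
    ) -> List[LogEntry]:
        """Query the backend over HTTP using only the standard library."""
        query = {"origin": str(logs_model.id), "limit": str(limit)}
        if start_time:
            query["from"] = start_time.isoformat()
        if end_time:
            query["to"] = end_time.isoformat()
        # Hypothetical query endpoint of the logging backend.
        url = f"https://logs.example.com/query?{urllib.parse.urlencode(query)}"
        with urllib.request.urlopen(url, timeout=30) as response:
            rows = json.load(response)
        # The LogEntry constructor arguments are assumptions; check the SDK docs.
        return [
            LogEntry(message=row["message"], timestamp=row["timestamp"])
            for row in rows[:limit]
        ]
```

Because fetch() runs on the ZenML server, it deliberately avoids third-party HTTP clients and sticks to urllib, in line with the best practices below.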

Build Your Own Custom Log Store

Follow these steps to create and register your custom log store:

  1. Create the implementation: Implement your log store class, configuration, and flavor as shown above.

  2. Create the exporter (if needed): Implement a custom LogExporter for your backend.

  3. Register the flavor: Use the CLI to register your custom flavor:

For example, if your flavor class MyLogStoreFlavor is defined in flavors/my_log_store.py:
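Assuming the log store component type follows the standard `zenml <component-type> flavor register <source-path>` CLI pattern, the call would look like this (run it from the directory where the flavors module is importable):

```shell
# The `zenml log-store` command group name is an assumption based on the
# standard ZenML component CLI pattern.
zenml log-store flavor register flavors.my_log_store.MyLogStoreFlavor
```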

  4. Verify registration: Check that your flavor appears in the list:
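Again assuming the standard component CLI pattern:

```shell
zenml log-store flavor list
```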

  5. Register and use your log store in a stack:
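A sketch of the final step, assuming the standard component CLI pattern; the flavor name and the stack-update flag are placeholders, not verified flags:

```shell
# Register a component instance of your custom flavor ("my_flavor" is the
# hypothetical name returned by your flavor class).
zenml log-store register my_log_store --flavor=my_flavor

# Attach it to a stack. The exact component flag for log stores is an
# assumption -- see `zenml stack update --help` for the real flag name.
zenml stack update my_stack --log_store=my_log_store
```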

Important timing notes:

  • The MyLogStoreFlavor class is imported when you register the flavor via the CLI.

  • The MyLogStoreConfig class is imported when you register or update a stack component (it is used for validation).

  • The MyLogStore implementation class is only imported when the component is actually used.

This separation allows you to register flavors even when their dependencies aren't installed locally.

Best Practices

  1. Extend OtelLogStore: Unless you have specific requirements, extend OtelLogStore to benefit from built-in batching and retry logic.

  2. Handle failures gracefully: Log export failures shouldn't crash your pipeline. Return LogExportResult.FAILURE and log warnings.

  3. Implement retry logic: For network-based backends, implement retry logic in your exporter.

  4. Use secrets for credentials: Store API keys and tokens in ZenML secrets, not in the config directly.

  5. Test thoroughly: Test your implementation with various log volumes and failure scenarios.

  6. Document configuration: Clearly document all configuration options and their defaults.

  7. Keep fetch() simple: Remember that fetch() runs on the server with limited dependencies. Use only built-in Python libraries and HTTP APIs.
