# Develop a Custom Log Store
Learning how to develop a custom log store.
ZenML comes equipped with Log Store implementations that you can use to store logs in your artifact store, export to OpenTelemetry-compatible backends, or send to Datadog. However, if you need to use a different logging backend, you can extend ZenML to provide your own custom Log Store implementation.
## Base Abstraction
The log store is responsible for collecting, storing, and retrieving logs during pipeline execution. Let's take a deeper dive into the fundamentals behind its abstraction, namely the `BaseLogStore` class:
- **Origins:** A `BaseLogStoreOrigin` represents the source of log records (e.g., a step execution). When logging starts, you register an origin with the log store, then emit logs through the log store referencing that origin. When logging ends, you deregister the origin to release resources.
- **Core methods:** The base class defines four abstract methods that must be implemented:
  - `emit()`: Process and export a log record for a given origin
  - `_release_origin()`: Called when logging for an origin is complete (cleans up resources)
  - `flush()`: Ensure all pending logs are exported
  - `fetch()`: Retrieve stored logs for display
- **Thread safety:** The base implementation includes locking mechanisms to ensure thread-safe operation.
Here's a simplified view of the base implementation:
```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
import logging
import threading

from zenml.enums import StackComponentType
from zenml.models import LogsResponse
from zenml.stack import Flavor, StackComponent, StackComponentConfig
from zenml.utils.logging_utils import LogEntry


class BaseLogStoreConfig(StackComponentConfig):
    """Base configuration for all log stores."""


class BaseLogStoreOrigin:
    """Represents the source of log records (e.g., a step execution)."""

    def __init__(
        self,
        name: str,
        log_store: "BaseLogStore",
        log_model: LogsResponse,
        metadata: Dict[str, Any],
    ) -> None:
        self._name = name
        self._log_store = log_store
        self._log_model = log_model
        self._metadata = metadata

    @property
    def name(self) -> str:
        """The name of the origin."""
        return self._name

    @property
    def log_model(self) -> LogsResponse:
        """The log model associated with the origin."""
        return self._log_model

    @property
    def metadata(self) -> Dict[str, Any]:
        """The metadata associated with the origin."""
        return self._metadata

    def deregister(self) -> None:
        """Deregister the origin from the log store."""
        self._log_store.deregister_origin(self)


class BaseLogStore(StackComponent, ABC):
    """Base class for all ZenML log stores."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._origins: Dict[str, BaseLogStoreOrigin] = {}
        self._lock = threading.RLock()

    @property
    def origin_class(self) -> Type[BaseLogStoreOrigin]:
        """Class of the origin used with this log store."""
        return BaseLogStoreOrigin

    def register_origin(
        self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
    ) -> BaseLogStoreOrigin:
        """Register an origin for a logging context."""
        with self._lock:
            origin = self.origin_class(name, self, log_model, metadata)
            self._origins[name] = origin
            return origin

    def deregister_origin(self, origin: BaseLogStoreOrigin) -> None:
        """Deregister an origin and finalize its logs."""
        with self._lock:
            if origin.name not in self._origins:
                return
            self._release_origin(origin)
            del self._origins[origin.name]
            if len(self._origins) == 0:
                self.flush(blocking=False)

    @abstractmethod
    def emit(
        self,
        origin: BaseLogStoreOrigin,
        record: logging.LogRecord,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Process a log record for the given origin."""

    @abstractmethod
    def _release_origin(self, origin: BaseLogStoreOrigin) -> None:
        """Finalize logging for an origin and release resources."""

    @abstractmethod
    def flush(self, blocking: bool = True) -> None:
        """Flush all pending logs."""

    @abstractmethod
    def fetch(
        self,
        logs_model: LogsResponse,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 20000,
    ) -> List[LogEntry]:
        """Fetch stored logs."""


class BaseLogStoreFlavor(Flavor):
    """Base class for all ZenML log store flavors."""

    @property
    def type(self) -> StackComponentType:
        return StackComponentType.LOG_STORE

    @property
    def config_class(self) -> Type[BaseLogStoreConfig]:
        return BaseLogStoreConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseLogStore"]:
        """Implementation class for this flavor."""
```

## Extending the OTEL Log Store
For most custom implementations, you'll want to extend OtelLogStore rather than BaseLogStore directly. The OTEL Log Store provides:
- OpenTelemetry infrastructure (`LoggerProvider`, `BatchLogRecordProcessor`)
- Automatic log batching and retry logic
- Standard OTEL log format conversion
To create a custom OTEL-based log store, you only need to implement:
- `get_exporter()`: Return your custom log exporter
- `fetch()`: Retrieve logs from your backend (optional; raise `NotImplementedError` if not supported)
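Put together, a minimal subclass can look like the sketch below. Note that the `OtelLogStore` and `LogExporter` classes here are simplified stand-ins so the sketch is self-contained; in a real implementation you would import them from ZenML and the OpenTelemetry SDK instead.

```python
class LogExporter:
    """Stand-in for opentelemetry.sdk._logs.export.LogExporter."""


class OtelLogStore:
    """Stand-in for ZenML's OTEL log store base class."""


class InMemoryLogExporter(LogExporter):
    """Toy exporter used here as a placeholder for a real backend exporter."""

    def __init__(self) -> None:
        self.batches = []


class MyLogStore(OtelLogStore):
    """Custom OTEL-based log store: only `get_exporter()` is required."""

    def get_exporter(self) -> LogExporter:
        # Return the exporter that the OTEL pipeline will feed batched
        # log records into.
        return InMemoryLogExporter()

    def fetch(self, *args, **kwargs):
        # Optional: raise if your backend has no read API.
        raise NotImplementedError("This backend does not support log retrieval.")
```

The OTEL infrastructure in the base class takes care of batching and delivering records to whatever exporter `get_exporter()` returns.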
### Creating a Custom Log Exporter
If you're using a custom backend, you'll need to implement a log exporter. The exporter receives batches of OpenTelemetry log records and sends them to your backend:
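Here is a hedged sketch of such an exporter. It subclasses the real `LogExporter` from the OpenTelemetry SDK when that package is installed (falling back to minimal stand-ins otherwise) and posts each batch to a hypothetical HTTP ingest endpoint; adapt the payload fields to whatever your backend actually accepts.

```python
import json
import logging
import urllib.request
from enum import Enum
from typing import Any, Sequence

try:
    # Real base classes from the OpenTelemetry SDK, when it is installed.
    from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
except ImportError:
    # Minimal stand-ins so the sketch stays self-contained.
    class LogExportResult(Enum):
        SUCCESS = 0
        FAILURE = 1

    class LogExporter:
        pass

logger = logging.getLogger(__name__)


class MyBackendLogExporter(LogExporter):
    """Sends batches of OTEL log records to a hypothetical HTTP backend."""

    def __init__(self, endpoint: str, timeout: float = 10.0) -> None:
        self._endpoint = endpoint  # e.g. "https://logs.example.com/ingest"
        self._timeout = timeout

    def export(self, batch: Sequence[Any]) -> LogExportResult:
        """Serialize a batch of LogData records and POST them as JSON."""
        if not batch:
            return LogExportResult.SUCCESS
        payload = [
            {
                "body": str(data.log_record.body),
                "severity": data.log_record.severity_text,
                "timestamp": data.log_record.timestamp,
            }
            for data in batch
        ]
        request = urllib.request.Request(
            self._endpoint,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        try:
            with urllib.request.urlopen(request, timeout=self._timeout):
                return LogExportResult.SUCCESS
        except Exception:
            # Export failures must not crash the pipeline.
            logger.warning("Failed to export %d log records", len(batch))
            return LogExportResult.FAILURE

    def shutdown(self) -> None:
        """Nothing to clean up for this stateless HTTP exporter."""
```

Returning `LogExportResult.FAILURE` instead of raising keeps a flaky backend from interfering with pipeline execution.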
### Implementing Log Fetching
If your backend supports log retrieval, implement the fetch() method to enable log viewing in the ZenML dashboard:
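Inside `fetch()` you would typically query your backend's read API with the configured credentials and time range, then convert the response into `LogEntry` objects. The sketch below shows only the response-parsing half, with an assumed JSON shape and a stand-in `LogEntry` dataclass (the real class lives in `zenml.utils.logging_utils`):

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class LogEntry:
    """Stand-in for zenml.utils.logging_utils.LogEntry (fields assumed)."""

    message: str
    timestamp: Optional[datetime] = None
    level: Optional[str] = None


def parse_backend_response(raw: str, limit: int = 20000) -> List[LogEntry]:
    """Convert a hypothetical backend JSON payload into LogEntry objects.

    Assumes the backend returns `{"logs": [{"message", "timestamp", "level"}]}`
    with Unix timestamps; adapt the field names to your API.
    """
    entries: List[LogEntry] = []
    for item in json.loads(raw).get("logs", [])[:limit]:
        entries.append(
            LogEntry(
                message=item["message"],
                timestamp=datetime.fromtimestamp(
                    item["timestamp"], tz=timezone.utc
                ),
                level=item.get("level"),
            )
        )
    return entries
```

Respecting the `limit` argument server-side (or, as here, when parsing) keeps dashboard requests bounded.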
## Build Your Own Custom Log Store
Follow these steps to create and register your custom log store:
1. **Create the implementation:** Implement your log store class, configuration, and flavor as shown above.
2. **Create the exporter (if needed):** Implement a custom `LogExporter` for your backend.
3. **Register the flavor:** Use the CLI to register your custom flavor. For example, if your flavor class `MyLogStoreFlavor` is defined in `flavors/my_log_store.py`, register it by its source path.

   ZenML resolves the flavor class by taking the path where you initialized ZenML (via `zenml init`) as the starting point of resolution. Make sure you follow the best practice of initializing ZenML at the root of your repository.

4. **Verify registration:** Check that your new flavor appears in the list of available log store flavors.
5. **Register and use your log store:** Register a log store component that uses your flavor and add it to your stack.
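The CLI invocations for these steps might look like the following. The exact `zenml log-store` subcommands and the stack-update flag are assumptions modeled on how other custom stack component flavors are registered, so check `zenml --help` for your installed version.

```shell
# Register the flavor by its source path (resolved relative to `zenml init`)
zenml log-store flavor register flavors.my_log_store.MyLogStoreFlavor

# Verify that the new flavor appears in the list
zenml log-store flavor list

# Register a log store with the flavor and add it to a stack
zenml log-store register my_log_store --flavor=my_flavor
zenml stack update my_stack --log_store=my_log_store
```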
**Important:** Log stores are instantiated on the ZenML server to fetch logs for display in the dashboard, which places a hard constraint on your implementation: when the dashboard or API requests logs, the server instantiates the log store and calls its `fetch()` method, so your implementation cannot rely on any external dependencies that aren't already installed on the ZenML server.
## Best Practices
- **Extend `OtelLogStore`:** Unless you have specific requirements, extend `OtelLogStore` to benefit from built-in batching and retry logic.
- **Handle failures gracefully:** Log export failures shouldn't crash your pipeline. Return `LogExportResult.FAILURE` and log warnings.
- **Implement retry logic:** For network-based backends, implement retry logic in your exporter.
- **Use secrets for credentials:** Store API keys and tokens in ZenML secrets, not in the config directly.
- **Test thoroughly:** Test your implementation with various log volumes and failure scenarios.
- **Document configuration:** Clearly document all configuration options and their defaults.
- **Keep `fetch()` simple:** Remember that `fetch()` runs on the server with limited dependencies. Use only built-in Python libraries and HTTP APIs.
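To make the retry advice concrete, here is one way to wrap the network call inside your exporter's `export()` with exponential backoff. The names and defaults are illustrative, not part of any ZenML API:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    send: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Call `send`, retrying with exponential backoff on any exception."""
    for attempt in range(max_attempts):
        try:
            return send()
        except Exception:
            if attempt == max_attempts - 1:
                # Let the caller map the final failure to LogExportResult.FAILURE.
                raise
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")
```

Inside `export()`, you would call `with_retries(lambda: self._post(payload))` and translate a final exception into `LogExportResult.FAILURE` rather than letting it propagate into the pipeline.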