Develop a Custom Log Store
Learning how to develop a custom log store.
Base Abstraction
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
import logging
import threading
from zenml.enums import StackComponentType
from zenml.models import LogsResponse
from zenml.stack import Flavor, StackComponent, StackComponentConfig
from zenml.utils.logging_utils import LogEntry
class BaseLogStoreConfig(StackComponentConfig):
"""Base configuration for all log stores."""
pass
class BaseLogStoreOrigin:
"""Represents the source of log records (e.g., a step execution)."""
def __init__(
self,
name: str,
log_store: "BaseLogStore",
log_model: LogsResponse,
metadata: Dict[str, Any],
) -> None:
self._name = name
self._log_store = log_store
self._log_model = log_model
self._metadata = metadata
@property
def name(self) -> str:
"""The name of the origin."""
return self._name
@property
def log_model(self) -> LogsResponse:
"""The log model associated with the origin."""
return self._log_model
@property
def metadata(self) -> Dict[str, Any]:
"""The metadata associated with the origin."""
return self._metadata
def deregister(self) -> None:
"""Deregister the origin from the log store."""
self._log_store.deregister_origin(self)
class BaseLogStore(StackComponent, ABC):
"""Base class for all ZenML log stores."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._origins: Dict[str, BaseLogStoreOrigin] = {}
self._lock = threading.RLock()
@property
def origin_class(self) -> Type[BaseLogStoreOrigin]:
"""Class of the origin used with this log store."""
return BaseLogStoreOrigin
def register_origin(
self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
) -> BaseLogStoreOrigin:
"""Register an origin for a logging context."""
with self._lock:
origin = self.origin_class(name, self, log_model, metadata)
self._origins[name] = origin
return origin
def deregister_origin(self, origin: BaseLogStoreOrigin) -> None:
"""Deregister an origin and finalize its logs."""
with self._lock:
if origin.name not in self._origins:
return
self._release_origin(origin)
del self._origins[origin.name]
if len(self._origins) == 0:
self.flush(blocking=False)
@abstractmethod
def emit(
self,
origin: BaseLogStoreOrigin,
record: logging.LogRecord,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Process a log record for the given origin."""
@abstractmethod
def _release_origin(self, origin: BaseLogStoreOrigin) -> None:
"""Finalize logging for an origin and release resources."""
@abstractmethod
def flush(self, blocking: bool = True) -> None:
"""Flush all pending logs."""
@abstractmethod
def fetch(
self,
logs_model: LogsResponse,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 20000,
) -> List[LogEntry]:
"""Fetch stored logs."""
class BaseLogStoreFlavor(Flavor):
"""Base class for all ZenML log store flavors."""
@property
def type(self) -> StackComponentType:
return StackComponentType.LOG_STORE
@property
def config_class(self) -> Type[BaseLogStoreConfig]:
return BaseLogStoreConfig
@property
@abstractmethod
def implementation_class(self) -> Type["BaseLogStore"]:
"""Implementation class for this flavor."""Extending the OTEL Log Store
Creating a Custom Log Exporter
Implementing Log Fetching
Build Your Own Custom Log Store
Best Practices
Last updated
Was this helpful?