docs/book/component-guide/log-stores/README.md (new file, 90 additions)
---
description: Collecting and storing logs from your pipeline runs.
icon: file-lines
---

# Log Stores

The Log Store is a stack component that handles the collection, storage, and retrieval of logs generated during pipeline and step execution. It provides a centralized way to manage logs across different backends.

ZenML automatically captures logs from your pipeline runs, including stdout, stderr, and any logging output from your steps. The Log Store determines where these logs are stored and how they can be retrieved later for debugging and monitoring.

{% hint style="info" %}
By default, if no Log Store is configured in your stack, ZenML automatically falls back to the Artifact Store for storing logs. This preserves backward compatibility and ensures that logs are always captured without requiring any additional configuration.
{% endhint %}

### When to use it

The Log Store is an optional component in the ZenML stack. While ZenML provides a default fallback mechanism (using the Artifact Store), you may want to configure a dedicated Log Store when you need:

* **Centralized logging infrastructure**: Send logs to your existing logging platform (e.g., Datadog, Elasticsearch, Splunk)
* **Real-time log streaming**: View logs as they are generated during pipeline execution
* **Advanced log analysis**: Use specialized logging platforms for searching, filtering, and analyzing logs
* **Compliance requirements**: Store logs in specific systems for regulatory or audit purposes

#### Log Store Flavors

Out of the box, ZenML provides several Log Store implementations:

| Log Store | Flavor | Integration | Notes |
| ------------------ | ---------- | ----------- | ----------------------------------------------------------------------------- |
| [OpenTelemetry](otel.md) | `otel` | _built-in_ | Generic OpenTelemetry-based log store that can export to various backends |
| [Datadog](datadog.md) | `datadog` | _built-in_ | Sends logs directly to Datadog's logging platform |
| [Custom Implementation](custom.md) | _custom_ | | Extend the Log Store abstraction and provide your own implementation |

To list the available Log Store flavors, run:

```shell
zenml log-store flavor list
```

### How to use it

The Log Store works automatically once configured in your stack. You don't need to make any changes to your pipeline code. All logging output, print statements, and errors are automatically captured and sent to the configured Log Store.
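
For example, a step like the following needs no Log Store-specific code; its `print` and standard `logging` output is captured automatically. This is a minimal sketch with illustrative step and pipeline names:

```python
import logging

from zenml import pipeline, step

@step
def train() -> None:
    # Both print() output and standard logging calls are captured
    # and forwarded to the configured Log Store.
    print("Starting training...")
    logging.getLogger(__name__).info("Finished epoch 1")

@pipeline
def training_pipeline() -> None:
    train()

if __name__ == "__main__":
    training_pipeline()
```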

#### Basic Setup

To register and configure a Log Store:

```shell
# Register a log store
zenml log-store register my_datadog_logs --flavor datadog \
--api_key=<YOUR_API_KEY> \
--site=datadoghq.com

# Add it to your stack
zenml stack update -l my_datadog_logs
```

Once configured, all subsequent pipeline runs will send their logs to the configured Log Store.

#### Viewing Logs

Logs can be viewed through:

1. **ZenML Dashboard**: View logs directly in the pipeline run UI
2. **CLI**: Use `zenml logs` commands to fetch and display logs
3. **External Platform**: Access logs directly in your logging platform (e.g., Datadog UI)

#### Log Metadata

All logs captured by ZenML include important metadata:

* `pipeline_run_id`: The unique identifier of the pipeline run
* `step_id`: The unique identifier of the step (if applicable)
* `source`: Where the logs originated from (e.g., "step", "orchestrator")

This metadata allows you to filter and query logs effectively in your logging platform.
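
On the ZenML side, the same metadata can be used to fetch a filtered slice of logs through the Log Store itself. This is a minimal sketch that assumes the active stack exposes the configured component as `log_store` and uses a placeholder run ID:

```python
from uuid import UUID

from zenml.client import Client

# Placeholder run ID; substitute the ID of one of your pipeline runs.
run_id = UUID("00000000-0000-0000-0000-000000000000")

# Assumption: the active stack exposes the configured Log Store
# component as `log_store`.
log_store = Client().active_stack.log_store

entries = log_store.fetch(pipeline_run_id=run_id, source="step", limit=100)
for entry in entries:
    print(entry.timestamp, entry.level, entry.message)
```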

#### Fallback Behavior

If no Log Store is configured in your stack, ZenML will:

1. Automatically use the Artifact Store as the storage backend
2. Save logs as files in the artifact store
3. Make logs accessible through the same APIs and UI

This ensures that logs are always captured and retrievable, even without explicit Log Store configuration.

<figure><img src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" alt="ZenML Scarf"><figcaption></figcaption></figure>

docs/book/component-guide/log-stores/custom.md (new file, 238 additions)
---
description: Developing a custom log store.
---

# Develop a Custom Log Store

If you want to send logs to a backend that isn't covered by the built-in log stores, you can create your own custom log store implementation.

### Base Abstraction

The `BaseLogStore` provides three main methods that you need to implement:

```python
from datetime import datetime
from typing import List, Optional, cast
from uuid import UUID

from zenml.log_stores import BaseLogStore, BaseLogStoreConfig
from zenml.logging.step_logging import LogEntry

class MyLogStoreConfig(BaseLogStoreConfig):
"""Configuration for my custom log store."""

my_setting: str
another_setting: int = 100

class MyLogStore(BaseLogStore):
"""My custom log store implementation."""

@property
def config(self) -> MyLogStoreConfig:
return cast(MyLogStoreConfig, self._config)

def activate(
self,
pipeline_run_id: UUID,
step_id: Optional[UUID] = None,
source: str = "step",
) -> None:
"""Activate log collection.

This is called at the start of a pipeline run or step.
Set up your logging handlers, connections, and any
background processing here.
"""
pass

def deactivate(self) -> None:
"""Deactivate log collection and clean up.

This is called at the end of a pipeline run or step.
Flush any pending logs, close connections, and clean
up resources here.
"""
pass

def fetch(
self,
pipeline_run_id: UUID,
step_id: Optional[UUID] = None,
source: Optional[str] = None,
logs_uri: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 20000,
) -> List[LogEntry]:
"""Fetch logs from the backend.

This is called by the server to retrieve logs for display.
Query your backend and return logs as LogEntry objects.
"""
return []
```

### Implementation Patterns

#### 1. Using Python Logging Handlers

The most common pattern is to create a `logging.Handler` that sends logs to your backend:

```python
import logging
from zenml.log_stores import BaseLogStore
from zenml.logger import logging_handlers, get_storage_log_level

class MyLogStore(BaseLogStore):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.handler = None
self._original_root_level = None

    def activate(self, pipeline_run_id, step_id=None, source="step"):
        # MyCustomHandler is your own logging.Handler subclass that ships
        # records to your backend (a minimal sketch follows this example).
        self.handler = MyCustomHandler(
            backend_url=self.config.backend_url,
            pipeline_run_id=pipeline_run_id,
            step_id=step_id,
        )

        # Only forward records at or above the configured storage log level.
        self.handler.setLevel(get_storage_log_level().value)

        # Attach the handler to the root logger so it sees all log records.
        root_logger = logging.getLogger()
        root_logger.addHandler(self.handler)

        # Lower the root logger level to the lowest handler level so no
        # handler is starved, remembering the original level for later.
        self._original_root_level = root_logger.level
        handler_levels = [h.level for h in root_logger.handlers]
        root_logger.setLevel(min(handler_levels))

        # Also register the handler with ZenML's logging_handlers registry.
        logging_handlers.add(self.handler)

    def deactivate(self):
        if not self.handler:
            return

        # Detach the handler and restore the root logger's original level.
        root_logger = logging.getLogger()
        if self.handler in root_logger.handlers:
            root_logger.removeHandler(self.handler)

        if self._original_root_level is not None:
            root_logger.setLevel(self._original_root_level)

        logging_handlers.remove(self.handler)
```
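
The `MyCustomHandler` referenced above is your own `logging.Handler` subclass. A minimal sketch is shown below; the `/ingest` endpoint, payload shape, and constructor arguments are illustrative:

```python
import logging

import requests

class MyCustomHandler(logging.Handler):
    """Ships each log record to a hypothetical HTTP ingestion endpoint."""

    def __init__(self, backend_url, pipeline_run_id, step_id=None):
        super().__init__()
        self.backend_url = backend_url
        self.pipeline_run_id = pipeline_run_id
        self.step_id = step_id

    def emit(self, record: logging.LogRecord) -> None:
        try:
            requests.post(
                f"{self.backend_url}/ingest",
                json={
                    "message": self.format(record),
                    "level": record.levelname,
                    "timestamp": record.created,
                    "pipeline_run_id": str(self.pipeline_run_id),
                    "step_id": str(self.step_id) if self.step_id else None,
                },
                timeout=5,
            )
        except Exception:
            # Never let logging failures break the pipeline.
            self.handleError(record)
```

Sending one HTTP request per record is fine for a sketch, but in practice you would batch records or hand them off to a background worker, as shown in the next pattern.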

#### 2. Background Processing

For efficient log handling, use background threads or async processing:

```python
import queue
import threading

from zenml.log_stores import BaseLogStore

class MyLogStore(BaseLogStore):
    def activate(self, pipeline_run_id, step_id=None, source="step"):
        # Buffer log entries in memory and ship them from a background
        # thread so logging never blocks the pipeline.
        self.log_queue = queue.Queue(maxsize=2048)
        self.shutdown_event = threading.Event()
        self.worker_thread = threading.Thread(
            target=self._process_logs,
            daemon=True,
        )
        self.worker_thread.start()

    def _process_logs(self):
        # Drain the queue until deactivate() signals shutdown.
        while not self.shutdown_event.is_set():
            try:
                log_entry = self.log_queue.get(timeout=1)
                # _send_to_backend is your own method that ships a single
                # entry (or a batch) to your logging backend.
                self._send_to_backend(log_entry)
            except queue.Empty:
                continue

    def deactivate(self):
        # Signal the worker to stop and give it a few seconds to flush.
        self.shutdown_event.set()
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
```
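
The piece that feeds the queue is typically the logging handler itself. A minimal sketch of an `emit` that enqueues formatted records without blocking, assuming the handler is given a reference to the store's queue:

```python
import logging
import queue

class QueueForwardingHandler(logging.Handler):
    """Puts formatted records on the log store's queue instead of
    sending them synchronously."""

    def __init__(self, log_queue: "queue.Queue[str]"):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record: logging.LogRecord) -> None:
        try:
            # Drop the record rather than block the pipeline if the
            # queue is full.
            self.log_queue.put_nowait(self.format(record))
        except queue.Full:
            pass
```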

#### 3. Fetching Logs

Implement fetch using HTTP APIs or SDKs:

```python
import requests

from zenml.logging.step_logging import LogEntry

class MyLogStore(BaseLogStore):
def fetch(
self,
pipeline_run_id,
step_id=None,
source=None,
logs_uri=None,
start_time=None,
end_time=None,
limit=20000,
):
query = {
"pipeline_run_id": str(pipeline_run_id),
}
if step_id:
query["step_id"] = str(step_id)
if start_time:
query["start_time"] = start_time.isoformat()
if end_time:
query["end_time"] = end_time.isoformat()

        # The /query endpoint and bearer-token auth here are illustrative;
        # replace them with your backend's API or SDK.
        response = requests.post(
            f"{self.config.backend_url}/query",
            json=query,
            headers={"Authorization": f"Bearer {self.config.api_key}"},
        )

logs = []
for log_data in response.json()["logs"][:limit]:
logs.append(LogEntry(
message=log_data["message"],
level=log_data.get("level"),
timestamp=log_data.get("timestamp"),
))
return logs
```

### Creating a Flavor

To make your log store usable via CLI, create a flavor:

```python
from typing import Type

from zenml.enums import StackComponentType
from zenml.log_stores import BaseLogStore, BaseLogStoreConfig
from zenml.stack.flavor import Flavor

class MyLogStoreFlavor(Flavor):
@property
def name(self) -> str:
return "my_custom_store"

@property
def type(self) -> StackComponentType:
return StackComponentType.LOG_STORE

@property
def config_class(self) -> Type[BaseLogStoreConfig]:
from my_module import MyLogStoreConfig
return MyLogStoreConfig

@property
def implementation_class(self) -> Type[BaseLogStore]:
from my_module import MyLogStore
return MyLogStore
```

Register your flavor:

```bash
zenml log-store flavor register my_module.MyLogStoreFlavor
```

Then use it:

```bash
zenml log-store register my_logs --flavor my_custom_store \
--backend_url=https://logs.example.com \
--api_key=secret
```
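
Finally, add the new log store to your stack so subsequent pipeline runs start using it (the `-l` flag mirrors the stack update example in the Log Stores README):

```bash
zenml stack update -l my_logs
```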

<figure><img src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" alt="ZenML Scarf"><figcaption></figcaption></figure>
