51 changes: 51 additions & 0 deletions docs/docs/en/getting-started/observability/prometheus.md
@@ -138,6 +138,57 @@ To integrate your custom metrics with FastStream, you should declare the metric,

To learn about all the metric types supported by the `prometheus_client` Python library, check out the official instrumentation [**documentation**](https://prometheus.github.io/client_python/instrumenting/){.external-link target="_blank"}.

## Prometheus Metrics in Multi-Process Mode

When running FastStream with multiple worker processes (e.g., via `uvicorn --workers N`), each worker keeps its own in-memory metrics, so Prometheus metrics collection needs additional configuration.

### Basic Configuration

1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory:

```bash
export PROMETHEUS_MULTIPROC_DIR=/path/to/metrics/directory
```

2. Configure your broker with Prometheus middleware and create a metrics endpoint:

```python linenums="1" hl_lines="17-18 35-45"
{!> docs_src/getting_started/prometheus/kafka_multiprocess.py!}
```

### How It Works

The metrics endpoint checks for the `PROMETHEUS_MULTIPROC_DIR` environment variable:

- **Multi-process mode**: When the variable is set, it creates a new `CollectorRegistry` with `MultiProcessCollector` that aggregates metrics from all worker processes
- **Single-process mode**: When the variable is not set, it falls back to the registry passed to the middleware

This allows the same code to work in both single and multi-process deployments.
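The core of that check is only a few lines. Here is a minimal sketch of the selection logic, pulled out of the endpoint above (the `build_registry` helper name is illustrative, not part of FastStream):

```python
import os

from prometheus_client import CollectorRegistry, multiprocess


def build_registry(default_registry: CollectorRegistry) -> CollectorRegistry:
    """Pick the registry that the /metrics endpoint should expose."""
    if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
        # Multi-process mode: aggregate the metric files written by every worker.
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry, path=path)
        return registry
    # Single-process mode: reuse the registry passed to the middleware.
    return default_registry
```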

### Running with Multiple Workers

Start your application with uvicorn using multiple workers:

```bash
export PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc
mkdir -p $PROMETHEUS_MULTIPROC_DIR
uvicorn app:app --workers 4
```
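Once the workers are up, you can scrape the endpoint to confirm that metrics from all workers end up in a single exposition. A small check, assuming the application listens on `localhost:8000`:

```python
import urllib.request

# Scrape the /metrics endpoint exposed by the application.
with urllib.request.urlopen("http://localhost:8000/metrics") as response:
    body = response.read().decode()

# In multi-process mode, counters are summed across workers, so each metric
# appears once per label set instead of once per worker process.
print(body)
```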

### Important Requirements

1. **The metrics directory must:**
- Exist before application start
- Be writable by all worker processes
- Be on a filesystem accessible to all workers
- Be emptied between application runs

2. **For better performance:**
    - Consider mounting the directory on `tmpfs`
    - Set up regular cleanup of old metric files (a sketch of one approach follows this list)
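One way to handle the cleanup points, shown as a hedged sketch (the helper names and where you hook them in are assumptions, not FastStream or Uvicorn requirements): empty the directory once before the workers start, and call `multiprocess.mark_process_dead` when a worker exits so that its live-gauge files are removed.

```python
import os
from pathlib import Path

from prometheus_client import multiprocess


def clean_metrics_dir() -> None:
    """Remove stale metric files before the worker processes are started."""
    metrics_dir = Path(os.environ["PROMETHEUS_MULTIPROC_DIR"])
    metrics_dir.mkdir(parents=True, exist_ok=True)
    for stale_file in metrics_dir.glob("*.db"):
        stale_file.unlink()


def on_worker_exit(pid: int) -> None:
    """Forget the live gauges of a worker that has terminated."""
    multiprocess.mark_process_dead(pid)
```

Mounting the directory on `tmpfs` keeps these files in memory, which avoids disk I/O for the frequent metric updates.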

---

### Grafana dashboard

You can import the [**Grafana dashboard**](https://grafana.com/grafana/dashboards/22130-faststream-metrics/){.external-link target="_blank"} to visualize the metrics collected by middleware.
48 changes: 48 additions & 0 deletions docs/docs_src/getting_started/prometheus/kafka_multiprocess.py
@@ -0,0 +1,48 @@
import os

from prometheus_client import CollectorRegistry, generate_latest, multiprocess
from prometheus_client import CONTENT_TYPE_LATEST

from faststream.asgi import AsgiFastStream, AsgiResponse
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware

# Create registry for metrics
registry = CollectorRegistry()

# Create broker with Prometheus middleware
broker = KafkaBroker(
middlewares=[
KafkaPrometheusMiddleware(
registry=registry,
app_name="your-app-name",
)
]
)


@broker.subscriber("test-queue")
async def handle_message(msg: str) -> None:
"""Handle incoming messages."""


async def metrics(scope, receive, send):
    """ASGI metrics endpoint that supports multi-process mode."""
    if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
        # Multi-process mode: collect metrics from all workers
        registry_ = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry_, path=path)
    else:
        # Single-process mode: use the registry passed to the middleware
        registry_ = registry

    headers = {"Content-Type": CONTENT_TYPE_LATEST}
    response = AsgiResponse(generate_latest(registry_), status_code=200, headers=headers)
    await response(scope, receive, send)


app = AsgiFastStream(
broker,
asgi_routes=[
("/metrics", metrics),
],
)
53 changes: 53 additions & 0 deletions tests/prometheus/multiprocess_app.py
@@ -0,0 +1,53 @@
import os

from prometheus_client import (
CONTENT_TYPE_LATEST,
CollectorRegistry,
generate_latest,
multiprocess,
)

from faststream.asgi import AsgiFastStream, AsgiResponse
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware

# Create registry for metrics
registry = CollectorRegistry()

# Create broker with Prometheus middleware
broker = KafkaBroker(
middlewares=[
KafkaPrometheusMiddleware(
registry=registry,
app_name="multiprocess-test-app",
)
]
)


@broker.subscriber("test-multiprocess-queue")
async def handle_message(msg: str) -> None:
"""Handle incoming messages."""


async def metrics_endpoint(scope, receive, send):
"""Metrics endpoint that supports multi-process mode."""
if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
# Multi-process mode: aggregate from all workers
registry_ = CollectorRegistry()
multiprocess.MultiProcessCollector(registry_, path=path)
else:
        # Single-process mode: use the registry passed to the middleware
registry_ = registry

headers = {"Content-Type": CONTENT_TYPE_LATEST}
response = AsgiResponse(generate_latest(registry_), status_code=200, headers=headers)
await response(scope, receive, send)


app = AsgiFastStream(
broker,
asgi_routes=[
("/metrics", metrics_endpoint),
],
)