From ebc92b3f5ec4ed078be6b12c255a53671499fc38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=9C=D0=B5=D0=BB?= =?UTF-8?q?=D1=8C=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Mon, 9 Jun 2025 08:57:04 +0300 Subject: [PATCH 1/8] docs: add Prometheus metrics configuration for multi-process mode --- .../observability/prometheus.md | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 0eaedbe6df..23d629fcda 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -127,6 +127,39 @@ passing in the registry that was passed to `PrometheusMiddleware`. | destination | Where the message is sent | | | exception_type (while publishing) | Exception type when publishing message | | +## Prometheus Metrics in Multi-Process Mode + +When running FastStream with multiple worker processes, you need to configure Prometheus metrics collection specially: + +1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory +2. Initialize your collector registry with multiprocess support: + + ```python linenums="1" hl_lines="8 10" + from prometheus_client import CollectorRegistry, multiprocess + import os + + multiprocess_dir = os.getenv("PROMETHEUS_MULTIPROC_DIR") + + registry = CollectorRegistry() + if multiprocess_dir: + multiprocess.MultiProcessCollector(registry, path=multiprocess_dir) + + broker = KafkaBroker( + "...", + middlewares=[ + KafkaPrometheusMiddleware( + registry=registry, + app_name="your-app-name" + ) + ] + ) + ``` + +The metrics directory must: +* Exist before application start +* Be writable by all worker processes +* Be on a filesystem accessible to all workers + ### Grafana dashboard You can import the [**Grafana dashboard**](https://grafana.com/grafana/dashboards/22130-faststream-metrics/){.external-link target="_blank"} to visualize the metrics collected by middleware. From d393130f73c55a054949b8abff5fd9d72eef073e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=9C=D0=B5=D0=BB?= =?UTF-8?q?=D1=8C=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Mon, 9 Jun 2025 09:04:56 +0300 Subject: [PATCH 2/8] docs: add highlights --- docs/docs/en/getting-started/observability/prometheus.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 23d629fcda..42d7707f07 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -129,7 +129,7 @@ passing in the registry that was passed to `PrometheusMiddleware`. ## Prometheus Metrics in Multi-Process Mode -When running FastStream with multiple worker processes, you need to configure Prometheus metrics collection specially: +When running `FastStream` with multiple worker processes, you need to configure Prometheus metrics collection specially: 1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory 2. Initialize your collector registry with multiprocess support: From bece422b04f8167852b5c948b34b514d574a1bb6 Mon Sep 17 00:00:00 2001 From: Peopl3s <28685107+Peopl3s@users.noreply.github.com> Date: Mon, 9 Jun 2025 06:09:52 +0000 Subject: [PATCH 3/8] docs: generate API References --- docs/docs/en/getting-started/observability/prometheus.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 42d7707f07..3b85122a63 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -137,13 +137,13 @@ When running `FastStream` with multiple worker processes, you need to configure ```python linenums="1" hl_lines="8 10" from prometheus_client import CollectorRegistry, multiprocess import os - + multiprocess_dir = os.getenv("PROMETHEUS_MULTIPROC_DIR") - + registry = CollectorRegistry() if multiprocess_dir: multiprocess.MultiProcessCollector(registry, path=multiprocess_dir) - + broker = KafkaBroker( "...", middlewares=[ From 87a9e4bcf97c73eb17cd33fb7d050233301788cb Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Mon, 9 Jun 2025 18:59:32 +0300 Subject: [PATCH 4/8] Update prometheus.md --- docs/docs/en/getting-started/observability/prometheus.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 3b85122a63..5b83b607c5 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -134,7 +134,7 @@ When running `FastStream` with multiple worker processes, you need to configure 1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory 2. Initialize your collector registry with multiprocess support: - ```python linenums="1" hl_lines="8 10" + ```python linenums="1" hl_lines="8" from prometheus_client import CollectorRegistry, multiprocess import os @@ -145,7 +145,6 @@ When running `FastStream` with multiple worker processes, you need to configure multiprocess.MultiProcessCollector(registry, path=multiprocess_dir) broker = KafkaBroker( - "...", middlewares=[ KafkaPrometheusMiddleware( registry=registry, From d872f3281243cf713990c38413a979746ed00050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=9C=D0=B5=D0=BB?= =?UTF-8?q?=D1=8C=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Wed, 11 Jun 2025 18:10:25 +0300 Subject: [PATCH 5/8] docs: fix --- .../observability/prometheus.md | 91 +++++++++++++------ 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 5b83b607c5..ea55adc903 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -129,35 +129,68 @@ passing in the registry that was passed to `PrometheusMiddleware`. ## Prometheus Metrics in Multi-Process Mode -When running `FastStream` with multiple worker processes, you need to configure Prometheus metrics collection specially: - -1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory -2. Initialize your collector registry with multiprocess support: - - ```python linenums="1" hl_lines="8" - from prometheus_client import CollectorRegistry, multiprocess - import os - - multiprocess_dir = os.getenv("PROMETHEUS_MULTIPROC_DIR") - - registry = CollectorRegistry() - if multiprocess_dir: - multiprocess.MultiProcessCollector(registry, path=multiprocess_dir) - - broker = KafkaBroker( - middlewares=[ - KafkaPrometheusMiddleware( - registry=registry, - app_name="your-app-name" - ) - ] - ) - ``` - -The metrics directory must: -* Exist before application start -* Be writable by all worker processes -* Be on a filesystem accessible to all workers +When running FastStream with multiple worker processes, follow these steps to properly configure Prometheus metrics collection: + +### Basic Configuration +1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory: + ```bash + export PROMETHEUS_MULTIPROC_DIR=/path/to/metrics/directory + ``` + +2. The metrics will automatically work in multiprocess mode when the environment variable is set. Here's a minimal working example: + +```python linenums="1" hl_lines="8" +import os + +from prometheus_client import CollectorRegistry + +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus import KafkaPrometheusMiddleware + +broker = KafkaBroker( + middlewares=[ + KafkaPrometheusMiddleware( + registry=CollectorRegistry(), + app_name="your-app-name" + ) + ] +) +``` + +### Metrics Export Endpoint +For exporting metrics in multi-process mode, you need a special endpoint: + +```python linenums="1" hl_lines="8" +import os + +from prometheus_client import CollectorRegistry, multiprocess, generate_latest +from prometheus_client import CONTENT_TYPE_LATEST + +from faststream.asgi import AsgiResponse + +registry = CollectorRegistry() + +@get +async def metrics(scope): + if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"): + registry_ = CollectorRegistry() + multiprocess.MultiProcessCollector(registry_, path=path) + else: + registry_ = registry + + headers = {"Content-Type": CONTENT_TYPE_LATEST} + return AsgiResponse(generate_latest(registry_), status_code=200, headers=headers) +``` + +### Important Requirements +1. The metrics directory must: + - Exist before application start + - Be writable by all worker processes + - Be on a filesystem accessible to all workers + - Be emptied between application runs +2. For better performance: + - Consider mounting the directory on `tmpfs` + - Set up regular cleanup of old metric files ### Grafana dashboard From 93adac8bf22e922bf1c933f8f27671246e82ea77 Mon Sep 17 00:00:00 2001 From: Peopl3s <28685107+Peopl3s@users.noreply.github.com> Date: Wed, 11 Jun 2025 15:18:16 +0000 Subject: [PATCH 6/8] docs: generate API References --- docs/docs/en/getting-started/observability/prometheus.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index ea55adc903..bd92332cbf 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -136,7 +136,7 @@ When running FastStream with multiple worker processes, follow these steps to pr ```bash export PROMETHEUS_MULTIPROC_DIR=/path/to/metrics/directory ``` - + 2. The metrics will automatically work in multiprocess mode when the environment variable is set. Here's a minimal working example: ```python linenums="1" hl_lines="8" @@ -150,7 +150,7 @@ from faststream.kafka.prometheus import KafkaPrometheusMiddleware broker = KafkaBroker( middlewares=[ KafkaPrometheusMiddleware( - registry=CollectorRegistry(), + registry=CollectorRegistry(), app_name="your-app-name" ) ] From cef81074b9675422ca002204bd1978c55bbe4333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B5=D0=BB=D1=8C=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC?= Date: Fri, 3 Oct 2025 08:47:47 +0300 Subject: [PATCH 7/8] docs/feat(prometheus): add Prometheus metrics configuration for multi-process mode + tests --- .../observability/prometheus.md | 60 ++-- .../prometheus/kafka_multiprocess.py | 49 +++ tests/prometheus/multiprocess_app.py | 54 ++++ tests/prometheus/test_multiprocess.py | 290 ++++++++++++++++++ 4 files changed, 416 insertions(+), 37 deletions(-) create mode 100644 docs/docs_src/getting_started/prometheus/kafka_multiprocess.py create mode 100644 tests/prometheus/multiprocess_app.py create mode 100644 tests/prometheus/test_multiprocess.py diff --git a/docs/docs/en/getting-started/observability/prometheus.md b/docs/docs/en/getting-started/observability/prometheus.md index 3819e28da9..0865883de6 100644 --- a/docs/docs/en/getting-started/observability/prometheus.md +++ b/docs/docs/en/getting-started/observability/prometheus.md @@ -140,69 +140,55 @@ To learn about all the metric types supported by the `prometheus_client` Python ## Prometheus Metrics in Multi-Process Mode -When running FastStream with multiple worker processes, follow these steps to properly configure Prometheus metrics collection: +When running FastStream with multiple worker processes (e.g., via `uvicorn --workers N`), you need special configuration for Prometheus metrics collection. ### Basic Configuration + 1. Set the `PROMETHEUS_MULTIPROC_DIR` environment variable to a writable directory: + ```bash export PROMETHEUS_MULTIPROC_DIR=/path/to/metrics/directory ``` -2. The metrics will automatically work in multiprocess mode when the environment variable is set. Here's a minimal working example: - -```python linenums="1" hl_lines="8" -import os - -from prometheus_client import CollectorRegistry - -from faststream.kafka import KafkaBroker -from faststream.kafka.prometheus import KafkaPrometheusMiddleware +2. Configure your broker with Prometheus middleware and create a metrics endpoint: -broker = KafkaBroker( - middlewares=[ - KafkaPrometheusMiddleware( - registry=CollectorRegistry(), - app_name="your-app-name" - ) - ] -) +```python linenums="1" hl_lines="17-18 35-45" +{!> docs_src/getting_started/prometheus/kafka_multiprocess.py!} ``` -### Metrics Export Endpoint -For exporting metrics in multi-process mode, you need a special endpoint: +### How It Works -```python linenums="1" hl_lines="8" -import os +The metrics endpoint checks for the `PROMETHEUS_MULTIPROC_DIR` environment variable: -from prometheus_client import CollectorRegistry, multiprocess, generate_latest -from prometheus_client import CONTENT_TYPE_LATEST +- **Multi-process mode**: When the variable is set, it creates a new `CollectorRegistry` with `MultiProcessCollector` that aggregates metrics from all worker processes +- **Single-process mode**: When the variable is not set, it falls back to using the default registry -from faststream.asgi import AsgiResponse +This allows the same code to work in both single and multi-process deployments. -registry = CollectorRegistry() +### Running with Multiple Workers -@get -async def metrics(scope): - if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"): - registry_ = CollectorRegistry() - multiprocess.MultiProcessCollector(registry_, path=path) - else: - registry_ = registry +Start your application with uvicorn using multiple workers: - headers = {"Content-Type": CONTENT_TYPE_LATEST} - return AsgiResponse(generate_latest(registry_), status_code=200, headers=headers) +```bash +export PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc +mkdir -p $PROMETHEUS_MULTIPROC_DIR +uvicorn app:app --workers 4 ``` ### Important Requirements -1. The metrics directory must: + +1. **The metrics directory must:** - Exist before application start - Be writable by all worker processes - Be on a filesystem accessible to all workers - Be emptied between application runs -2. For better performance: + +2. **For better performance:** - Consider mounting the directory on `tmpfs` - Set up regular cleanup of old metric files +--- + ### Grafana dashboard You can import the [**Grafana dashboard**](https://grafana.com/grafana/dashboards/22130-faststream-metrics/){.external-link target="_blank"} to visualize the metrics collected by middleware. diff --git a/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py b/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py new file mode 100644 index 0000000000..451c06780b --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py @@ -0,0 +1,49 @@ +import os + +from prometheus_client import CollectorRegistry, generate_latest, multiprocess +from prometheus_client import CONTENT_TYPE_LATEST + +from faststream.asgi import AsgiFastStream, AsgiResponse +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus import KafkaPrometheusMiddleware + +# Create registry for metrics +registry = CollectorRegistry() + +# Create broker with Prometheus middleware +broker = KafkaBroker( + middlewares=[ + KafkaPrometheusMiddleware( + registry=registry, + app_name="your-app-name", + ) + ] +) + + +@broker.subscriber("test-queue") +async def handle_message(msg: str) -> None: + """Handle incoming messages.""" + pass + + +async def metrics(scope): + """Metrics endpoint that supports multi-process mode.""" + if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"): + # Multi-process mode: collect metrics from all workers + registry_ = CollectorRegistry() + multiprocess.MultiProcessCollector(registry_, path=path) + else: + # Single process mode: use the default registry + registry_ = registry + + headers = {"Content-Type": CONTENT_TYPE_LATEST} + return AsgiResponse(generate_latest(registry_), status_code=200, headers=headers) + + +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", metrics), + ], +) diff --git a/tests/prometheus/multiprocess_app.py b/tests/prometheus/multiprocess_app.py new file mode 100644 index 0000000000..9679b535ac --- /dev/null +++ b/tests/prometheus/multiprocess_app.py @@ -0,0 +1,54 @@ +import os + +from prometheus_client import ( + CONTENT_TYPE_LATEST, + CollectorRegistry, + generate_latest, + multiprocess, +) + +from faststream.asgi import AsgiFastStream, AsgiResponse +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus import KafkaPrometheusMiddleware + +# Create registry for metrics +registry = CollectorRegistry() + +# Create broker with Prometheus middleware +broker = KafkaBroker( + middlewares=[ + KafkaPrometheusMiddleware( + registry=registry, + app_name="multiprocess-test-app", + ) + ] +) + + +@broker.subscriber("test-multiprocess-queue") +async def handle_message(msg: str) -> None: + """Handle incoming messages.""" + pass + + +async def metrics_endpoint(scope, receive, send): + """Metrics endpoint that supports multi-process mode.""" + if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"): + # Multi-process mode: aggregate from all workers + registry_ = CollectorRegistry() + multiprocess.MultiProcessCollector(registry_, path=path) + else: + # Single process mode: use the default registry + registry_ = registry + + headers = {"Content-Type": CONTENT_TYPE_LATEST} + response = AsgiResponse(generate_latest(registry_), status_code=200, headers=headers) + await response(scope, receive, send) + + +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", metrics_endpoint), + ], +) diff --git a/tests/prometheus/test_multiprocess.py b/tests/prometheus/test_multiprocess.py new file mode 100644 index 0000000000..a68436a070 --- /dev/null +++ b/tests/prometheus/test_multiprocess.py @@ -0,0 +1,290 @@ +import os +import shutil +import signal +import subprocess +import tempfile +import time +from pathlib import Path + +import pytest + +# Skip tests if Kafka or Prometheus dependencies are not available +pytest.importorskip("aiokafka") +pytest.importorskip("prometheus_client") + + +@pytest.fixture() +def metrics_dir(): + """Create a temporary directory for multi-process metrics.""" + temp_dir = tempfile.mkdtemp(prefix="prometheus_multiproc_") + yield temp_dir + if Path(temp_dir).exists(): + shutil.rmtree(temp_dir) + + +@pytest.fixture() +def kafka_broker_url(): + """Kafka broker URL for testing.""" + return os.getenv("KAFKA_BROKER", "localhost:9092") + + +class TestPrometheusMultiprocess: + """Test suite for Prometheus multi-process mode.""" + + @pytest.mark.slow() + def test_multiprocess_metrics_collection(self, metrics_dir, kafka_broker_url): + """Test that metrics are correctly collected in multi-process mode. + + This test: + 1. Launches application via uvicorn with multiple workers + 2. Sets PROMETHEUS_MULTIPROC_DIR environment variable + 3. Sends messages to test queue + 4. Verifies /metrics endpoint responds consistently from any worker + """ + from aiokafka import AIOKafkaProducer + + # Path to the test application + app_module = "tests.prometheus.multiprocess_app:app" + + env = os.environ.copy() + env["PROMETHEUS_MULTIPROC_DIR"] = metrics_dir + env["KAFKA_BROKER"] = kafka_broker_url + + # Start uvicorn with 2 workers to test multi-process behavior + process = subprocess.Popen( + [ + "uvicorn", + app_module, + "--host", + "127.0.0.1", + "--port", + "8000", + "--workers", + "2", + ], + env=env, + stdout=subprocess.PIPE, + text=True, + ) + + try: + # Wait for the application to start and workers to initialize + time.sleep(15) + + # Verify the process is running + if process.poll() is not None: + _, stderr = process.communicate() + pytest.skip(f"Uvicorn process failed to start. stderr: {stderr[:500]}") + + import asyncio + import httpx + + try: + for attempt in range(3): + try: + response = httpx.get("http://127.0.0.1:8000/metrics", timeout=5) + if response.status_code in (200, 500): + break + except (httpx.ConnectError, httpx.ConnectTimeout): + if attempt < 2: + time.sleep(5) + else: + pytest.skip("Application not responding after 3 attempts") + except Exception as e: + pytest.skip(f"Could not connect to application: {e}") + + async def send_test_messages(): + producer = AIOKafkaProducer(bootstrap_servers=kafka_broker_url) + try: + await producer.start() + for i in range(10): + await producer.send_and_wait( + "test-multiprocess-queue", + f"test message {i}".encode(), + ) + except Exception as e: + pytest.skip(f"Failed to send messages to Kafka: {e}") + finally: + await producer.stop() + + try: + asyncio.run(send_test_messages()) + except Exception as e: + pytest.skip(f"Kafka not available: {e}") + + # Wait for messages to be processed + time.sleep(5) + + # Query /metrics endpoint multiple times + # In multi-process mode, any worker can respond + try: + responses = [] + for _ in range(5): + response = httpx.get("http://127.0.0.1:8000/metrics", timeout=10) + + if response.status_code == 500: + pytest.skip("Application returned 500 - Kafka connection issue") + + assert response.status_code == 200, ( + f"Metrics endpoint returned {response.status_code}" + ) + responses.append(response.text) + + # Verify all responses contain metrics + for metrics_text in responses: + assert "# HELP" in metrics_text or "# TYPE" in metrics_text + # Check for FastStream metrics + assert "faststream" in metrics_text.lower() + + # Verify metrics contain data about processed messages + # At least one response should show received messages + has_received_metrics = any( + "received_messages" in resp for resp in responses + ) + assert has_received_metrics, "No received_messages metrics found" + + except (httpx.ConnectError, httpx.ConnectTimeout): + pytest.skip("Could not connect to application") + + finally: + if process.poll() is None: + process.send_signal(signal.SIGTERM) + try: + process.wait(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + @pytest.mark.slow() + def test_multiprocess_directory_requirements(self, kafka_broker_url): + """Test that the metrics directory must exist and be writable. + + This test verifies the important requirements mentioned in the documentation. + """ + non_existent_dir = "/tmp/non_existent_prometheus_dir_12345" + if Path(non_existent_dir).exists(): + shutil.rmtree(non_existent_dir) + + app_module = "tests.prometheus.multiprocess_app:app" + + env = os.environ.copy() + env["PROMETHEUS_MULTIPROC_DIR"] = non_existent_dir + env["KAFKA_BROKER"] = kafka_broker_url + + process = subprocess.Popen( + [ + "uvicorn", + app_module, + "--host", + "127.0.0.1", + "--port", + "8001", + "--workers", + "1", + ], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + try: + time.sleep(3) + + # The process might fail or run with warnings + # Check if stderr contains warnings about the directory + if process.poll() is not None: + process.communicate() + assert True + else: + process.send_signal(signal.SIGTERM) + process.wait(timeout=5) + + finally: + if process.poll() is None: + process.kill() + process.wait() + + def test_multiprocess_metrics_directory_cleanup(self, metrics_dir): + """Test that metrics directory should be emptied between application runs. + + This verifies the requirement that the directory should be cleaned + between runs to avoid stale metrics. + """ + # Create some dummy metric files + test_file = Path(metrics_dir) / "test_metric.db" + test_file.write_text("dummy content") + + assert test_file.exists() + + # In a real scenario, the directory should be cleaned before starting + # Here we verify that files exist and can be cleaned + for file in Path(metrics_dir).glob("*.db"): + file.unlink() + + remaining_files = list(Path(metrics_dir).glob("*.db")) + assert len(remaining_files) == 0, ( + "Metrics directory should be empty after cleanup" + ) + + @pytest.mark.slow() + def test_single_process_fallback(self, kafka_broker_url): + """Test that the application works in single-process mode without PROMETHEUS_MULTIPROC_DIR. + + This verifies that the metrics endpoint gracefully falls back to single-process mode. + """ + app_module = "tests.prometheus.multiprocess_app:app" + + env = os.environ.copy() + # Do NOT set PROMETHEUS_MULTIPROC_DIR + if "PROMETHEUS_MULTIPROC_DIR" in env: + del env["PROMETHEUS_MULTIPROC_DIR"] + env["KAFKA_BROKER"] = kafka_broker_url + + process = subprocess.Popen( + [ + "uvicorn", + app_module, + "--host", + "127.0.0.1", + "--port", + "8003", + ], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + try: + time.sleep(15) + + if process.poll() is not None: + pytest.skip("Uvicorn process failed to start") + + import httpx + + try: + response = httpx.get("http://127.0.0.1:8003/metrics", timeout=10) + + if response.status_code == 500: + pytest.skip( + "Application returned 500 - likely Kafka connection issue" + ) + + assert response.status_code == 200 + metrics_text = response.text + + assert "# HELP" in metrics_text or "# TYPE" in metrics_text + + except (httpx.ConnectError, httpx.ConnectTimeout): + pytest.skip("Could not connect to application") + + finally: + if process.poll() is None: + process.send_signal(signal.SIGTERM) + try: + process.wait(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + process.wait() From 443d5c1810371d9af3ab311f1764fe011e093bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B5=D0=BB=D1=8C=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC?= Date: Mon, 6 Oct 2025 06:53:14 +0300 Subject: [PATCH 8/8] docs/feat(prometheus): fix lint --- docs/docs_src/getting_started/prometheus/kafka_multiprocess.py | 1 - tests/prometheus/multiprocess_app.py | 1 - tests/prometheus/test_multiprocess.py | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py b/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py index 451c06780b..6086aafe66 100644 --- a/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py +++ b/docs/docs_src/getting_started/prometheus/kafka_multiprocess.py @@ -24,7 +24,6 @@ @broker.subscriber("test-queue") async def handle_message(msg: str) -> None: """Handle incoming messages.""" - pass async def metrics(scope): diff --git a/tests/prometheus/multiprocess_app.py b/tests/prometheus/multiprocess_app.py index 9679b535ac..865a1f2c65 100644 --- a/tests/prometheus/multiprocess_app.py +++ b/tests/prometheus/multiprocess_app.py @@ -28,7 +28,6 @@ @broker.subscriber("test-multiprocess-queue") async def handle_message(msg: str) -> None: """Handle incoming messages.""" - pass async def metrics_endpoint(scope, receive, send): diff --git a/tests/prometheus/test_multiprocess.py b/tests/prometheus/test_multiprocess.py index a68436a070..bfb602b9c9 100644 --- a/tests/prometheus/test_multiprocess.py +++ b/tests/prometheus/test_multiprocess.py @@ -77,6 +77,7 @@ def test_multiprocess_metrics_collection(self, metrics_dir, kafka_broker_url): pytest.skip(f"Uvicorn process failed to start. stderr: {stderr[:500]}") import asyncio + import httpx try: