docs: add Prometheus metrics configuration for multi-process mode#2275
docs: add Prometheus metrics configuration for multi-process mode#2275Peopl3s wants to merge 17 commits intoag2ai:mainfrom
Conversation
|
Hi, your example in the documentation is not valid. All metrics will automatically pick up the multiprocessing mode if the Here is an example of a function that works correctly in multiprocessing mode. Try to simplify it for documentation. import os
import time
from prometheus_client import CollectorRegistry, multiprocess, generate_latest, CONTENT_TYPE_LATEST
from prometheus_client.asgi import make_asgi_app
from faststream.asgi import AsgiFastStream, get, AsgiResponse
from faststream.rabbit.annotations import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
registry = CollectorRegistry()
@get
async def metrics(scope):
if path := os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
registry_ = CollectorRegistry()
multiprocess.MultiProcessCollector(registry_, path=path)
else:
registry_ = registry
headers = {"Content-Type": CONTENT_TYPE_LATEST}
return AsgiResponse(generate_latest(registry_), status_code=200, headers=headers)
mid = RabbitPrometheusMiddleware(registry=registry)
broker = RabbitBroker(middlewares=[mid], max_consumers=10)
app = AsgiFastStream(broker, asgi_routes=[("/metrics", metrics)])
@broker.subscriber("test")
async def handler(msg: str) -> None:
time.sleep(0.1)
print(msg, os.getpid())To test the behavior, you need to launch the application via What else needs to be specified in the documentation:
You also need to write a test that will check the correctness of the work. Let me know if you can't get it tested. |
|
@draincoder Thx, I'll try to write the tests, and I'll let you know if I do/don't. |
|
@Peopl3s @draincoder hi! Have we any updates here? |
|
I've corrected the documentation, but I haven't gotten around to writing tests yet |
|
@Peopl3s Hi, are there any updates regarding the tests? |
|
@draincoder Hi. I'm sorry that I'm updating the status late. I've corrected the documentation, but I won't be able to take the tests in July. If someone else joins in, it would be great. |
# Conflicts: # docs/docs/en/getting-started/observability/prometheus.md
|
Hi, thank you for your contribution to this PR. This documentation will be very helpful. I apologize for not being able to complete the review in a timely manner. |
| from aiokafka import AIOKafkaProducer | ||
|
|
||
| # Path to the test application | ||
| app_module = "tests.prometheus.multiprocess_app:app" |
There was a problem hiding this comment.
Why not test exactly the code that will be output in the documentation? (docs/docs_src/getting_started/prometheus/kafka_multiprocess.py)
and the test will then be located at https://github.com/ag2ai/faststream/tree/main/tests/docs/getting_started/prometheus
then you won't have to write a duplicate in tests/prometheus/multiprocess_app.py
| process.kill() | ||
| process.wait() | ||
|
|
||
| def test_multiprocess_metrics_directory_cleanup(self, metrics_dir): |
There was a problem hiding this comment.
Is this test really necessary? It doesn't check any code from the documentation.
| env["KAFKA_BROKER"] = kafka_broker_url | ||
|
|
||
| # Start uvicorn with 2 workers to test multi-process behavior | ||
| process = subprocess.Popen( |
There was a problem hiding this comment.
I think that instead of working with the process directly, one could:
from docs.docs_src.getting_started.prometheus.kafka_multiprocess import app
await app.run(run_extra_options={"host": "127.0.0.1", "port": "8000", "workers": "2"})
... # some test
app.exit()This way the tests will be cleaner in my opinion.
Description
This PR updates the documentation to include detailed instructions for configuring Prometheus metrics collection when running
FastStream inmulti-process mode. The new section describes the required environment variable (PROMETHEUS_MULTIPROC_DIR), how to initialize the Prometheus collector registry with multiprocess support, and provides a code example for integrating the configuration with FastStream and Kafka.Close #2220
Type of change
Checklist
scripts/lint.shshows no errors)scripts/test-cov.shscripts/static-analysis.sh