55"""
66
77import os
8- from typing import Union , Optional
8+ from typing import Optional , Union
99
1010import structlog
1111from faststream import FastStream
12- from faststream .redis import RedisRouter
12+ from faststream .redis import RedisBroker , RedisRouter
1313from opentelemetry import trace
1414from opentelemetry .exporter .otlp .proto .grpc .trace_exporter import OTLPSpanExporter
1515from opentelemetry .sdk .resources import Resource
1616from opentelemetry .sdk .trace import TracerProvider
1717from opentelemetry .sdk .trace .export import BatchSpanProcessor
1818
19- from alembic .env import logger
2019from common import AppConfig , application_init
20+ from conftest import test_config
2121from domains import event_registry
2222from domains .books .events import BookCreatedV1 , BookCreatedV1Data
2323
24+ """
25+ For the sake of this example app we reuse the domain registry,
26+ which is used for publishing. In a real world these registries
27+ are different and separate.
28+ """
29+ subscriber_registry = event_registry
30+
2431
2532def setup_telemetry (service_name : str , otlp_endpoint : str ) -> TracerProvider :
2633 resource = Resource .create (attributes = {"service.name" : service_name })
@@ -33,11 +40,15 @@ def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
3340
3441
3542def create_app (test_config : Union [AppConfig , None ] = None ) -> FastStream :
43+ config = test_config or AppConfig ()
3644 setup_telemetry (
3745 "faststream" , otlp_endpoint = os .environ ["OTEL_EXPORTER_OTLP_ENDPOINT" ]
3846 )
39- router = application_init (AppConfig ()).faststream_broker
40- broker = router .broker
47+ broker = application_init (config ).faststream_broker
48+ register_subscribers (broker )
49+ if config .EVENTS .REGISTER_SUBSCRIBERS :
50+ register_subscribers (broker )
51+
4152 app = FastStream (broker , logger = structlog .get_logger ())
4253
4354 @app .after_startup
@@ -56,18 +67,20 @@ async def after_startup():
5667 return app
5768
5869
59-
60- # TODO: Add Routing structure similar to the one in the fastapi and
61- # move this in the event_consumer_module
62- def register_subscribers (router : RedisRouter , topic : Optional [str ] = None ):
63- if topic is not None and topic in event_registry .keys ():
64- topics_map = {topic : event_registry [topic ]}
70+ # TODO: Add Routing structure similar to the one in the fastapi implementation
71+ def register_subscribers (broker : RedisBroker , topic : Optional [str ] = None ):
72+ if topic is not None and topic in subscriber_registry .keys ():
73+ topics_map = {topic : subscriber_registry [topic ]}
6574 else :
66- topics_map = event_registry .copy ()
75+ topics_map = subscriber_registry .copy ()
6776
6877 logger = structlog .get_logger ()
78+ router = RedisRouter ()
6979
7080 for topic , event_type in topics_map .items ():
81+
7182 @router .subscriber (topic )
7283 async def handler (msg : event_type ) -> None : # type: ignore[valid-type]
7384 logger .info (f"Received message { type (msg )} { msg } " )
85+
86+ broker .include_router (router )
0 commit comments