88import anyio
99from fast_depends import dependency_provider
1010from faststream import ContextRepo , FastStream
11- from faststream ._compat import ExceptionGroup
11+ from faststream ._internal . _compat import ExceptionGroup
1212from faststream .asgi import AsgiFastStream , AsgiResponse , get
1313from faststream .kafka import KafkaBroker
14- from faststream .kafka .publisher .asyncapi import AsyncAPIDefaultPublisher
14+ from faststream .kafka .publisher import DefaultPublisher
15+ from faststream .kafka .subscriber .usecase import BatchSubscriber
16+ from faststream .specification .asyncapi import AsyncAPI
1517from sqlalchemy .ext .asyncio import AsyncSession
1618
1719import data_rentgen
1820from data_rentgen .consumer .settings import ConsumerApplicationSettings
1921from data_rentgen .consumer .subscribers import runs_events_subscriber
20- from data_rentgen .db .factory import create_session_factory
22+ from data_rentgen .db .factory import session_generator
2123from data_rentgen .logging .setup_logging import setup_logging
2224
2325logger = logging .getLogger (__name__ )
@@ -43,21 +45,34 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
4345 # register subscribers using settings
4446 consumer_settings = settings .consumer .model_dump (exclude = {"topics_list" , "topics_pattern" , "malformed_topic" })
4547
46- subscriber = broker .subscriber (
48+ subscribe = broker .subscriber (
4749 * settings .consumer .topics_list ,
4850 pattern = settings .consumer .topics_pattern ,
4951 ** consumer_settings ,
5052 batch = True ,
5153 # Disable parsing JSONs on FastStream level
5254 decoder = lambda _ : None ,
5355 )
56+
57+ # register subscriber
58+ batch_subscriber = subscribe (runs_events_subscriber )
59+
60+ async def get_subscriber ():
61+ return batch_subscriber
62+
63+ # FastStream uses WeakSet for subscribers, so we need to keep long lived reference somewhere
64+ dependency_provider .override (BatchSubscriber , get_subscriber )
65+
66+ # register publisher
5467 publisher = broker .publisher (settings .producer .malformed_topic )
5568
56- # perform registration
57- subscriber (runs_events_subscriber )
69+ async def get_publisher ():
70+ return publisher
71+
72+ dependency_provider .override (DefaultPublisher , get_publisher )
5873
59- dependency_provider . override ( AsyncSession , create_session_factory ( settings . database ))
60- dependency_provider .override (AsyncAPIDefaultPublisher , lambda : publisher )
74+ # Override session generator
75+ dependency_provider .override (AsyncSession , session_generator ( settings . database ) )
6176 return broker
6277
6378
@@ -78,11 +93,13 @@ async def security_lifespan(context: ContextRepo):
7893 raise exception from None
7994
8095 return FastStream (
81- broker = broker_factory (settings ),
96+ broker_factory (settings ),
8297 lifespan = security_lifespan ,
83- title = "Data.Rentgen" ,
84- description = "Data.Rentgen is a nextgen DataLineage service" ,
85- version = data_rentgen .__version__ ,
98+ specification = AsyncAPI (
99+ title = "Data.Rentgen" ,
100+ description = "Data.Rentgen is a nextgen DataLineage service" ,
101+ version = data_rentgen .__version__ ,
102+ ),
86103 logger = logger ,
87104 ).as_asgi (asgi_routes = [("/monitoring/ping" , liveness )])
88105
0 commit comments