22# SPDX-License-Identifier: Apache-2.0
33
44import logging
5+ from contextlib import asynccontextmanager
56
7+ import anyio
68from fast_depends import dependency_provider
7- from faststream import FastStream
9+ from faststream import ContextRepo , FastStream
10+ from faststream ._compat import ExceptionGroup
811from faststream .kafka import KafkaBroker
912from sqlalchemy .ext .asyncio import AsyncSession
1013
1114import data_rentgen
1215from data_rentgen .consumer .settings import ConsumerApplicationSettings
13- from data_rentgen .consumer .settings .security import get_broker_security
1416from data_rentgen .consumer .subscribers import runs_events_subscriber
1517from data_rentgen .db .factory import create_session_factory
1618from data_rentgen .logging .setup_logging import setup_logging
2123def broker_factory (settings : ConsumerApplicationSettings ) -> KafkaBroker :
2224 broker = KafkaBroker (
2325 bootstrap_servers = settings .kafka .bootstrap_servers ,
24- security = get_broker_security ( settings .kafka .security ),
26+ security = settings .kafka .security . to_security ( ),
2527 compression_type = settings .kafka .compression .value if settings .kafka .compression else None ,
2628 client_id = f"data-rentgen-{ data_rentgen .__version__ } " ,
2729 logger = logger ,
30+ ** settings .kafka .security .extra_broker_kwargs (),
2831 )
2932
3033 # register subscribers using settings
@@ -41,8 +44,24 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
4144
4245
4346def application_factory (settings : ConsumerApplicationSettings ) -> FastStream :
47+ @asynccontextmanager
48+ async def security_lifespan (context : ContextRepo ):
49+ try :
50+ async with anyio .create_task_group () as tg :
51+ await settings .kafka .security .initialize ()
52+ tg .start_soon (settings .kafka .security .refresh )
53+
54+ yield
55+
56+ await settings .kafka .security .destroy ()
57+ tg .cancel_scope .cancel ()
58+ except ExceptionGroup as e :
59+ for exception in e .exceptions :
60+ raise exception from None
61+
4462 return FastStream (
4563 broker = broker_factory (settings ),
64+ lifespan = security_lifespan ,
4665 title = "Data.Rentgen" ,
4766 description = "Data.Rentgen is a nextgen DataLineage service" ,
4867 version = data_rentgen .__version__ ,
0 commit comments