55
66import anyio
77import injection
8+ from anyio .abc import TaskGroup
89
910from cq ._core .message import Event , EventBus
1011from cq ._core .scope import CQScope
@@ -20,20 +21,23 @@ def add(self, *events: Event) -> None:
2021
2122
2223@dataclass (repr = False , eq = False , frozen = True , slots = True )
23- class SimpleRelatedEvents (RelatedEvents ):
24- items : list [Event ] = field (default_factory = list )
24+ class AnyIORelatedEvents (RelatedEvents ):
25+ event_bus : EventBus
26+ task_group : TaskGroup
27+ history : list [Event ] = field (default_factory = list , init = False )
2528
26- def __bool__ (self ) -> bool :
27- return bool (self .items )
29+ def __bool__ (self ) -> bool : # pragma: no cover
30+ return bool (self .history )
2831
2932 def add (self , * events : Event ) -> None :
30- self .items .extend (events )
33+ self .history .extend (events )
34+ dispatch_method = self .event_bus .dispatch
35+
36+ for event in events :
37+ self .task_group .start_soon (dispatch_method , event )
3138
3239
3340@injection .scoped (CQScope .TRANSACTION , mode = "fallback" )
3441async def related_events_recipe (event_bus : EventBus ) -> AsyncIterator [RelatedEvents ]:
35- yield (instance := SimpleRelatedEvents ())
36-
3742 async with anyio .create_task_group () as task_group :
38- for event in instance .items :
39- task_group .start_soon (event_bus .dispatch , event )
43+ yield AnyIORelatedEvents (event_bus , task_group )
0 commit comments