1111from typing import Any , Awaitable , cast
1212
1313from frequenz .channels import Broadcast , Receiver , Sender , select
14+ from frequenz .channels .timer import SkipMissedAndDrift , Timer
1415from frequenz .client .common .microgrid .components import ComponentCategory
1516from frequenz .client .microgrid import ComponentId
1617from frequenz .sdk .actor import Actor , BackgroundService
@@ -142,59 +143,6 @@ async def main():
142143 ```
143144 """
144145
145- class FailedDispatchesRetrier (BackgroundService ):
146- """Manages the retring of failed dispatches."""
147-
148- def __init__ (self , retry_interval : timedelta ) -> None :
149- """Initialize the retry manager.
150-
151- Args:
152- retry_interval: The interval between retries.
153- """
154- super ().__init__ ()
155- self ._retry_interval = retry_interval
156- self ._channel = Broadcast [Dispatch ](name = "retry_channel" )
157- self ._sender = self ._channel .new_sender ()
158-
159- def start (self ) -> None :
160- """Start the background service.
161-
162- This is a no-op.
163- """
164-
165- def new_receiver (self ) -> Receiver [Dispatch ]:
166- """Create a new receiver for dispatches to retry.
167-
168- Returns:
169- The receiver.
170- """
171- return self ._channel .new_receiver ()
172-
173- def retry (self , dispatch : Dispatch ) -> None :
174- """Retry a dispatch.
175-
176- Args:
177- dispatch: The dispatch information to retry.
178- """
179- task = asyncio .create_task (self ._retry_after_delay (dispatch ))
180- self ._tasks .add (task )
181- task .add_done_callback (self ._tasks .remove )
182-
183- async def _retry_after_delay (self , dispatch : Dispatch ) -> None :
184- """Retry a dispatch after a delay.
185-
186- Args:
187- dispatch: The dispatch information to retry.
188- """
189- _logger .info (
190- "Will retry dispatch %s after %s" ,
191- dispatch .id ,
192- self ._retry_interval ,
193- )
194- await asyncio .sleep (self ._retry_interval .total_seconds ())
195- _logger .info ("Retrying dispatch %s now" , dispatch .id )
196- await self ._sender .send (dispatch )
197-
198146 @dataclass (frozen = True , kw_only = True )
199147 class ActorAndChannel :
200148 """Actor and its sender."""
@@ -225,18 +173,18 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
225173 running_status_receiver: The receiver for dispatch running status changes.
226174 dispatch_identity: A function to identify to which actor a dispatch refers.
227175 By default, it uses the dispatch ID.
228- retry_interval: The interval between retries .
176+ retry_interval: How long to wait until trying to start failed actors again .
229177 """
230178 super ().__init__ ()
231179 self ._dispatch_identity : Callable [[Dispatch ], int ] = (
232180 dispatch_identity if dispatch_identity else lambda d : d .id
233181 )
234182
235183 self ._dispatch_rx = running_status_receiver
184+ self ._retry_timer_rx = Timer (retry_interval , SkipMissedAndDrift ())
236185 self ._actor_factory = actor_factory
237186 self ._actors : dict [int , ActorDispatcher .ActorAndChannel ] = {}
238-
239- self ._retrier = ActorDispatcher .FailedDispatchesRetrier (retry_interval )
187+ self ._pending_dispatches : dict [int , Dispatch ] = {}
240188
241189 def start (self ) -> None :
242190 """Start the background service."""
@@ -292,7 +240,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
292240 dispatch .type ,
293241 exc_info = e ,
294242 )
295- self ._retrier . retry ( dispatch )
243+ self ._pending_dispatches [ identity ] = dispatch
296244 else :
297245 # No exception occurred, so we can add the actor to the list
298246 self ._actors [identity ] = ActorDispatcher .ActorAndChannel (
@@ -318,21 +266,35 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
318266
319267 async def _run (self ) -> None :
320268 """Run the background service."""
321- async with self ._retrier :
322- retry_recv = self ._retrier .new_receiver ()
269+ async for selected in select (self ._retry_timer_rx , self ._dispatch_rx ):
270+ if self ._retry_timer_rx .triggered (selected ):
271+ if not self ._pending_dispatches :
272+ continue
273+
274+ _logger .info (
275+ "Retrying %d pending dispatches" ,
276+ len (self ._pending_dispatches ),
277+ )
278+ keys = self ._pending_dispatches .keys ()
279+ for identity in keys :
280+ dispatch = self ._pending_dispatches [identity ]
323281
324- async for selected in select (retry_recv , self ._dispatch_rx ):
325- if retry_recv .triggered (selected ):
326- self ._retrier .retry (selected .message )
327- elif self ._dispatch_rx .triggered (selected ):
328- await self ._handle_dispatch (selected .message )
282+ await self ._handle_dispatch (dispatch )
283+ elif self ._dispatch_rx .triggered (selected ):
284+ await self ._handle_dispatch (selected .message )
329285
330286 async def _handle_dispatch (self , dispatch : Dispatch ) -> None :
331287 """Handle a dispatch.
332288
289+ Also removes the dispatch from pending dispatches.
290+
333291 Args:
334292 dispatch: The dispatch to handle.
335293 """
294+ identity = self ._dispatch_identity (dispatch )
295+ if identity in self ._pending_dispatches :
296+ self ._pending_dispatches .pop (identity )
297+
336298 if dispatch .started :
337299 await self ._start_actor (dispatch )
338300 else :
0 commit comments