1111from typing import Any , Awaitable , cast
1212
1313from frequenz .channels import Broadcast , Receiver , Sender , select
14+ from frequenz .channels .timer import SkipMissedAndDrift , Timer
1415from frequenz .client .common .microgrid .components import ComponentCategory
1516from frequenz .client .microgrid import ComponentId
1617from frequenz .sdk .actor import Actor , BackgroundService
@@ -142,59 +143,6 @@ async def main():
142143 ```
143144 """
144145
145- class FailedDispatchesRetrier (BackgroundService ):
146- """Manages the retring of failed dispatches."""
147-
148- def __init__ (self , retry_interval : timedelta ) -> None :
149- """Initialize the retry manager.
150-
151- Args:
152- retry_interval: The interval between retries.
153- """
154- super ().__init__ ()
155- self ._retry_interval = retry_interval
156- self ._channel = Broadcast [Dispatch ](name = "retry_channel" )
157- self ._sender = self ._channel .new_sender ()
158-
159- def start (self ) -> None :
160- """Start the background service.
161-
162- This is a no-op.
163- """
164-
165- def new_receiver (self ) -> Receiver [Dispatch ]:
166- """Create a new receiver for dispatches to retry.
167-
168- Returns:
169- The receiver.
170- """
171- return self ._channel .new_receiver ()
172-
173- def retry (self , dispatch : Dispatch ) -> None :
174- """Retry a dispatch.
175-
176- Args:
177- dispatch: The dispatch information to retry.
178- """
179- task = asyncio .create_task (self ._retry_after_delay (dispatch ))
180- self ._tasks .add (task )
181- task .add_done_callback (self ._tasks .remove )
182-
183- async def _retry_after_delay (self , dispatch : Dispatch ) -> None :
184- """Retry a dispatch after a delay.
185-
186- Args:
187- dispatch: The dispatch information to retry.
188- """
189- _logger .info (
190- "Will retry dispatch %s after %s" ,
191- dispatch .id ,
192- self ._retry_interval ,
193- )
194- await asyncio .sleep (self ._retry_interval .total_seconds ())
195- _logger .info ("Retrying dispatch %s now" , dispatch .id )
196- await self ._sender .send (dispatch )
197-
198146 @dataclass (frozen = True , kw_only = True )
199147 class ActorAndChannel :
200148 """Actor and its sender."""
@@ -225,18 +173,19 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
225173 running_status_receiver: The receiver for dispatch running status changes.
226174 dispatch_identity: A function to identify to which actor a dispatch refers.
227175 By default, it uses the dispatch ID.
228- retry_interval: The interval between retries .
176+ retry_interval: How long to wait until trying to start failed actors again .
229177 """
230178 super ().__init__ ()
231179 self ._dispatch_identity : Callable [[Dispatch ], int ] = (
232180 dispatch_identity if dispatch_identity else lambda d : d .id
233181 )
234182
235183 self ._dispatch_rx = running_status_receiver
184+ self ._retry_timer_rx = Timer (retry_interval , SkipMissedAndDrift ())
236185 self ._actor_factory = actor_factory
237186 self ._actors : dict [int , ActorDispatcher .ActorAndChannel ] = {}
238-
239- self . _retrier = ActorDispatcher . FailedDispatchesRetrier ( retry_interval )
187+ self . _failed_dispatches : dict [ int , Dispatch ] = {}
188+ """Failed dispatches that will be retried later."""
240189
241190 def start (self ) -> None :
242191 """Start the background service."""
@@ -292,7 +241,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
292241 dispatch .type ,
293242 exc_info = e ,
294243 )
295- self ._retrier . retry ( dispatch )
244+ self ._failed_dispatches [ identity ] = dispatch
296245 else :
297246 # No exception occurred, so we can add the actor to the list
298247 self ._actors [identity ] = ActorDispatcher .ActorAndChannel (
@@ -318,21 +267,37 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
318267
319268 async def _run (self ) -> None :
320269 """Run the background service."""
321- async with self ._retrier :
322- retry_recv = self ._retrier .new_receiver ()
270+ async for selected in select (self ._retry_timer_rx , self ._dispatch_rx ):
271+ if self ._retry_timer_rx .triggered (selected ):
272+ if not self ._failed_dispatches :
273+ continue
274+
275+ _logger .info (
276+ "Retrying %d failed actor starts" ,
277+ len (self ._failed_dispatches ),
278+ )
279+ keys = list (self ._failed_dispatches .keys ())
280+ for identity in keys :
281+ dispatch = self ._failed_dispatches [identity ]
323282
324- async for selected in select (retry_recv , self ._dispatch_rx ):
325- if retry_recv .triggered (selected ):
326- self ._retrier .retry (selected .message )
327- elif self ._dispatch_rx .triggered (selected ):
328- await self ._handle_dispatch (selected .message )
283+ await self ._handle_dispatch (dispatch )
284+ elif self ._dispatch_rx .triggered (selected ):
285+ await self ._handle_dispatch (selected .message )
329286
330287 async def _handle_dispatch (self , dispatch : Dispatch ) -> None :
331- """Handle a dispatch.
288+ """Process a dispatch to start, update, or stop an actor.
289+
290+ If a newer version of a previously failed dispatch is received, the
291+ pending retry for the older version is canceled to ensure only the
292+ latest dispatch is processed.
332293
333294 Args:
334295 dispatch: The dispatch to handle.
335296 """
297+ identity = self ._dispatch_identity (dispatch )
298+ if identity in self ._failed_dispatches :
299+ self ._failed_dispatches .pop (identity )
300+
336301 if dispatch .started :
337302 await self ._start_actor (dispatch )
338303 else :
0 commit comments