@@ -220,6 +220,18 @@ where
220220 }
221221}
222222
223+ /// Configuration and dependencies for running background tasks.
224+ struct BackgroundTaskConfig < S > {
225+ fetcher : Arc < dyn Fetch > ,
226+ checker : Arc < dyn HealthCheck > ,
227+ routing_snapshot : AtomicSwap < S > ,
228+ seeds : Vec < Node > ,
229+ fetch_period : Duration ,
230+ fetch_retry_interval : Duration ,
231+ check_period : Duration ,
232+ token : stop_token:: StopToken ,
233+ }
234+
223235impl < S > DynamicRouteProvider < S >
224236where
225237 S : RoutingSnapshot + ' static ,
@@ -261,44 +273,36 @@ where
261273
262274 // We won the race - start the background tasks
263275 // Clone what we need for the spawned task
264- let fetcher = Arc :: clone ( & self . fetcher ) ;
265- let checker = Arc :: clone ( & self . checker ) ;
266- let routing_snapshot = Arc :: clone ( & self . routing_snapshot ) ;
267- let seeds = self . seeds . clone ( ) ;
268- let fetch_period = self . fetch_period ;
269- let fetch_retry_interval = self . fetch_retry_interval ;
270- let check_period = self . check_period ;
271- let token = self . token . token ( ) ;
276+ let config = BackgroundTaskConfig {
277+ fetcher : Arc :: clone ( & self . fetcher ) ,
278+ checker : Arc :: clone ( & self . checker ) ,
279+ routing_snapshot : Arc :: clone ( & self . routing_snapshot ) ,
280+ seeds : self . seeds . clone ( ) ,
281+ fetch_period : self . fetch_period ,
282+ fetch_retry_interval : self . fetch_retry_interval ,
283+ check_period : self . check_period ,
284+ token : self . token . token ( ) ,
285+ } ;
272286
273287 // Spawn the initialization - don't wait (fire-and-forget)
274288 crate :: util:: spawn ( async move {
275- Self :: run_background_tasks (
276- fetcher,
277- checker,
278- routing_snapshot,
279- seeds,
280- fetch_period,
281- fetch_retry_interval,
282- check_period,
283- token,
284- )
285- . await ;
289+ Self :: run_background_tasks ( config) . await ;
286290 } ) ;
287291 }
288292
289293 /// Internal implementation that starts the background tasks and waits for initialization.
290294 async fn run ( & self ) {
291- Self :: run_background_tasks (
292- Arc :: clone ( & self . fetcher ) ,
293- Arc :: clone ( & self . checker ) ,
294- Arc :: clone ( & self . routing_snapshot ) ,
295- self . seeds . clone ( ) ,
296- self . fetch_period ,
297- self . fetch_retry_interval ,
298- self . check_period ,
299- self . token . token ( ) ,
300- )
301- . await ;
295+ let config = BackgroundTaskConfig {
296+ fetcher : Arc :: clone ( & self . fetcher ) ,
297+ checker : Arc :: clone ( & self . checker ) ,
298+ routing_snapshot : Arc :: clone ( & self . routing_snapshot ) ,
299+ seeds : self . seeds . clone ( ) ,
300+ fetch_period : self . fetch_period ,
301+ fetch_retry_interval : self . fetch_retry_interval ,
302+ check_period : self . check_period ,
303+ token : self . token . token ( ) ,
304+ } ;
305+ Self :: run_background_tasks ( config ) . await ;
302306 }
303307
304308 /// Starts two background tasks:
@@ -308,16 +312,7 @@ where
308312 /// - Listens to the fetched nodes messages from the `NodesFetchActor`.
309313 /// - Starts/stops health check tasks (`HealthCheckActors`) based on the newly added/removed nodes.
310314 /// - These spawned health check tasks periodically update the snapshot with the latest node health info.
311- async fn run_background_tasks (
312- fetcher : Arc < dyn Fetch > ,
313- checker : Arc < dyn HealthCheck > ,
314- routing_snapshot : AtomicSwap < S > ,
315- seeds : Vec < Node > ,
316- fetch_period : Duration ,
317- fetch_retry_interval : Duration ,
318- check_period : Duration ,
319- token : stop_token:: StopToken ,
320- ) {
315+ async fn run_background_tasks ( config : BackgroundTaskConfig < S > ) {
321316 log ! ( info, "{DYNAMIC_ROUTE_PROVIDER}: started ..." ) ;
322317 // Communication channel between NodesFetchActor and HealthManagerActor.
323318 let ( fetch_sender, fetch_receiver) = async_watch:: channel ( None ) ;
@@ -327,18 +322,18 @@ where
327322
328323 // Start the receiving part first.
329324 let health_manager_actor = HealthManagerActor :: new (
330- Arc :: clone ( & checker) ,
331- check_period,
332- Arc :: clone ( & routing_snapshot) ,
325+ Arc :: clone ( & config . checker ) ,
326+ config . check_period ,
327+ Arc :: clone ( & config . routing_snapshot ) ,
333328 fetch_receiver,
334329 init_sender,
335- token. clone ( ) ,
330+ config . token . clone ( ) ,
336331 ) ;
337332 crate :: util:: spawn ( async move { health_manager_actor. run ( ) . await } ) ;
338333
339334 // Dispatch all seed nodes for initial health checks
340335 if let Err ( _err) = fetch_sender. send ( Some ( FetchedNodes {
341- nodes : seeds. clone ( ) ,
336+ nodes : config . seeds . clone ( ) ,
342337 } ) ) {
343338 log ! (
344339 error,
@@ -368,12 +363,12 @@ where
368363 init_receiver. close ( ) ;
369364
370365 let fetch_actor = NodesFetchActor :: new (
371- Arc :: clone ( & fetcher) ,
372- fetch_period,
373- fetch_retry_interval,
366+ Arc :: clone ( & config . fetcher ) ,
367+ config . fetch_period ,
368+ config . fetch_retry_interval ,
374369 fetch_sender,
375- Arc :: clone ( & routing_snapshot) ,
376- token,
370+ Arc :: clone ( & config . routing_snapshot ) ,
371+ config . token ,
377372 ) ;
378373 crate :: util:: spawn ( async move { fetch_actor. run ( ) . await } ) ;
379374 log ! (
0 commit comments