@@ -35,10 +35,15 @@ use tokio::time::Instant;
 /// On 2, the `wait_for_delegate_info` call will create an entry in the [DashMap] with a
 /// [oneshot::Sender], and listen on the other end of the channel [oneshot::Receiver] for
 /// the delegate to put something there.
+///
+/// It's possible for a [StageContext] to "get lost" if `add_delegate_info` is called without
+/// a corresponding call to `wait_for_delegate_info`, or vice versa. To handle this, a background
+/// task reaps any contexts that live longer than the `gc_ttl`.
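+///
+/// A minimal usage sketch (illustrative only: the exact method signatures are assumed
+/// here, with the `(String, usize)` key taken to be a query id and stage number):
+///
+/// ```rust,ignore
+/// let delegation = StageDelegation::default();
+///
+/// // Delegate side: publish the context for this (query, stage) pair.
+/// delegation.add_delegate_info("query-1".to_string(), 1, next_stage_context)?;
+///
+/// // Actor side: resolves once the info arrives, or errors after `wait_timeout`.
+/// let ctx = delegation.wait_for_delegate_info("query-1".to_string(), 1).await?;
+/// ```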
 pub struct StageDelegation {
     stage_targets: Arc<DashMap<(String, usize), Value>>,
     wait_timeout: Duration,
 
+    /// `notify` is used to shut down the garbage collection task when the
+    /// [StageDelegation] is dropped.
     notify: Arc<Notify>,
 }
 
@@ -50,11 +55,11 @@ impl Default for StageDelegation {
         let result = Self {
             stage_targets: stage_targets.clone(),
             wait_timeout: Duration::from_secs(5),
-
             notify: notify.clone(),
         };
 
-        tokio::spawn(run_gc_async(
+        // Run the GC task.
+        tokio::spawn(run_gc(
             stage_targets.clone(),
             notify.clone(),
-            Duration::from_secs(30), /* gc period */
+            Duration::from_secs(GC_PERIOD_SECONDS),
@@ -64,10 +69,11 @@ impl Default for StageDelegation {
     }
 }
 
-// gc_interval is the period over which gc runs to purge old stage_targets entries which will
-// never be read. This may happen if the actor encounters an error before it can read
-// the delagate info.
-async fn run_gc_async(
+const GC_PERIOD_SECONDS: u64 = 30;
+
+// run_gc continuously clears expired entries from the map, checking every `period`.
+// It terminates when `shutdown` is signalled.
+async fn run_gc(
     stage_targets: Arc<DashMap<(String, usize), Value>>,
     shutdown: Arc<Notify>,
     period: Duration,
@@ -78,7 +84,7 @@ async fn run_gc_async(
                 break;
             }
             _ = tokio::time::sleep(period) => {
-                // PERF: This iterator is sharded, so it won't lock the whole map.
+                // Performance: `retain` works shard by shard, so it won't lock the whole map.
                 stage_targets.retain(|_key, value| {
                     value.expiry.gt(&Instant::now())
                 });
@@ -94,9 +100,6 @@ impl Drop for StageDelegation {
 }
 
 impl StageDelegation {
-    fn gc_ttl(&self) -> Duration {
-        self.wait_timeout * 2
-    }
     /// Puts the [StageContext] info so that an actor can pick it up with `wait_for_delegate_info`.
     ///
     /// - If the actor was already waiting for this info, it just puts it on the
@@ -126,20 +129,13 @@ impl StageDelegation {
             Entry::Vacant(entry) => {
                 let (tx, rx) = oneshot::channel();
                 entry.insert(Value {
-                    // Use 2 * the waiter wait duration for now.
                     expiry: Instant::now().add(self.gc_ttl()),
                     value: Oneof::Receiver(rx),
                 });
                 tx
             }
         };
 
-        // TODO: `send` does not wait for the other end of the channel to receive the message,
-        // so if nobody waits for it, we might leak an entry in `stage_targets` that will never
-        // be cleaned up. We can either:
-        //   1. schedule a cleanup task that iterates the entries cleaning up old ones
-        //   2. find some other API that allows us to .await until the other end receives the message,
-        //      and on a timeout, cleanup the entry anyway.
         tx.send(next_stage_context)
             .map_err(|_| exec_datafusion_err!("Could not send stage context info"))
     }
@@ -170,7 +166,6 @@ impl StageDelegation {
             Entry::Vacant(entry) => {
                 let (tx, rx) = oneshot::channel();
                 entry.insert(Value {
-                    // Use 2 * the waiter wait duration for now.
                     expiry: Instant::now().add(self.gc_ttl()),
                     value: Oneof::Sender(tx),
                 });
@@ -187,6 +182,12 @@ impl StageDelegation {
             )
         })
     }
+
+    // gc_ttl sets the expiry of entries in the map. Use 2 * the waiter wait duration so
+    // that the GC never reaps an entry while a waiter could still pick it up.
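+    // For example, with the default `wait_timeout` of 5s, an entry expires 10s after
+    // insertion and is removed by the next GC sweep (every 30s by default).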
+    fn gc_ttl(&self) -> Duration {
+        self.wait_timeout * 2
+    }
 }
 
 struct Value {
@@ -384,15 +385,15 @@ mod tests {
         // Wait for expiry time to pass.
         tokio::time::sleep(delegation.gc_ttl()).await;
 
-        // Run GC to cleanup expired entries
-        let gc_task = tokio::spawn(run_gc_async(
+        // Run GC to clean up expired entries.
+        let gc_task = tokio::spawn(run_gc(
             stage_targets.clone(),
             shutdown.clone(),
             Duration::from_millis(5),
         ));
 
         // Wait for GC to clear the map
-        for _ in 0..50 {
+        for _ in 0..10 {
             tokio::time::sleep(Duration::from_millis(10)).await;
             if stage_targets.len() == 0 {
                 break;
@@ -403,7 +404,7 @@ mod tests {
         drop(delegation);
         gc_task.await.unwrap();
 
-        // After GC, map should be cleared
+        // After GC, the map should be cleared.
         assert_eq!(stage_targets.len(), 0);
     }
 
@@ -431,21 +432,21 @@ mod tests {
         tokio::time::sleep(delegation.gc_ttl()).await;
 
         // Run GC to cleanup expired entries
-        let gc_task = tokio::spawn(run_gc_async(
+        let gc_task = tokio::spawn(run_gc(
             stage_targets.clone(),
             shutdown.clone(),
             Duration::from_millis(10),
         ));
 
         // Wait for GC to clear the map
-        for _ in 0..50 {
+        for _ in 0..10 {
             tokio::time::sleep(Duration::from_millis(20)).await;
             if stage_targets.len() == 0 {
                 break;
             }
         }
 
-        // Stop GC
+        // Stop GC.
         drop(delegation);
         gc_task.await.unwrap();
 