Skip to content

Conversation

@jayshrivastava
Copy link
Collaborator

@jayshrivastava jayshrivastava commented Aug 1, 2025

This change addresses a TODO in the StageDelegation struct. Previously, it was possible for StageContexts to leak if a context was produced and not consumed (or if a waiter created a map entry and timed out without any corresponding producer call). This change introduces a GC task to clear contexts from the map if they have been there for longer than a certain TTL. The TTL proposed is 2 * wait_timeout.

When the StageDelegation is dropped, the GC task is terminated.

Testing

  • Adds two unit tests (one for add_delegate_info and one for wait_for_delegate_info) which test that abandoned map entries are garbage collected appropriately.

@jayshrivastava jayshrivastava changed the title asdf delegation: add gc task to clean up abandoned contexts Aug 1, 2025
@jayshrivastava jayshrivastava marked this pull request as ready for review August 1, 2025 22:56
This change addresses a TODO in the `StageDelegation` struct. Previously, it was possible for `StageContext`s to leak if a context was produced and not consumed (or if a waiter created a map entry and timed out without any corresponding producer call). This change introduces a GC task to clear contexts from the map if they have been there for longer than a certain TTL. The TTL proposed is 2 * `wait_timeout`.

When the `StageDelegation` is dropped, the GC task is terminated.

Testing
- Adds two unit tests (one for `add_delegate_info` and one for `wait_for_delegate_info`) which test that abandoned map entries are garbage collected appropriately.
Copy link
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind that, as discussed in our private channels, the full delegation system is going to go away, so unfortunately this code will probably not reach the main branch.

However, this is a really good exercise, as this pattern of garbage collecting abandoned tasks is going to appear more times in different places across the project, so it's going to be very important to nail it down and find a solution as performant and robust as possible to this.

wait_timeout: Duration,

/// notify is used to shut down the garbage collection task when the StageDelegation is dropped.
notify: Arc<Notify>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be no need for a Notify here, there's already tooling available in the tokio and DataFusion ecosystem for cancelling tasks on Drop, for example you could simply do

pub struct StageDelegation {
    stage_targets: Arc<DashMap<(String, usize), Value>>,
    wait_timeout: Duration,
    /// notify is used to shut down the garbage collection task when the StageDelegation is dropped.
    _task: SpawnedTask<()>,
}

impl Default for StageDelegation {
    fn default() -> Self {
        let stage_targets = Arc::new(DashMap::default());
        Self {
            stage_targets: stage_targets.clone(),
            wait_timeout: Duration::from_secs(5),
            _task: SpawnedTask::spawn(run_gc(
                stage_targets.clone(),
                Duration::from_secs(30), /* gc period */
            )),
        }
    }
}

// run_gc will continuously clear expired entries from the map, checking every `period`. The
// function terminates if `shutdown` is signalled.
async fn run_gc(stage_targets: Arc<DashMap<(String, usize), Value>>, period: Duration) {
    loop {
        tokio::time::sleep(period).await;
        stage_targets.retain(|_key, value| value.expiry.gt(&Instant::now()));
    }
}

And it should work the same

}
}

const GC_PERIOD_SECONDS: usize = 30;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this const is unused?

Comment on lines +87 to +90
// Performance: This iterator is sharded, so it won't lock the whole map.
stage_targets.retain(|_key, value| {
value.expiry.gt(&Instant::now())
});
Copy link
Collaborator

@gabotechs gabotechs Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.retain() iterates the whole map, so it do will end up locking the whole map unfortunately.

I think there should be ways of just garbage collecting abandoned tasks without iterating the full map, maybe spawning a task with a big tokio::sleep() at the beginning for each for each added entry that gets either cancelled due to a drop (the task was not abandoned) or completely executed included the tokio::sleep (the task was abandoned)

Copy link
Collaborator

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, Jay. Big first step to add GC. Hopefully the time waiting for the new delegation in place, you will have sometime to polish this work.

@jayshrivastava
Copy link
Collaborator Author

Closing in favor of #92

@jayshrivastava jayshrivastava deleted the js/delegation-gc branch October 17, 2025 13:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants