Skip to content

Conversation

HammadB
Copy link
Collaborator

@HammadB HammadB commented Oct 10, 2025

Description of changes

Summarize the changes made by this PR.

  • Improvements & Bug fixes
    • Fixes a memory leak of handles in the scheduler by adding a drop guard to handles and tracking them in a hashmap
  • New functionality
    • None

Test plan

How are these changes tested?
Added a test to ensure map is empty after schedule finished

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

None required

Observability plan

None required

Documentation Changes

None required

@HammadB HammadB requested a review from rescrv October 10, 2025 18:48
Copy link

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of a unexpectedly high quality (Readability, Modularity, Intuitiveness)

Copy link
Contributor

propel-code-bot bot commented Oct 10, 2025

Fix Memory Leak in Scheduler by Tracking Task Handles and Adding Cleanup Guard

This pull request addresses a memory leak in the scheduler module by overhauling how task handles are managed. The implementation transitions from storing a Vec<SchedulerTaskHandle> to a HashMap<TaskId, SchedulerTaskHandle>, introduces unique TaskId assignment, and adds a HandleGuard with a Drop implementation to automatically clean up scheduler handles when tasks complete. Associated test cases have been adjusted and expanded to assert that all handles are correctly removed following scheduled tasks, ensuring no residual handles remain after task completion.

Key Changes

• Replaced Vec<SchedulerTaskHandle> with HashMap<TaskId, SchedulerTaskHandle> for task tracking in Scheduler
• Implemented TaskId allocation with AtomicU64 for uniquely identifying each scheduled task
• Added HandleGuard struct with a Drop implementation to remove task handles from handles HashMap when tasks complete
• Updated handle insertion/removal logic throughout scheduling and interval scheduling functions to utilize new HashMap approach
• Tests refactored: added checks to verify the handles map is empty after task completion, new test to detect handle leakage
• Made structural and idiomatic improvements, such as custom Debug for SchedulerTaskHandle, and improved encapsulation per review comments

Affected Areas

rust/system/src/scheduler.rs (scheduler implementation, handle management, and tests)

This summary was automatically generated by @propel-code-bot

Comment on lines +337 to +350
async fn test_handle_cleaned_up() {
let system = System::new();
let counter = Arc::new(AtomicUsize::new(0));
let handles_empty_after = Arc::new(AtomicBool::new(false));
let component = OneMessageComponent::new(10, counter.clone(), handles_empty_after.clone());
let _handle = system.start_component(component);
// Wait for the 100ms schedule to trigger
tokio::time::sleep(Duration::from_millis(500)).await;
// yield to allow the component to process the messages
tokio::task::yield_now().await;
assert!(handles_empty_after.load(Ordering::SeqCst));
// We should have scheduled the message once
system.join().await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CriticalError]

There's a potential race condition in this test. The test verifies that the handles map is empty by checking a flag set from within the OneMessageComponent::handle method.

However, the handle method runs in the component's task, while the HandleGuard's drop implementation (which cleans up the map) runs at the end of the scheduler's spawned task. These are two different tasks, and their execution order isn't guaranteed. It's possible for handle to be called and check is_empty() before the scheduler task has finished and removed its handle from the map. This could lead to a flaky test.

A more robust approach would be to check the state of the scheduler's handles map directly from the test function. This provides a deterministic way to verify the state before and after the scheduled task has executed.

With the handles field made pub(crate) (as suggested in another comment), you could rewrite the test like this:

    #[tokio::test]
    async fn test_handle_cleaned_up() {
        let system = System::new();
        let counter = Arc::new(AtomicUsize::new(0));
        // The OneMessageComponent can be simplified to not need `handles_empty_after`
        let component = OneMessageComponent::new(10, counter.clone());
        let handle = system.start_component(component);

        // Allow on_start to run and schedule the task.
        tokio::task::yield_now().await;
        assert_eq!(handle.ctx.scheduler.handles.read().len(), 1, "Handle should be present after scheduling");

        // Wait for the schedule to trigger and the task to be cleaned up.
        tokio::time::sleep(Duration::from_millis(500)).await;
        
        assert!(handle.ctx.scheduler.handles.read().is_empty(), "Handles map should be empty after task completion");
        assert_eq!(counter.load(Ordering::SeqCst), 1, "Message should have been handled once");
        
        system.join().await;
    }

This would also allow simplifying OneMessageComponent by removing the handles_empty_after field and its related logic.

Context for Agents
[**CriticalError**]

There's a potential race condition in this test. The test verifies that the `handles` map is empty by checking a flag set from within the `OneMessageComponent::handle` method.

However, the `handle` method runs in the component's task, while the `HandleGuard`'s `drop` implementation (which cleans up the map) runs at the end of the scheduler's spawned task. These are two different tasks, and their execution order isn't guaranteed. It's possible for `handle` to be called and check `is_empty()` *before* the scheduler task has finished and removed its handle from the map. This could lead to a flaky test.

A more robust approach would be to check the state of the scheduler's `handles` map directly from the test function. This provides a deterministic way to verify the state before and after the scheduled task has executed.

With the `handles` field made `pub(crate)` (as suggested in another comment), you could rewrite the test like this:

```rust
    #[tokio::test]
    async fn test_handle_cleaned_up() {
        let system = System::new();
        let counter = Arc::new(AtomicUsize::new(0));
        // The OneMessageComponent can be simplified to not need `handles_empty_after`
        let component = OneMessageComponent::new(10, counter.clone());
        let handle = system.start_component(component);

        // Allow on_start to run and schedule the task.
        tokio::task::yield_now().await;
        assert_eq!(handle.ctx.scheduler.handles.read().len(), 1, "Handle should be present after scheduling");

        // Wait for the schedule to trigger and the task to be cleaned up.
        tokio::time::sleep(Duration::from_millis(500)).await;
        
        assert!(handle.ctx.scheduler.handles.read().is_empty(), "Handles map should be empty after task completion");
        assert_eq!(counter.load(Ordering::SeqCst), 1, "Message should have been handled once");
        
        system.join().await;
    }
```

This would also allow simplifying `OneMessageComponent` by removing the `handles_empty_after` field and its related logic.

File: rust/system/src/scheduler.rs
Line: 350

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it to you to decide if this is worth the lift.

@HammadB HammadB requested a review from sanketkedia October 10, 2025 19:10
@blacksmith-sh blacksmith-sh bot deleted a comment from HammadB Oct 10, 2025
HammadB and others added 2 commits October 10, 2025 12:14
Co-authored-by: propel-code-bot[bot] <203372662+propel-code-bot[bot]@users.noreply.github.com>
Co-authored-by: propel-code-bot[bot] <203372662+propel-code-bot[bot]@users.noreply.github.com>
Copy link
Contributor

@rescrv rescrv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left two comments. Neither needs to be addressed, but I'd address both if they were asked of me.

Comment on lines +337 to +350
async fn test_handle_cleaned_up() {
let system = System::new();
let counter = Arc::new(AtomicUsize::new(0));
let handles_empty_after = Arc::new(AtomicBool::new(false));
let component = OneMessageComponent::new(10, counter.clone(), handles_empty_after.clone());
let _handle = system.start_component(component);
// Wait for the 100ms schedule to trigger
tokio::time::sleep(Duration::from_millis(500)).await;
// yield to allow the component to process the messages
tokio::task::yield_now().await;
assert!(handles_empty_after.load(Ordering::SeqCst));
// We should have scheduled the message once
system.join().await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it to you to decide if this is worth the lift.

#[derive(Clone, Debug)]
pub struct Scheduler {
handles: Arc<RwLock<Vec<SchedulerTaskHandle>>>,
handles: Arc<RwLock<HashMap<TaskId, SchedulerTaskHandle>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend not using RwLock here unless we have a heavy read path. An RwLock is typically more expensive per access if it's 100% write, and I only see read calls from tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah thats fine, can do, was just leaving it as it was before to minimize churn.

@HammadB HammadB merged commit 4212a46 into main Oct 16, 2025
58 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants