fix(daemon): abort shmem listener tasks on dataflow finish#1379

Open
suhr25 wants to merge 2 commits into dora-rs:main from suhr25:fix/shmem-listener-task-leak

Conversation

@suhr25
Contributor

@suhr25 suhr25 commented Feb 26, 2026

Summary

This PR fixes a critical resource leak in the shared-memory (shmem) listener path where crashed nodes could permanently exhaust Tokio’s blocking thread pool.

Previously, the shmem branch in spawn_listener_loop() spawned four fire-and-forget listener tasks and returned None, meaning they were never tracked or cancelled:

tokio::spawn(shmem::listener_loop(...));
...
Ok((..., None)) // no AbortHandle

Each of these tasks internally used spawn_blocking() and called:

event.wait(Timeout::Infinite);

If a node crashed (e.g. SIGKILL, OOM), the disconnect signal was never sent, causing:

  • Blocking threads to hang indefinitely
  • Async tasks to never complete
  • +4 leaked threads per crash, leading to eventual Tokio thread pool exhaustion

Fix

This PR introduces three key changes to make the shmem listener robust and cancellable:

1. Bounded polling instead of infinite blocking

Replaced infinite wait with a short polling interval:

// associated const on ShmemServer
const LISTEN_POLL_INTERVAL: Duration = Duration::from_millis(50);

let result = self.channel.receive(Some(Self::LISTEN_POLL_INTERVAL));

This ensures the listener periodically wakes up and can react to cancellation or disconnection.


2. Retry loop + cooperative shutdown in blocking thread

The spawn_blocking loop now:

  • Retries on timeout instead of blocking forever
  • Exits cleanly when the async side is cancelled
let result = loop {
    match server.listen() {
        Ok(v) => break Ok(v),
        // listen() now returns Err after the 50 ms poll timeout,
        // so this arm runs at most once per poll interval.
        Err(_) => {
            if rx.is_disconnected() {
                return; // async side dropped → exit thread
            }
            // otherwise: retry the bounded listen()
        }
    }
};

3. Single task + AbortHandle for all shmem listeners

Instead of 4 untracked spawns, all listeners are now grouped under a single parent task, and its AbortHandle is returned:

let handle = tokio::spawn(async move {
    tokio::join!(
        shmem::listener_loop(...),
        shmem::listener_loop(...),
        shmem::listener_loop(...),
        shmem::listener_loop(...),
    );
});
let abort_handle = handle.abort_handle();

Returned as:

Ok((DaemonCommunication::Shmem { ... }, Some(abort_handle)))

This integrates with existing _listener_tasks cleanup logic and ensures proper shutdown.


Verification

  • Crash scenario: simulated node crashes (kill -9)

    • Blocking threads no longer accumulate
    • Listener tasks terminate after cancellation
  • Tokio behavior

    • spawn_blocking threads exit once channel is dropped
    • No indefinite waits observed
  • Functional validation

    • Shmem communication continues to work normally
    • No regressions in TCP/Unix paths
  • Expected outcome

    • Blocking thread count returns to baseline after dataflow completion
    • No memory/thread growth across repeated runs
    • Daemon remains stable under repeated node crashes

Each shmem node spawned 4 fire-and-forget tasks (one per shmem channel)
with no abort handles, so they were never cleaned up when a dataflow
finished.  Worse, each task internally called spawn_blocking which
blocked forever in pthread_cond_wait(INFINITE) if the node process
crashed (SIGKILL/OOM) without signalling the disconnect event.  With
enough node crashes, Tokio's blocking thread pool (default 512 threads)
was exhausted, deadlocking the daemon.

Fix in three parts:
- Wrap the 4 shmem listener_loop spawns in a single parent task via
  tokio::join! and return its abort_handle so RunningDataflow picks it
  up via the existing _listener_tasks mechanism.
- Change ShmemServer::listen() to poll with a 50 ms timeout instead of
  blocking indefinitely, so each call returns Err on timeout.
- In the spawn_blocking loop, retry listen() on Err and check
  rx.is_disconnected() between retries; when the parent task is
  aborted the flume Sender drops, is_disconnected() returns true, and
  the blocking thread exits cleanly within one poll interval.

Signed-off-by: suhr25 <suhridmarwah07@gmail.com>
@suhr25
Contributor Author

suhr25 commented Feb 26, 2026

Hi @phil-opp,
This PR fixes the shmem listener task leak by ensuring spawned blocking tasks are properly tracked and aborted on dataflow shutdown.
Thanks!

@suhr25
Contributor Author

suhr25 commented Mar 5, 2026

Hi @phil-opp,
Kindly following up on this PR, please review it whenever you get time.
Thanks!
