From 1f6fe3d8e917a5f57a740cb5cbbfd83ae6c4f08f Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 08:52:54 +0000 Subject: [PATCH 1/6] Fix: Check for dataflow finish after stop_all for dynamic nodes When all nodes in a dataflow are dynamic and a stop signal is received, the daemon now properly checks if the dataflow should be marked as finished immediately after stop_all() completes. Previously, the finish check was only triggered by handle_node_stop(), which is only called when a SpawnedNodeResult event is received. Dynamic nodes don't send this event, so dataflows with only dynamic nodes would never finish after a stop signal. The fix adds check_dataflow_finished_after_stop() which is called after stop_all() in both the Ctrl+C handler and the coordinator stop event handler. This ensures that when all running nodes are dynamic and stop was sent, the dataflow is immediately marked as finished. Fixes #1333 Co-authored-by: Haixuan Xavier Tao --- binaries/daemon/src/lib.rs | 78 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4f20c110f..4ecdac2e2 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -631,6 +631,8 @@ impl Daemon { &mut logger, ) .await?; + // After stop_all, check if dataflow should finish (for all-dynamic nodes case) + self.check_dataflow_finished_after_stop(dataflow_id).await?; } self.exit_when_all_finished = true; Ok(()) @@ -899,6 +901,8 @@ impl Daemon { if let Some(future) = future { future.await?; + // After stop_all, check if dataflow should finish (for all-dynamic nodes case) + self.check_dataflow_finished_after_stop(dataflow_id).await?; } RunStatus::Continue @@ -2150,6 +2154,80 @@ impl Daemon { Ok(()) } + /// Check if a dataflow should be marked as finished after stop_all() is called. 
+ /// This is needed for dataflows with only dynamic nodes, which don't send + /// SpawnedNodeResult events and thus never trigger the normal finish check. + async fn check_dataflow_finished_after_stop(&mut self, dataflow_id: Uuid) -> eyre::Result<()> { + let mut logger = self.logger.for_dataflow(dataflow_id); + + // Check if dataflow still exists (might have been removed already) + let Some(dataflow) = self.running.get(&dataflow_id) else { + return Ok(()); + }; + + // Only finish if: + // 1. No pending nodes + // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) + // 3. Stop was sent (stop_all() was called) + let should_finish = !dataflow.pending_nodes.local_nodes_pending() + && dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + && dataflow.stop_sent; + + if should_finish { + let dataflow = self + .running + .get(&dataflow_id) + .context("dataflow disappeared during finish check")?; + + let result = DataflowDaemonResult { + timestamp: self.clock.new_timestamp(), + node_results: self + .dataflow_node_results + .get(&dataflow.id) + .context("failed to get dataflow node results")? 
+ .clone(), + }; + + self.git_manager + .clones_in_use + .values_mut() + .for_each(|dataflows| { + dataflows.remove(&dataflow_id); + }); + + logger + .log( + LogLevel::Info, + None, + Some("daemon".into()), + format!("dataflow finished on machine `{}`", self.daemon_id), + ) + .await; + + if let Some(connection) = &mut self.coordinator_connection { + let msg = serde_json::to_vec(&Timestamped { + inner: CoordinatorRequest::Event { + daemon_id: self.daemon_id.clone(), + event: DaemonEvent::AllNodesFinished { + dataflow_id, + result, + }, + }, + timestamp: self.clock.new_timestamp(), + })?; + socket_stream_send(connection, &msg) + .await + .wrap_err("failed to report dataflow finish to dora-coordinator")?; + } + self.running.remove(&dataflow_id); + } + + Ok(()) + } + async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> { match event { DoraEvent::Timer { From 77f527cd38378d880d89e1c31f11a3e151260eba Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 10:02:34 +0000 Subject: [PATCH 2/6] Fix borrow checker error in check_dataflow_finished_after_stop Remove redundant dataflow borrow that was causing compilation error. The should_finish check already borrows from self.running in a scoped manner, so we don't need to borrow it again before accessing self fields. 
Co-authored-by: Haixuan Xavier Tao --- binaries/daemon/src/lib.rs | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4ecdac2e2..87303f99d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2160,33 +2160,28 @@ impl Daemon { async fn check_dataflow_finished_after_stop(&mut self, dataflow_id: Uuid) -> eyre::Result<()> { let mut logger = self.logger.for_dataflow(dataflow_id); - // Check if dataflow still exists (might have been removed already) - let Some(dataflow) = self.running.get(&dataflow_id) else { + // Check if dataflow should finish + let should_finish = if let Some(dataflow) = self.running.get(&dataflow_id) { + // Only finish if: + // 1. No pending nodes + // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) + // 3. Stop was sent (stop_all() was called) + !dataflow.pending_nodes.local_nodes_pending() + && dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + && dataflow.stop_sent + } else { return Ok(()); }; - // Only finish if: - // 1. No pending nodes - // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) - // 3. Stop was sent (stop_all() was called) - let should_finish = !dataflow.pending_nodes.local_nodes_pending() - && dataflow - .running_nodes - .iter() - .all(|(_id, n)| n.node_config.dynamic) - && dataflow.stop_sent; - if should_finish { - let dataflow = self - .running - .get(&dataflow_id) - .context("dataflow disappeared during finish check")?; - let result = DataflowDaemonResult { timestamp: self.clock.new_timestamp(), node_results: self .dataflow_node_results - .get(&dataflow.id) + .get(&dataflow_id) .context("failed to get dataflow node results")? 
.clone(), }; From cf98562e78378abb1730970e30cf5cb377570d48 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 10:06:38 +0000 Subject: [PATCH 3/6] Fix borrow checker error with explicit scope block Wrap the dataflow check in an explicit block scope to ensure the immutable borrow from self.running.get() is dropped before we access other self fields mutably. Co-authored-by: Haixuan Xavier Tao --- binaries/daemon/src/lib.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 87303f99d..b3b8838c4 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2161,19 +2161,21 @@ impl Daemon { let mut logger = self.logger.for_dataflow(dataflow_id); // Check if dataflow should finish - let should_finish = if let Some(dataflow) = self.running.get(&dataflow_id) { - // Only finish if: - // 1. No pending nodes - // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) - // 3. Stop was sent (stop_all() was called) - !dataflow.pending_nodes.local_nodes_pending() - && dataflow - .running_nodes - .iter() - .all(|(_id, n)| n.node_config.dynamic) - && dataflow.stop_sent - } else { - return Ok(()); + let should_finish = { + if let Some(dataflow) = self.running.get(&dataflow_id) { + // Only finish if: + // 1. No pending nodes + // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) + // 3. 
Stop was sent (stop_all() was called) + !dataflow.pending_nodes.local_nodes_pending() + && dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + && dataflow.stop_sent + } else { + return Ok(()); + } }; if should_finish { From 12b9258f6497f090f76fa70be50490405cde3475 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Tue, 27 Jan 2026 11:54:17 +0100 Subject: [PATCH 4/6] Fix compile error and handle missing node results --- binaries/daemon/src/lib.rs | 80 +++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b3b8838c4..afa8e359d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -272,6 +272,7 @@ impl Daemon { ReceiverStream::new(tokio::sync::mpsc::channel(1).1) }; + let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic()); let exit_when_done = spawn_command .nodes .values() @@ -329,12 +330,22 @@ impl Daemon { let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?; + let node_results = match dataflow_results.remove(&dataflow_id) { + Some(results) => results, + None if all_nodes_dynamic => { + // All nodes are dynamic - they don't send SpawnedNodeResult events, + // so there are no node results to report. This is expected and means success. 
+ BTreeMap::new() + } + None => { + return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}")); + } + }; + Ok(DataflowResult { uuid: dataflow_id, timestamp: clock.new_timestamp(), - node_results: dataflow_results - .remove(&dataflow_id) - .context("no node results for dataflow_id")?, + node_results, }) } @@ -876,32 +887,42 @@ impl Daemon { grace_duration, force, } => { - let mut logger = self.logger.for_dataflow(dataflow_id); - let dataflow = self - .running - .get_mut(&dataflow_id) - .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`")); - let (reply, future) = match dataflow { - Ok(dataflow) => { - let future = dataflow.stop_all( - &mut self.coordinator_connection, - &self.clock, - grace_duration, - force, - &mut logger, - ); - (Ok(()), Some(future)) + let stop_succeeded = { + let mut logger = self.logger.for_dataflow(dataflow_id); + let dataflow = self + .running + .get_mut(&dataflow_id) + .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`")); + let (reply, future) = match dataflow { + Ok(dataflow) => { + let future = dataflow.stop_all( + &mut self.coordinator_connection, + &self.clock, + grace_duration, + force, + &mut logger, + ); + (Ok(()), Some(future)) + } + Err(err) => (Err(err.to_string()), None), + }; + + let _ = reply_tx + .send(Some(DaemonCoordinatorReply::StopResult(reply))) + .map_err(|_| { + error!("could not send stop reply from daemon to coordinator") + }); + + if let Some(future) = future { + future.await?; + true + } else { + false } - Err(err) => (Err(err.to_string()), None), }; - let _ = reply_tx - .send(Some(DaemonCoordinatorReply::StopResult(reply))) - .map_err(|_| error!("could not send stop reply from daemon to coordinator")); - - if let Some(future) = future { - future.await?; - // After stop_all, check if dataflow should finish (for all-dynamic nodes case) + // After stop_all, check if dataflow should finish (for all-dynamic nodes case) + if stop_succeeded { 
self.check_dataflow_finished_after_stop(dataflow_id).await?; } @@ -2179,13 +2200,16 @@ impl Daemon { }; if should_finish { + // We verified above that all running nodes are dynamic. Dynamic nodes don't + // send SpawnedNodeResult events, so there may be no entry in dataflow_node_results. + // An empty map means all dynamic nodes handled stop successfully. let result = DataflowDaemonResult { timestamp: self.clock.new_timestamp(), node_results: self .dataflow_node_results .get(&dataflow_id) - .context("failed to get dataflow node results")? - .clone(), + .cloned() + .unwrap_or_default(), }; self.git_manager From ed761d508e236eb7179bf167b21ca4dbbc258e0f Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 28 Jan 2026 15:02:25 +0100 Subject: [PATCH 5/6] Refactor: Extract dataflow finish logic and add FinishDataflowWhen enum Address PR review feedback from @phil-opp: 1. Add `#[must_use]` enum `FinishDataflowWhen` to indicate whether a dataflow should be finished immediately or wait for nodes 2. Add `should_finish_immediately()` method on `RunningDataflow` to avoid extra lookup in `self.running` 3. Modify `stop_all()` to return `FinishDataflowWhen` instead of `()` 4. Extract duplicated cleanup code into shared `finish_dataflow()` method on `Daemon` 5. Update callers to handle the return value properly This ensures we don't keep two cleanup routines in sync and the compiler will remind us to handle the result via `#[must_use]`. 
Co-Authored-By: Claude Opus 4.5 --- binaries/daemon/src/lib.rs | 220 +++++++++++++++++-------------------- 1 file changed, 99 insertions(+), 121 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index afa8e359d..bb2a5261d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -887,7 +887,7 @@ impl Daemon { grace_duration, force, } => { - let stop_succeeded = { + let finish_when = { let mut logger = self.logger.for_dataflow(dataflow_id); let dataflow = self .running @@ -914,16 +914,15 @@ impl Daemon { }); if let Some(future) = future { - future.await?; - true + Some(future.await?) } else { - false + None } }; - // After stop_all, check if dataflow should finish (for all-dynamic nodes case) - if stop_succeeded { - self.check_dataflow_finished_after_stop(dataflow_id).await?; + // If stop_all returns Now, finish the dataflow immediately + if matches!(finish_when, Some(FinishDataflowWhen::Now)) { + self.finish_dataflow(dataflow_id).await?; } RunStatus::Continue @@ -2119,132 +2118,78 @@ impl Daemon { self.handle_outputs_done(dataflow_id, node_id, might_restart) .await?; - let mut logger = self.logger.for_dataflow(dataflow_id); - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") - })?; - dataflow.running_nodes.remove(node_id); - if !dataflow.pending_nodes.local_nodes_pending() - && dataflow - .running_nodes - .iter() - .all(|(_id, n)| n.node_config.dynamic) - { - let result = DataflowDaemonResult { - timestamp: self.clock.new_timestamp(), - node_results: self - .dataflow_node_results - .get(&dataflow.id) - .context("failed to get dataflow node results")? 
- .clone(), - }; - - self.git_manager - .clones_in_use - .values_mut() - .for_each(|dataflows| { - dataflows.remove(&dataflow_id); - }); - - logger - .log( - LogLevel::Info, - None, - Some("daemon".into()), - format!("dataflow finished on machine `{}`", self.daemon_id), + let should_finish = { + let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { + format!( + "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`" ) - .await; - if let Some(connection) = &mut self.coordinator_connection { - let msg = serde_json::to_vec(&Timestamped { - inner: CoordinatorRequest::Event { - daemon_id: self.daemon_id.clone(), - event: DaemonEvent::AllNodesFinished { - dataflow_id, - result, - }, - }, - timestamp: self.clock.new_timestamp(), - })?; - socket_stream_send(connection, &msg) - .await - .wrap_err("failed to report dataflow finish to dora-coordinator")?; - } - self.running.remove(&dataflow_id); + })?; + dataflow.running_nodes.remove(node_id); + // Check if all remaining nodes are dynamic (won't send SpawnedNodeResult) + !dataflow.pending_nodes.local_nodes_pending() + && dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + }; + + if should_finish { + self.finish_dataflow(dataflow_id).await?; } Ok(()) } - /// Check if a dataflow should be marked as finished after stop_all() is called. - /// This is needed for dataflows with only dynamic nodes, which don't send - /// SpawnedNodeResult events and thus never trigger the normal finish check. - async fn check_dataflow_finished_after_stop(&mut self, dataflow_id: Uuid) -> eyre::Result<()> { + /// Mark a dataflow as finished and perform cleanup. 
+ /// This should be called when: + /// - `stop_all()` returns `FinishDataflowWhen::Now`, or + /// - All non-dynamic nodes have sent `SpawnedNodeResult` events + async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> { let mut logger = self.logger.for_dataflow(dataflow_id); - // Check if dataflow should finish - let should_finish = { - if let Some(dataflow) = self.running.get(&dataflow_id) { - // Only finish if: - // 1. No pending nodes - // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) - // 3. Stop was sent (stop_all() was called) - !dataflow.pending_nodes.local_nodes_pending() - && dataflow - .running_nodes - .iter() - .all(|(_id, n)| n.node_config.dynamic) - && dataflow.stop_sent - } else { - return Ok(()); - } + // Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry + // in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully. + let result = DataflowDaemonResult { + timestamp: self.clock.new_timestamp(), + node_results: self + .dataflow_node_results + .get(&dataflow_id) + .cloned() + .unwrap_or_default(), }; - if should_finish { - // We verified above that all running nodes are dynamic. Dynamic nodes don't - // send SpawnedNodeResult events, so there may be no entry in dataflow_node_results. - // An empty map means all dynamic nodes handled stop successfully. 
- let result = DataflowDaemonResult { - timestamp: self.clock.new_timestamp(), - node_results: self - .dataflow_node_results - .get(&dataflow_id) - .cloned() - .unwrap_or_default(), - }; - - self.git_manager - .clones_in_use - .values_mut() - .for_each(|dataflows| { - dataflows.remove(&dataflow_id); - }); + self.git_manager + .clones_in_use + .values_mut() + .for_each(|dataflows| { + dataflows.remove(&dataflow_id); + }); - logger - .log( - LogLevel::Info, - None, - Some("daemon".into()), - format!("dataflow finished on machine `{}`", self.daemon_id), - ) - .await; + logger + .log( + LogLevel::Info, + None, + Some("daemon".into()), + format!("dataflow finished on machine `{}`", self.daemon_id), + ) + .await; - if let Some(connection) = &mut self.coordinator_connection { - let msg = serde_json::to_vec(&Timestamped { - inner: CoordinatorRequest::Event { - daemon_id: self.daemon_id.clone(), - event: DaemonEvent::AllNodesFinished { - dataflow_id, - result, - }, + if let Some(connection) = &mut self.coordinator_connection { + let msg = serde_json::to_vec(&Timestamped { + inner: CoordinatorRequest::Event { + daemon_id: self.daemon_id.clone(), + event: DaemonEvent::AllNodesFinished { + dataflow_id, + result, }, - timestamp: self.clock.new_timestamp(), - })?; - socket_stream_send(connection, &msg) - .await - .wrap_err("failed to report dataflow finish to dora-coordinator")?; - } - self.running.remove(&dataflow_id); + }, + timestamp: self.clock.new_timestamp(), + })?; + socket_stream_send(connection, &msg) + .await + .wrap_err("failed to report dataflow finish to dora-coordinator")?; } + self.running.remove(&dataflow_id); Ok(()) } @@ -2833,6 +2778,16 @@ pub struct RunningDataflow { publish_all_messages_to_zenoh: bool, } +/// Indicates whether a dataflow should be finished immediately after stop_all() +/// or whether to wait for SpawnedNodeResult events from running nodes. 
+#[must_use] +pub enum FinishDataflowWhen { + /// Finish the dataflow immediately (all nodes are dynamic or no nodes running) + Now, + /// Wait for SpawnedNodeResult events from non-dynamic nodes + WaitForNodes, +} + impl RunningDataflow { fn new( dataflow_id: Uuid, @@ -2931,7 +2886,7 @@ impl RunningDataflow { grace_duration: Option, force: bool, logger: &mut DataflowLogger<'_>, - ) -> eyre::Result<()> { + ) -> eyre::Result<FinishDataflowWhen> { self.pending_nodes .handle_dataflow_stop( coordinator_connection, @@ -2992,7 +2947,30 @@ impl RunningDataflow { }); } self.stop_sent = true; - Ok(()) + + // Determine if we should finish immediately or wait for nodes + Ok(self.should_finish_immediately()) + } + + /// Check if dataflow should finish immediately after stop_all(). + /// Returns `Now` if all running nodes are dynamic (they won't send SpawnedNodeResult). + /// Returns `WaitForNodes` if there are non-dynamic nodes to wait for. + fn should_finish_immediately(&self) -> FinishDataflowWhen { + // Only finish immediately if: + // 1. No pending nodes + // 2. All running nodes are dynamic (they won't send SpawnedNodeResult) + // 3. 
Stop was sent (stop_all() was called) + if !self.pending_nodes.local_nodes_pending() + && self + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + && self.stop_sent + { + FinishDataflowWhen::Now + } else { + FinishDataflowWhen::WaitForNodes + } } fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet { From d33bb22169c08033b1cf0405a62358a1f459605f Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 28 Jan 2026 15:48:28 +0100 Subject: [PATCH 6/6] fix merge error --- binaries/daemon/src/lib.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index bb2a5261d..f59f85cb2 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -631,9 +631,12 @@ impl Daemon { } async fn trigger_manual_stop(&mut self) -> eyre::Result<()> { + // Collect dataflow IDs that need immediate finishing + let mut dataflows_to_finish = Vec::new(); + for dataflow in self.running.values_mut() { let mut logger = self.logger.for_dataflow(dataflow.id); - dataflow + let finish_when = dataflow .stop_all( &mut self.coordinator_connection, &self.clock, @@ -642,9 +645,18 @@ impl Daemon { &mut logger, ) .await?; - // After stop_all, check if dataflow should finish (for all-dynamic nodes case) - self.check_dataflow_finished_after_stop(dataflow_id).await?; + + // If stop_all returns Now, we need to finish this dataflow + if matches!(finish_when, FinishDataflowWhen::Now) { + dataflows_to_finish.push(dataflow.id); + } } + + // Finish dataflows after the loop to avoid borrow checker issues + for dataflow_id in dataflows_to_finish { + self.finish_dataflow(dataflow_id).await?; + } + self.exit_when_all_finished = true; Ok(()) }