Skip to content

Commit dd7c6bd

Browse files
authored
Fix: Check for dataflow finish after stop_all for dynamic nodes (#1334)
When all nodes in a dataflow are dynamic and a stop signal is received, the daemon now properly checks if the dataflow should be marked as finished immediately after stop_all() completes. Previously, the finish check was only triggered by handle_node_stop(), which is only called when a SpawnedNodeResult event is received. Dynamic nodes don't send this event, so dataflows with only dynamic nodes would never finish after a stop signal. The fix adds check_dataflow_finished_after_stop() which is called after stop_all() in both the Ctrl+C handler and the coordinator stop event handler. This ensures that when all running nodes are dynamic and stop was sent, the dataflow is immediately marked as finished. Fixes #1333 Generated with [Claude Code](https://claude.ai/code)
2 parents 35c8d1c + d33bb22 commit dd7c6bd

File tree

1 file changed

+164
-75
lines changed

1 file changed

+164
-75
lines changed

binaries/daemon/src/lib.rs

Lines changed: 164 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ impl Daemon {
272272
ReceiverStream::new(tokio::sync::mpsc::channel(1).1)
273273
};
274274

275+
let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic());
275276
let exit_when_done = spawn_command
276277
.nodes
277278
.values()
@@ -329,12 +330,22 @@ impl Daemon {
329330

330331
let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
331332

333+
let node_results = match dataflow_results.remove(&dataflow_id) {
334+
Some(results) => results,
335+
None if all_nodes_dynamic => {
336+
// All nodes are dynamic - they don't send SpawnedNodeResult events,
337+
// so there are no node results to report. This is expected and means success.
338+
BTreeMap::new()
339+
}
340+
None => {
341+
return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}"));
342+
}
343+
};
344+
332345
Ok(DataflowResult {
333346
uuid: dataflow_id,
334347
timestamp: clock.new_timestamp(),
335-
node_results: dataflow_results
336-
.remove(&dataflow_id)
337-
.context("no node results for dataflow_id")?,
348+
node_results,
338349
})
339350
}
340351

@@ -620,9 +631,12 @@ impl Daemon {
620631
}
621632

622633
async fn trigger_manual_stop(&mut self) -> eyre::Result<()> {
634+
// Collect dataflow IDs that need immediate finishing
635+
let mut dataflows_to_finish = Vec::new();
636+
623637
for dataflow in self.running.values_mut() {
624638
let mut logger = self.logger.for_dataflow(dataflow.id);
625-
dataflow
639+
let finish_when = dataflow
626640
.stop_all(
627641
&mut self.coordinator_connection,
628642
&self.clock,
@@ -631,7 +645,18 @@ impl Daemon {
631645
&mut logger,
632646
)
633647
.await?;
648+
649+
// If stop_all returns Now, we need to finish this dataflow
650+
if matches!(finish_when, FinishDataflowWhen::Now) {
651+
dataflows_to_finish.push(dataflow.id);
652+
}
634653
}
654+
655+
// Finish dataflows after the loop to avoid borrow checker issues
656+
for dataflow_id in dataflows_to_finish {
657+
self.finish_dataflow(dataflow_id).await?;
658+
}
659+
635660
self.exit_when_all_finished = true;
636661
Ok(())
637662
}
@@ -874,31 +899,42 @@ impl Daemon {
874899
grace_duration,
875900
force,
876901
} => {
877-
let mut logger = self.logger.for_dataflow(dataflow_id);
878-
let dataflow = self
879-
.running
880-
.get_mut(&dataflow_id)
881-
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
882-
let (reply, future) = match dataflow {
883-
Ok(dataflow) => {
884-
let future = dataflow.stop_all(
885-
&mut self.coordinator_connection,
886-
&self.clock,
887-
grace_duration,
888-
force,
889-
&mut logger,
890-
);
891-
(Ok(()), Some(future))
902+
let finish_when = {
903+
let mut logger = self.logger.for_dataflow(dataflow_id);
904+
let dataflow = self
905+
.running
906+
.get_mut(&dataflow_id)
907+
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
908+
let (reply, future) = match dataflow {
909+
Ok(dataflow) => {
910+
let future = dataflow.stop_all(
911+
&mut self.coordinator_connection,
912+
&self.clock,
913+
grace_duration,
914+
force,
915+
&mut logger,
916+
);
917+
(Ok(()), Some(future))
918+
}
919+
Err(err) => (Err(err.to_string()), None),
920+
};
921+
922+
let _ = reply_tx
923+
.send(Some(DaemonCoordinatorReply::StopResult(reply)))
924+
.map_err(|_| {
925+
error!("could not send stop reply from daemon to coordinator")
926+
});
927+
928+
if let Some(future) = future {
929+
Some(future.await?)
930+
} else {
931+
None
892932
}
893-
Err(err) => (Err(err.to_string()), None),
894933
};
895934

896-
let _ = reply_tx
897-
.send(Some(DaemonCoordinatorReply::StopResult(reply)))
898-
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));
899-
900-
if let Some(future) = future {
901-
future.await?;
935+
// If stop_all returns Now, finish the dataflow immediately
936+
if matches!(finish_when, Some(FinishDataflowWhen::Now)) {
937+
self.finish_dataflow(dataflow_id).await?;
902938
}
903939

904940
RunStatus::Continue
@@ -2094,58 +2130,78 @@ impl Daemon {
20942130
self.handle_outputs_done(dataflow_id, node_id, might_restart)
20952131
.await?;
20962132

2133+
let should_finish = {
2134+
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
2135+
format!(
2136+
"failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
2137+
)
2138+
})?;
2139+
dataflow.running_nodes.remove(node_id);
2140+
// Check if all remaining nodes are dynamic (won't send SpawnedNodeResult)
2141+
!dataflow.pending_nodes.local_nodes_pending()
2142+
&& dataflow
2143+
.running_nodes
2144+
.iter()
2145+
.all(|(_id, n)| n.node_config.dynamic)
2146+
};
2147+
2148+
if should_finish {
2149+
self.finish_dataflow(dataflow_id).await?;
2150+
}
2151+
2152+
Ok(())
2153+
}
2154+
2155+
/// Mark a dataflow as finished and perform cleanup.
2156+
/// This should be called when:
2157+
/// - `stop_all()` returns `FinishDataflowWhen::Now`, or
2158+
/// - All non-dynamic nodes have sent `SpawnedNodeResult` events
2159+
async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
20972160
let mut logger = self.logger.for_dataflow(dataflow_id);
2098-
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
2099-
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
2100-
})?;
2101-
dataflow.running_nodes.remove(node_id);
2102-
if !dataflow.pending_nodes.local_nodes_pending()
2103-
&& dataflow
2104-
.running_nodes
2105-
.iter()
2106-
.all(|(_id, n)| n.node_config.dynamic)
2107-
{
2108-
let result = DataflowDaemonResult {
2109-
timestamp: self.clock.new_timestamp(),
2110-
node_results: self
2111-
.dataflow_node_results
2112-
.get(&dataflow.id)
2113-
.context("failed to get dataflow node results")?
2114-
.clone(),
2115-
};
21162161

2117-
self.git_manager
2118-
.clones_in_use
2119-
.values_mut()
2120-
.for_each(|dataflows| {
2121-
dataflows.remove(&dataflow_id);
2122-
});
2162+
// Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry
2163+
// in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully.
2164+
let result = DataflowDaemonResult {
2165+
timestamp: self.clock.new_timestamp(),
2166+
node_results: self
2167+
.dataflow_node_results
2168+
.get(&dataflow_id)
2169+
.cloned()
2170+
.unwrap_or_default(),
2171+
};
21232172

2124-
logger
2125-
.log(
2126-
LogLevel::Info,
2127-
None,
2128-
Some("daemon".into()),
2129-
format!("dataflow finished on machine `{}`", self.daemon_id),
2130-
)
2131-
.await;
2132-
if let Some(connection) = &mut self.coordinator_connection {
2133-
let msg = serde_json::to_vec(&Timestamped {
2134-
inner: CoordinatorRequest::Event {
2135-
daemon_id: self.daemon_id.clone(),
2136-
event: DaemonEvent::AllNodesFinished {
2137-
dataflow_id,
2138-
result,
2139-
},
2173+
self.git_manager
2174+
.clones_in_use
2175+
.values_mut()
2176+
.for_each(|dataflows| {
2177+
dataflows.remove(&dataflow_id);
2178+
});
2179+
2180+
logger
2181+
.log(
2182+
LogLevel::Info,
2183+
None,
2184+
Some("daemon".into()),
2185+
format!("dataflow finished on machine `{}`", self.daemon_id),
2186+
)
2187+
.await;
2188+
2189+
if let Some(connection) = &mut self.coordinator_connection {
2190+
let msg = serde_json::to_vec(&Timestamped {
2191+
inner: CoordinatorRequest::Event {
2192+
daemon_id: self.daemon_id.clone(),
2193+
event: DaemonEvent::AllNodesFinished {
2194+
dataflow_id,
2195+
result,
21402196
},
2141-
timestamp: self.clock.new_timestamp(),
2142-
})?;
2143-
socket_stream_send(connection, &msg)
2144-
.await
2145-
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
2146-
}
2147-
self.running.remove(&dataflow_id);
2197+
},
2198+
timestamp: self.clock.new_timestamp(),
2199+
})?;
2200+
socket_stream_send(connection, &msg)
2201+
.await
2202+
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
21482203
}
2204+
self.running.remove(&dataflow_id);
21492205

21502206
Ok(())
21512207
}
@@ -2734,6 +2790,16 @@ pub struct RunningDataflow {
27342790
publish_all_messages_to_zenoh: bool,
27352791
}
27362792

2793+
/// Indicates whether a dataflow should be finished immediately after stop_all()
2794+
/// or whether to wait for SpawnedNodeResult events from running nodes.
2795+
#[must_use]
2796+
pub enum FinishDataflowWhen {
2797+
/// Finish the dataflow immediately (all nodes are dynamic or no nodes running)
2798+
Now,
2799+
/// Wait for SpawnedNodeResult events from non-dynamic nodes
2800+
WaitForNodes,
2801+
}
2802+
27372803
impl RunningDataflow {
27382804
fn new(
27392805
dataflow_id: Uuid,
@@ -2832,7 +2898,7 @@ impl RunningDataflow {
28322898
grace_duration: Option<Duration>,
28332899
force: bool,
28342900
logger: &mut DataflowLogger<'_>,
2835-
) -> eyre::Result<()> {
2901+
) -> eyre::Result<FinishDataflowWhen> {
28362902
self.pending_nodes
28372903
.handle_dataflow_stop(
28382904
coordinator_connection,
@@ -2893,7 +2959,30 @@ impl RunningDataflow {
28932959
});
28942960
}
28952961
self.stop_sent = true;
2896-
Ok(())
2962+
2963+
// Determine if we should finish immediately or wait for nodes
2964+
Ok(self.should_finish_immediately())
2965+
}
2966+
2967+
/// Check if dataflow should finish immediately after stop_all().
2968+
/// Returns `Now` if all running nodes are dynamic (they won't send SpawnedNodeResult).
2969+
/// Returns `WaitForNodes` if there are non-dynamic nodes to wait for.
2970+
fn should_finish_immediately(&self) -> FinishDataflowWhen {
2971+
// Only finish immediately if:
2972+
// 1. No pending nodes
2973+
// 2. All running nodes are dynamic (they won't send SpawnedNodeResult)
2974+
// 3. Stop was sent (stop_all() was called)
2975+
if !self.pending_nodes.local_nodes_pending()
2976+
&& self
2977+
.running_nodes
2978+
.iter()
2979+
.all(|(_id, n)| n.node_config.dynamic)
2980+
&& self.stop_sent
2981+
{
2982+
FinishDataflowWhen::Now
2983+
} else {
2984+
FinishDataflowWhen::WaitForNodes
2985+
}
28972986
}
28982987

28992988
fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {

0 commit comments

Comments
 (0)