diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 4f20c110f..f59f85cb2 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -272,6 +272,7 @@ impl Daemon {
             ReceiverStream::new(tokio::sync::mpsc::channel(1).1)
         };
 
+        let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic());
         let exit_when_done = spawn_command
             .nodes
             .values()
@@ -329,12 +330,22 @@ impl Daemon {
         let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;
 
+        let node_results = match dataflow_results.remove(&dataflow_id) {
+            Some(results) => results,
+            None if all_nodes_dynamic => {
+                // All nodes are dynamic - they don't send SpawnedNodeResult events,
+                // so there are no node results to report. This is expected and means success.
+                BTreeMap::new()
+            }
+            None => {
+                return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}"));
+            }
+        };
+
         Ok(DataflowResult {
             uuid: dataflow_id,
             timestamp: clock.new_timestamp(),
-            node_results: dataflow_results
-                .remove(&dataflow_id)
-                .context("no node results for dataflow_id")?,
+            node_results,
         })
     }
 
@@ -620,9 +631,12 @@ impl Daemon {
     }
 
     async fn trigger_manual_stop(&mut self) -> eyre::Result<()> {
+        // Collect dataflow IDs that need immediate finishing
+        let mut dataflows_to_finish = Vec::new();
+
         for dataflow in self.running.values_mut() {
             let mut logger = self.logger.for_dataflow(dataflow.id);
-            dataflow
+            let finish_when = dataflow
                 .stop_all(
                     &mut self.coordinator_connection,
                     &self.clock,
@@ -631,7 +645,18 @@ impl Daemon {
                     &mut logger,
                 )
                 .await?;
+
+            // If stop_all returns Now, we need to finish this dataflow
+            if matches!(finish_when, FinishDataflowWhen::Now) {
+                dataflows_to_finish.push(dataflow.id);
+            }
         }
+
+        // Finish dataflows after the loop to avoid borrow checker issues
+        for dataflow_id in dataflows_to_finish {
+            self.finish_dataflow(dataflow_id).await?;
+        }
+
         self.exit_when_all_finished = true;
         Ok(())
     }
@@ -874,31 +899,42 @@ impl Daemon {
                 grace_duration,
                 force,
             } => {
-                let mut logger = self.logger.for_dataflow(dataflow_id);
-                let dataflow = self
-                    .running
-                    .get_mut(&dataflow_id)
-                    .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
-                let (reply, future) = match dataflow {
-                    Ok(dataflow) => {
-                        let future = dataflow.stop_all(
-                            &mut self.coordinator_connection,
-                            &self.clock,
-                            grace_duration,
-                            force,
-                            &mut logger,
-                        );
-                        (Ok(()), Some(future))
+                let finish_when = {
+                    let mut logger = self.logger.for_dataflow(dataflow_id);
+                    let dataflow = self
+                        .running
+                        .get_mut(&dataflow_id)
+                        .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
+                    let (reply, future) = match dataflow {
+                        Ok(dataflow) => {
+                            let future = dataflow.stop_all(
+                                &mut self.coordinator_connection,
+                                &self.clock,
+                                grace_duration,
+                                force,
+                                &mut logger,
+                            );
+                            (Ok(()), Some(future))
+                        }
+                        Err(err) => (Err(err.to_string()), None),
+                    };
+
+                    let _ = reply_tx
+                        .send(Some(DaemonCoordinatorReply::StopResult(reply)))
+                        .map_err(|_| {
+                            error!("could not send stop reply from daemon to coordinator")
+                        });
+
+                    if let Some(future) = future {
+                        Some(future.await?)
+                    } else {
+                        None
                     }
-                    Err(err) => (Err(err.to_string()), None),
                 };
-                let _ = reply_tx
-                    .send(Some(DaemonCoordinatorReply::StopResult(reply)))
-                    .map_err(|_| error!("could not send stop reply from daemon to coordinator"));
-
-                if let Some(future) = future {
-                    future.await?;
+                // If stop_all returns Now, finish the dataflow immediately
+                if matches!(finish_when, Some(FinishDataflowWhen::Now)) {
+                    self.finish_dataflow(dataflow_id).await?;
                 }
 
                 RunStatus::Continue
@@ -2094,58 +2130,78 @@ impl Daemon {
         self.handle_outputs_done(dataflow_id, node_id, might_restart)
             .await?;
 
+        let should_finish = {
+            let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
+                format!(
+                    "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
+                )
+            })?;
+            dataflow.running_nodes.remove(node_id);
+            // Check if all remaining nodes are dynamic (won't send SpawnedNodeResult)
+            !dataflow.pending_nodes.local_nodes_pending()
+                && dataflow
+                    .running_nodes
+                    .iter()
+                    .all(|(_id, n)| n.node_config.dynamic)
+        };
+
+        if should_finish {
+            self.finish_dataflow(dataflow_id).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Mark a dataflow as finished and perform cleanup.
+    /// This should be called when:
+    /// - `stop_all()` returns `FinishDataflowWhen::Now`, or
+    /// - All non-dynamic nodes have sent `SpawnedNodeResult` events
+    async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
         let mut logger = self.logger.for_dataflow(dataflow_id);
-        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
-        })?;
-        dataflow.running_nodes.remove(node_id);
-        if !dataflow.pending_nodes.local_nodes_pending()
-            && dataflow
-                .running_nodes
-                .iter()
-                .all(|(_id, n)| n.node_config.dynamic)
-        {
-            let result = DataflowDaemonResult {
-                timestamp: self.clock.new_timestamp(),
-                node_results: self
-                    .dataflow_node_results
-                    .get(&dataflow.id)
-                    .context("failed to get dataflow node results")?
-                    .clone(),
-            };
-            self.git_manager
-                .clones_in_use
-                .values_mut()
-                .for_each(|dataflows| {
-                    dataflows.remove(&dataflow_id);
-                });
+        // Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry
+        // in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully.
+        let result = DataflowDaemonResult {
+            timestamp: self.clock.new_timestamp(),
+            node_results: self
+                .dataflow_node_results
+                .get(&dataflow_id)
+                .cloned()
+                .unwrap_or_default(),
+        };
-            logger
-                .log(
-                    LogLevel::Info,
-                    None,
-                    Some("daemon".into()),
-                    format!("dataflow finished on machine `{}`", self.daemon_id),
-                )
-                .await;
-            if let Some(connection) = &mut self.coordinator_connection {
-                let msg = serde_json::to_vec(&Timestamped {
-                    inner: CoordinatorRequest::Event {
-                        daemon_id: self.daemon_id.clone(),
-                        event: DaemonEvent::AllNodesFinished {
-                            dataflow_id,
-                            result,
-                        },
+        self.git_manager
+            .clones_in_use
+            .values_mut()
+            .for_each(|dataflows| {
+                dataflows.remove(&dataflow_id);
+            });
+
+        logger
+            .log(
+                LogLevel::Info,
+                None,
+                Some("daemon".into()),
+                format!("dataflow finished on machine `{}`", self.daemon_id),
+            )
+            .await;
+
+        if let Some(connection) = &mut self.coordinator_connection {
+            let msg = serde_json::to_vec(&Timestamped {
+                inner: CoordinatorRequest::Event {
+                    daemon_id: self.daemon_id.clone(),
+                    event: DaemonEvent::AllNodesFinished {
+                        dataflow_id,
+                        result,
                     },
-                    timestamp: self.clock.new_timestamp(),
-                })?;
-                socket_stream_send(connection, &msg)
-                    .await
-                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
-            }
-            self.running.remove(&dataflow_id);
+                },
+                timestamp: self.clock.new_timestamp(),
+            })?;
+            socket_stream_send(connection, &msg)
+                .await
+                .wrap_err("failed to report dataflow finish to dora-coordinator")?;
         }
+        self.running.remove(&dataflow_id);
 
         Ok(())
     }
@@ -2734,6 +2790,16 @@ pub struct RunningDataflow {
     publish_all_messages_to_zenoh: bool,
 }
 
+/// Indicates whether a dataflow should be finished immediately after stop_all()
+/// or whether to wait for SpawnedNodeResult events from running nodes.
+#[must_use]
+pub enum FinishDataflowWhen {
+    /// Finish the dataflow immediately (all nodes are dynamic or no nodes running)
+    Now,
+    /// Wait for SpawnedNodeResult events from non-dynamic nodes
+    WaitForNodes,
+}
+
 impl RunningDataflow {
     fn new(
         dataflow_id: Uuid,
@@ -2832,7 +2898,7 @@ impl RunningDataflow {
         grace_duration: Option<Duration>,
         force: bool,
         logger: &mut DataflowLogger<'_>,
-    ) -> eyre::Result<()> {
+    ) -> eyre::Result<FinishDataflowWhen> {
         self.pending_nodes
             .handle_dataflow_stop(
                 coordinator_connection,
@@ -2893,7 +2959,30 @@ impl RunningDataflow {
             });
         }
         self.stop_sent = true;
-        Ok(())
+
+        // Determine if we should finish immediately or wait for nodes
+        Ok(self.should_finish_immediately())
+    }
+
+    /// Check if dataflow should finish immediately after stop_all().
+    /// Returns `Now` if all running nodes are dynamic (they won't send SpawnedNodeResult).
+    /// Returns `WaitForNodes` if there are non-dynamic nodes to wait for.
+    fn should_finish_immediately(&self) -> FinishDataflowWhen {
+        // Only finish immediately if:
+        // 1. No pending nodes
+        // 2. All running nodes are dynamic (they won't send SpawnedNodeResult)
+        // 3. Stop was sent (stop_all() was called)
+        if !self.pending_nodes.local_nodes_pending()
+            && self
+                .running_nodes
+                .iter()
+                .all(|(_id, n)| n.node_config.dynamic)
+            && self.stop_sent
+        {
+            FinishDataflowWhen::Now
+        } else {
+            FinishDataflowWhen::WaitForNodes
+        }
     }
 
     fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {