Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 164 additions & 75 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl Daemon {
ReceiverStream::new(tokio::sync::mpsc::channel(1).1)
};

let all_nodes_dynamic = spawn_command.nodes.values().all(|n| n.kind.dynamic());
let exit_when_done = spawn_command
.nodes
.values()
Expand Down Expand Up @@ -329,12 +330,22 @@ impl Daemon {

let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

let node_results = match dataflow_results.remove(&dataflow_id) {
Some(results) => results,
None if all_nodes_dynamic => {
// All nodes are dynamic - they don't send SpawnedNodeResult events,
// so there are no node results to report. This is expected and means success.
BTreeMap::new()
}
None => {
return Err(eyre::eyre!("no node results for dataflow_id {dataflow_id}"));
}
};

Ok(DataflowResult {
uuid: dataflow_id,
timestamp: clock.new_timestamp(),
node_results: dataflow_results
.remove(&dataflow_id)
.context("no node results for dataflow_id")?,
node_results,
})
}

Expand Down Expand Up @@ -620,9 +631,12 @@ impl Daemon {
}

async fn trigger_manual_stop(&mut self) -> eyre::Result<()> {
// Collect dataflow IDs that need immediate finishing
let mut dataflows_to_finish = Vec::new();

for dataflow in self.running.values_mut() {
let mut logger = self.logger.for_dataflow(dataflow.id);
dataflow
let finish_when = dataflow
.stop_all(
&mut self.coordinator_connection,
&self.clock,
Expand All @@ -631,7 +645,18 @@ impl Daemon {
&mut logger,
)
.await?;

// If stop_all returns Now, we need to finish this dataflow
if matches!(finish_when, FinishDataflowWhen::Now) {
dataflows_to_finish.push(dataflow.id);
}
}

// Finish dataflows after the loop to avoid borrow checker issues
for dataflow_id in dataflows_to_finish {
self.finish_dataflow(dataflow_id).await?;
}

self.exit_when_all_finished = true;
Ok(())
}
Expand Down Expand Up @@ -874,31 +899,42 @@ impl Daemon {
grace_duration,
force,
} => {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
let (reply, future) = match dataflow {
Ok(dataflow) => {
let future = dataflow.stop_all(
&mut self.coordinator_connection,
&self.clock,
grace_duration,
force,
&mut logger,
);
(Ok(()), Some(future))
let finish_when = {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"));
let (reply, future) = match dataflow {
Ok(dataflow) => {
let future = dataflow.stop_all(
&mut self.coordinator_connection,
&self.clock,
grace_duration,
force,
&mut logger,
);
(Ok(()), Some(future))
}
Err(err) => (Err(err.to_string()), None),
};

let _ = reply_tx
.send(Some(DaemonCoordinatorReply::StopResult(reply)))
.map_err(|_| {
error!("could not send stop reply from daemon to coordinator")
});

if let Some(future) = future {
Some(future.await?)
} else {
None
}
Err(err) => (Err(err.to_string()), None),
};

let _ = reply_tx
.send(Some(DaemonCoordinatorReply::StopResult(reply)))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));

if let Some(future) = future {
future.await?;
// If stop_all returns Now, finish the dataflow immediately
if matches!(finish_when, Some(FinishDataflowWhen::Now)) {
self.finish_dataflow(dataflow_id).await?;
}

RunStatus::Continue
Expand Down Expand Up @@ -2094,58 +2130,78 @@ impl Daemon {
self.handle_outputs_done(dataflow_id, node_id, might_restart)
.await?;

let should_finish = {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!(
"failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
)
})?;
dataflow.running_nodes.remove(node_id);
// Check if all remaining nodes are dynamic (won't send SpawnedNodeResult)
!dataflow.pending_nodes.local_nodes_pending()
&& dataflow
.running_nodes
.iter()
.all(|(_id, n)| n.node_config.dynamic)
};

if should_finish {
self.finish_dataflow(dataflow_id).await?;
}

Ok(())
}

/// Mark a dataflow as finished and perform cleanup.
/// This should be called when:
/// - `stop_all()` returns `FinishDataflowWhen::Now`, or
/// - All non-dynamic nodes have sent `SpawnedNodeResult` events
async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
dataflow.running_nodes.remove(node_id);
if !dataflow.pending_nodes.local_nodes_pending()
&& dataflow
.running_nodes
.iter()
.all(|(_id, n)| n.node_config.dynamic)
{
let result = DataflowDaemonResult {
timestamp: self.clock.new_timestamp(),
node_results: self
.dataflow_node_results
.get(&dataflow.id)
.context("failed to get dataflow node results")?
.clone(),
};

self.git_manager
.clones_in_use
.values_mut()
.for_each(|dataflows| {
dataflows.remove(&dataflow_id);
});
// Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry
// in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully.
let result = DataflowDaemonResult {
timestamp: self.clock.new_timestamp(),
node_results: self
.dataflow_node_results
.get(&dataflow_id)
.cloned()
.unwrap_or_default(),
};

logger
.log(
LogLevel::Info,
None,
Some("daemon".into()),
format!("dataflow finished on machine `{}`", self.daemon_id),
)
.await;
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
},
self.git_manager
.clones_in_use
.values_mut()
.for_each(|dataflows| {
dataflows.remove(&dataflow_id);
});

logger
.log(
LogLevel::Info,
None,
Some("daemon".into()),
format!("dataflow finished on machine `{}`", self.daemon_id),
)
.await;

if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
}
self.running.remove(&dataflow_id);
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
}
self.running.remove(&dataflow_id);

Ok(())
}
Expand Down Expand Up @@ -2734,6 +2790,16 @@ pub struct RunningDataflow {
publish_all_messages_to_zenoh: bool,
}

/// Indicates whether a dataflow should be finished immediately after `stop_all()`
/// or whether to wait for `SpawnedNodeResult` events from running nodes.
///
/// Marked `#[must_use]` because silently dropping this value would leave a
/// stopped dataflow's cleanup (`finish_dataflow`) unscheduled.
// Fieldless two-variant enum: derive the standard traits so callers can
// debug-print, copy, and compare it directly instead of only via `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use]
pub enum FinishDataflowWhen {
    /// Finish the dataflow immediately (all nodes are dynamic or no nodes running)
    Now,
    /// Wait for SpawnedNodeResult events from non-dynamic nodes
    WaitForNodes,
}

impl RunningDataflow {
fn new(
dataflow_id: Uuid,
Expand Down Expand Up @@ -2832,7 +2898,7 @@ impl RunningDataflow {
grace_duration: Option<Duration>,
force: bool,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<()> {
) -> eyre::Result<FinishDataflowWhen> {
self.pending_nodes
.handle_dataflow_stop(
coordinator_connection,
Expand Down Expand Up @@ -2893,7 +2959,30 @@ impl RunningDataflow {
});
}
self.stop_sent = true;
Ok(())

// Determine if we should finish immediately or wait for nodes
Ok(self.should_finish_immediately())
}

/// Decide whether the dataflow can be finished right away after `stop_all()`.
///
/// Returns [`FinishDataflowWhen::Now`] only when every condition holds:
/// stop has already been sent, no local nodes are still pending, and every
/// node still running is dynamic (dynamic nodes never emit a
/// `SpawnedNodeResult`, so there is nothing left to wait for). Otherwise
/// returns [`FinishDataflowWhen::WaitForNodes`].
fn should_finish_immediately(&self) -> FinishDataflowWhen {
    let stop_was_sent = self.stop_sent;
    let nothing_pending = !self.pending_nodes.local_nodes_pending();
    let only_dynamic_left = self
        .running_nodes
        .values()
        .all(|node| node.node_config.dynamic);

    if stop_was_sent && nothing_pending && only_dynamic_left {
        FinishDataflowWhen::Now
    } else {
        FinishDataflowWhen::WaitForNodes
    }
}

fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
Expand Down
Loading