
Commit b6b3a18

haixuanTao and claude committed
Refactor: Extract dataflow finish logic and add FinishDataflowWhen enum

Address PR review feedback from @phil-opp:

1. Add `#[must_use]` enum `FinishDataflowWhen` to indicate whether a dataflow should be finished immediately or wait for nodes
2. Add a `should_finish_immediately()` method on `RunningDataflow` to avoid an extra lookup in `self.running`
3. Modify `stop_all()` to return `FinishDataflowWhen` instead of `()`
4. Extract the duplicated cleanup code into a shared `finish_dataflow()` method on `Daemon`
5. Update callers to handle the return value properly

This means we no longer have to keep two cleanup routines in sync, and the compiler will remind us to handle the result via `#[must_use]`.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 65c0aee commit b6b3a18
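
The review point behind item 1 is worth spelling out: because the enum is `#[must_use]`, a caller that drops the value returned by `stop_all()` gets an `unused_must_use` warning instead of silently skipping the cleanup decision. The sketch below is a minimal, self-contained illustration of that pattern with simplified stand-in types (`Dataflow` and its `running_dynamic_only` flag are hypothetical), not the actual daemon code:

// Minimal sketch of the `#[must_use]` pattern (simplified stand-ins, not the dora daemon types).
#[must_use]
enum FinishDataflowWhen {
    Now,
    WaitForNodes,
}

struct Dataflow {
    // Hypothetical flag standing in for "all remaining running nodes are dynamic".
    running_dynamic_only: bool,
}

impl Dataflow {
    // Simplified stand-in for `RunningDataflow::stop_all`, which now returns the enum.
    fn stop_all(&mut self) -> FinishDataflowWhen {
        if self.running_dynamic_only {
            FinishDataflowWhen::Now
        } else {
            FinishDataflowWhen::WaitForNodes
        }
    }
}

fn main() {
    let mut dataflow = Dataflow { running_dynamic_only: true };
    // Writing `dataflow.stop_all();` and ignoring the result would trigger an
    // `unused_must_use` warning, which is how the compiler "reminds us" here.
    let finish_when = dataflow.stop_all();
    if matches!(finish_when, FinishDataflowWhen::Now) {
        println!("finish the dataflow immediately");
    }
}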

File tree

1 file changed (+117, -134 lines)

binaries/daemon/src/lib.rs

Lines changed: 117 additions & 134 deletions
@@ -466,19 +466,24 @@ impl Daemon {
                 tracing::info!("received ctrlc signal -> stopping all dataflows");
                 let dataflow_ids: Vec<_> = self.running.keys().copied().collect();
                 for dataflow_id in dataflow_ids {
-                    if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
-                        let mut logger = self.logger.for_dataflow(dataflow.id);
-                        dataflow
-                            .stop_all(
-                                &mut self.coordinator_connection,
-                                &self.clock,
-                                None,
-                                false,
-                                &mut logger,
-                            )
-                            .await?;
-                        // After stop_all, check if dataflow should finish (for all-dynamic nodes case)
-                        self.check_dataflow_finished_after_stop(dataflow_id).await?;
+                    let finish_when = {
+                        if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
+                            let mut logger = self.logger.for_dataflow(dataflow.id);
+                            dataflow
+                                .stop_all(
+                                    &mut self.coordinator_connection,
+                                    &self.clock,
+                                    None,
+                                    false,
+                                    &mut logger,
+                                )
+                                .await?
+                        } else {
+                            continue;
+                        }
+                    };
+                    if matches!(finish_when, FinishDataflowWhen::Now) {
+                        self.finish_dataflow(dataflow_id).await?;
                     }
                 }
                 self.exit_when_all_finished = true;
@@ -851,7 +856,7 @@ impl Daemon {
                 grace_duration,
                 force,
             } => {
-                let stop_succeeded = {
+                let finish_when = {
                     let mut logger = self.logger.for_dataflow(dataflow_id);
                     let dataflow = self
                         .running
@@ -878,16 +883,15 @@ impl Daemon {
                     });

                     if let Some(future) = future {
-                        future.await?;
-                        true
+                        Some(future.await?)
                     } else {
-                        false
+                        None
                     }
                 };

-                // After stop_all, check if dataflow should finish (for all-dynamic nodes case)
-                if stop_succeeded {
-                    self.check_dataflow_finished_after_stop(dataflow_id).await?;
+                // If stop_all returns Now, finish the dataflow immediately
+                if matches!(finish_when, Some(FinishDataflowWhen::Now)) {
+                    self.finish_dataflow(dataflow_id).await?;
                 }

                 RunStatus::Continue
@@ -2083,132 +2087,78 @@ impl Daemon {
         self.handle_outputs_done(dataflow_id, node_id, might_restart)
             .await?;

-        let mut logger = self.logger.for_dataflow(dataflow_id);
-        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
-        })?;
-        dataflow.running_nodes.remove(node_id);
-        if !dataflow.pending_nodes.local_nodes_pending()
-            && dataflow
-                .running_nodes
-                .iter()
-                .all(|(_id, n)| n.node_config.dynamic)
-        {
-            let result = DataflowDaemonResult {
-                timestamp: self.clock.new_timestamp(),
-                node_results: self
-                    .dataflow_node_results
-                    .get(&dataflow.id)
-                    .context("failed to get dataflow node results")?
-                    .clone(),
-            };
-
-            self.git_manager
-                .clones_in_use
-                .values_mut()
-                .for_each(|dataflows| {
-                    dataflows.remove(&dataflow_id);
-                });
-
-            logger
-                .log(
-                    LogLevel::Info,
-                    None,
-                    Some("daemon".into()),
-                    format!("dataflow finished on machine `{}`", self.daemon_id),
+        let should_finish = {
+            let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
+                format!(
+                    "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
                 )
-                .await;
-            if let Some(connection) = &mut self.coordinator_connection {
-                let msg = serde_json::to_vec(&Timestamped {
-                    inner: CoordinatorRequest::Event {
-                        daemon_id: self.daemon_id.clone(),
-                        event: DaemonEvent::AllNodesFinished {
-                            dataflow_id,
-                            result,
-                        },
-                    },
-                    timestamp: self.clock.new_timestamp(),
-                })?;
-                socket_stream_send(connection, &msg)
-                    .await
-                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
-            }
-            self.running.remove(&dataflow_id);
+            })?;
+            dataflow.running_nodes.remove(node_id);
+            // Check if all remaining nodes are dynamic (won't send SpawnedNodeResult)
+            !dataflow.pending_nodes.local_nodes_pending()
+                && dataflow
+                    .running_nodes
+                    .iter()
+                    .all(|(_id, n)| n.node_config.dynamic)
+        };
+
+        if should_finish {
+            self.finish_dataflow(dataflow_id).await?;
         }

         Ok(())
     }

-    /// Check if a dataflow should be marked as finished after stop_all() is called.
-    /// This is needed for dataflows with only dynamic nodes, which don't send
-    /// SpawnedNodeResult events and thus never trigger the normal finish check.
-    async fn check_dataflow_finished_after_stop(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
+    /// Mark a dataflow as finished and perform cleanup.
+    /// This should be called when:
+    /// - `stop_all()` returns `FinishDataflowWhen::Now`, or
+    /// - All non-dynamic nodes have sent `SpawnedNodeResult` events
+    async fn finish_dataflow(&mut self, dataflow_id: Uuid) -> eyre::Result<()> {
         let mut logger = self.logger.for_dataflow(dataflow_id);

-        // Check if dataflow should finish
-        let should_finish = {
-            if let Some(dataflow) = self.running.get(&dataflow_id) {
-                // Only finish if:
-                // 1. No pending nodes
-                // 2. All running nodes are dynamic (they won't send SpawnedNodeResult)
-                // 3. Stop was sent (stop_all() was called)
-                !dataflow.pending_nodes.local_nodes_pending()
-                    && dataflow
-                        .running_nodes
-                        .iter()
-                        .all(|(_id, n)| n.node_config.dynamic)
-                    && dataflow.stop_sent
-            } else {
-                return Ok(());
-            }
+        // Dynamic nodes don't send SpawnedNodeResult events, so there may be no entry
+        // in dataflow_node_results. An empty map means all dynamic nodes handled stop successfully.
+        let result = DataflowDaemonResult {
+            timestamp: self.clock.new_timestamp(),
+            node_results: self
+                .dataflow_node_results
+                .get(&dataflow_id)
+                .cloned()
+                .unwrap_or_default(),
         };

-        if should_finish {
-            // We verified above that all running nodes are dynamic. Dynamic nodes don't
-            // send SpawnedNodeResult events, so there may be no entry in dataflow_node_results.
-            // An empty map means all dynamic nodes handled stop successfully.
-            let result = DataflowDaemonResult {
-                timestamp: self.clock.new_timestamp(),
-                node_results: self
-                    .dataflow_node_results
-                    .get(&dataflow_id)
-                    .cloned()
-                    .unwrap_or_default(),
-            };
-
-            self.git_manager
-                .clones_in_use
-                .values_mut()
-                .for_each(|dataflows| {
-                    dataflows.remove(&dataflow_id);
-                });
+        self.git_manager
+            .clones_in_use
+            .values_mut()
+            .for_each(|dataflows| {
+                dataflows.remove(&dataflow_id);
+            });

-            logger
-                .log(
-                    LogLevel::Info,
-                    None,
-                    Some("daemon".into()),
-                    format!("dataflow finished on machine `{}`", self.daemon_id),
-                )
-                .await;
+        logger
+            .log(
+                LogLevel::Info,
+                None,
+                Some("daemon".into()),
+                format!("dataflow finished on machine `{}`", self.daemon_id),
+            )
+            .await;

-            if let Some(connection) = &mut self.coordinator_connection {
-                let msg = serde_json::to_vec(&Timestamped {
-                    inner: CoordinatorRequest::Event {
-                        daemon_id: self.daemon_id.clone(),
-                        event: DaemonEvent::AllNodesFinished {
-                            dataflow_id,
-                            result,
-                        },
+        if let Some(connection) = &mut self.coordinator_connection {
+            let msg = serde_json::to_vec(&Timestamped {
+                inner: CoordinatorRequest::Event {
+                    daemon_id: self.daemon_id.clone(),
+                    event: DaemonEvent::AllNodesFinished {
+                        dataflow_id,
+                        result,
                     },
-                    timestamp: self.clock.new_timestamp(),
-                })?;
-                socket_stream_send(connection, &msg)
-                    .await
-                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
-            }
-            self.running.remove(&dataflow_id);
+                },
+                timestamp: self.clock.new_timestamp(),
+            })?;
+            socket_stream_send(connection, &msg)
+                .await
+                .wrap_err("failed to report dataflow finish to dora-coordinator")?;
         }
+        self.running.remove(&dataflow_id);

         Ok(())
     }
@@ -2797,6 +2747,16 @@ pub struct RunningDataflow {
     publish_all_messages_to_zenoh: bool,
 }

+/// Indicates whether a dataflow should be finished immediately after stop_all()
+/// or whether to wait for SpawnedNodeResult events from running nodes.
+#[must_use]
+pub enum FinishDataflowWhen {
+    /// Finish the dataflow immediately (all nodes are dynamic or no nodes running)
+    Now,
+    /// Wait for SpawnedNodeResult events from non-dynamic nodes
+    WaitForNodes,
+}
+
 impl RunningDataflow {
     fn new(
         dataflow_id: Uuid,
@@ -2895,7 +2855,7 @@ impl RunningDataflow {
         grace_duration: Option<Duration>,
         force: bool,
         logger: &mut DataflowLogger<'_>,
-    ) -> eyre::Result<()> {
+    ) -> eyre::Result<FinishDataflowWhen> {
         self.pending_nodes
             .handle_dataflow_stop(
                 coordinator_connection,
@@ -2956,7 +2916,30 @@ impl RunningDataflow {
             });
         }
         self.stop_sent = true;
-        Ok(())
+
+        // Determine if we should finish immediately or wait for nodes
+        Ok(self.should_finish_immediately())
+    }
+
+    /// Check if dataflow should finish immediately after stop_all().
+    /// Returns `Now` if all running nodes are dynamic (they won't send SpawnedNodeResult).
+    /// Returns `WaitForNodes` if there are non-dynamic nodes to wait for.
+    fn should_finish_immediately(&self) -> FinishDataflowWhen {
+        // Only finish immediately if:
+        // 1. No pending nodes
+        // 2. All running nodes are dynamic (they won't send SpawnedNodeResult)
+        // 3. Stop was sent (stop_all() was called)
+        if !self.pending_nodes.local_nodes_pending()
+            && self
+                .running_nodes
+                .iter()
+                .all(|(_id, n)| n.node_config.dynamic)
+            && self.stop_sent
+        {
+            FinishDataflowWhen::Now
+        } else {
+            FinishDataflowWhen::WaitForNodes
+        }
     }

     fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
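
As a quick summary of the last hunk: `should_finish_immediately()` returns `Now` only when no local nodes are pending, every remaining running node is dynamic (so no `SpawnedNodeResult` will ever arrive), and the stop signal has already been sent. Below is a hedged, standalone model of that check using simplified stand-in fields (`pending_local_nodes`, a plain `Vec<Node>`, and a `stop_sent` flag), not the daemon's real structs:

// Illustrative model of the `should_finish_immediately()` check, not the daemon's real types.
#[must_use]
#[derive(Debug)]
enum FinishDataflowWhen {
    Now,
    WaitForNodes,
}

struct Node {
    dynamic: bool,
}

struct RunningDataflow {
    pending_local_nodes: usize,
    running_nodes: Vec<Node>,
    stop_sent: bool,
}

impl RunningDataflow {
    fn should_finish_immediately(&self) -> FinishDataflowWhen {
        // Finish now only if: no pending nodes, all running nodes are dynamic
        // (they never send `SpawnedNodeResult`), and stop_all() was already called.
        if self.pending_local_nodes == 0
            && self.running_nodes.iter().all(|n| n.dynamic)
            && self.stop_sent
        {
            FinishDataflowWhen::Now
        } else {
            FinishDataflowWhen::WaitForNodes
        }
    }
}

fn main() {
    let dataflow = RunningDataflow {
        pending_local_nodes: 0,
        running_nodes: vec![Node { dynamic: true }],
        stop_sent: true,
    };
    // All remaining nodes are dynamic and stop was sent, so this prints `Now`.
    println!("{:?}", dataflow.should_finish_immediately());
}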

0 commit comments