Skip to content

Commit e97523e

Browse files
fix: address mentor feedback on error propagation PR
1 parent 5440b99 commit e97523e

File tree

6 files changed

+136
-72
lines changed

6 files changed

+136
-72
lines changed

binaries/cli/src/command/stop.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{Executable, default_tracing};
22
use crate::common::{
3-
connect_to_coordinator, handle_dataflow_result_with_options, query_running_dataflows,
3+
connect_to_coordinator, handle_dataflow_result, query_running_dataflows,
44
};
55
use communication_layer_request_reply::TcpRequestReplyConnection;
66
use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};
@@ -102,9 +102,7 @@ fn stop_dataflow(
102102
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
103103
match result {
104104
ControlRequestReply::DataflowStopped { uuid, result } => {
105-
// For stop commands, we consider it successful if we successfully stopped the dataflow,
106-
// even if the dataflow was already in a failed state
107-
handle_dataflow_result_with_options(result, Some(uuid), true)
105+
handle_dataflow_result(result, Some(uuid))
108106
}
109107
ControlRequestReply::Error(err) => bail!("{err}"),
110108
other => bail!("unexpected stop dataflow reply: {other:?}"),
@@ -131,9 +129,7 @@ fn stop_dataflow_by_name(
131129
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
132130
match result {
133131
ControlRequestReply::DataflowStopped { uuid, result } => {
134-
// For stop commands, we consider it successful if we successfully stopped the dataflow,
135-
// even if the dataflow was already in a failed state
136-
handle_dataflow_result_with_options(result, Some(uuid), true)
132+
handle_dataflow_result(result, Some(uuid))
137133
}
138134
ControlRequestReply::Error(err) => bail!("{err}"),
139135
other => bail!("unexpected stop dataflow reply: {other:?}"),

binaries/cli/src/command/topic/echo.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ async fn log_to_terminal(
267267
eprintln!("Output {node_id}/{output_id} closed");
268268
break;
269269
}
270+
InterDaemonEvent::NodeFailed { .. } => {
271+
// NodeFailed events are not relevant for topic echo
272+
continue;
273+
}
270274
}
271275
}
272276

binaries/cli/src/command/topic/hz.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ async fn subscribe_output(
337337
InterDaemonEvent::OutputClosed { .. } => {
338338
break;
339339
}
340+
InterDaemonEvent::NodeFailed { .. } => {
341+
// NodeFailed events are not relevant for topic hz
342+
continue;
343+
}
340344
}
341345
}
342346

binaries/cli/src/common.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,9 @@ use uuid::Uuid;
2121
pub(crate) fn handle_dataflow_result(
2222
result: DataflowResult,
2323
uuid: Option<Uuid>,
24-
) -> Result<(), eyre::Error> {
25-
handle_dataflow_result_with_options(result, uuid, false)
26-
}
27-
28-
pub(crate) fn handle_dataflow_result_with_options(
29-
result: DataflowResult,
30-
uuid: Option<Uuid>,
31-
ignore_failures: bool,
3224
) -> Result<(), eyre::Error> {
3325
if result.is_ok() {
3426
Ok(())
35-
} else if ignore_failures {
36-
// For stop commands, we consider it successful if we successfully stopped the dataflow,
37-
// even if the dataflow was already in a failed state
38-
Ok(())
3927
} else {
4028
Err(match uuid {
4129
Some(uuid) => {

binaries/daemon/src/lib.rs

Lines changed: 118 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,56 @@ impl Daemon {
946946
}
947947
Ok(())
948948
}
949+
InterDaemonEvent::NodeFailed {
950+
dataflow_id,
951+
source_node_id,
952+
receiver_node_id,
953+
affected_input_ids,
954+
error,
955+
} => {
956+
let mut logger = self
957+
.logger
958+
.for_dataflow(dataflow_id)
959+
.for_node(receiver_node_id.clone());
960+
logger
961+
.log(
962+
LogLevel::Debug,
963+
Some("daemon".into()),
964+
format!("received NodeFailed event from {source_node_id} for {receiver_node_id}"),
965+
)
966+
.await;
967+
968+
let inner = async {
969+
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
970+
format!("failed to handle NodeFailed: no running dataflow with ID `{dataflow_id}`")
971+
})?;
972+
973+
// Forward the error event to the local receiver node
974+
if let Some(channel) = dataflow.subscribe_channels.get(&receiver_node_id) {
975+
let event = NodeEvent::NodeFailed {
976+
affected_input_ids,
977+
error,
978+
source_node_id,
979+
};
980+
let _ = send_with_timestamp(channel, event, &self.clock);
981+
} else {
982+
tracing::warn!(
983+
"Received NodeFailed event for node {} but it's not subscribed on this daemon",
984+
receiver_node_id
985+
);
986+
}
987+
Result::<(), eyre::Report>::Ok(())
988+
};
989+
if let Err(err) = inner
990+
.await
991+
.wrap_err("failed to handle NodeFailed event sent by remote daemon")
992+
{
993+
logger
994+
.log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
995+
.await;
996+
}
997+
Ok(())
998+
}
949999
}
9501000
}
9511001

@@ -1682,6 +1732,8 @@ impl Daemon {
16821732
///
16831733
/// This is called automatically when a node exits with a non-zero exit code.
16841734
/// It sends `NodeFailed` events to all downstream nodes that consume outputs from the failed node.
1735+
/// For nodes on the same daemon, events are sent directly. For nodes on other daemons,
1736+
/// events are routed through the coordinator.
16851737
async fn propagate_node_error(
16861738
&mut self,
16871739
dataflow_id: Uuid,
@@ -1722,15 +1774,45 @@ impl Daemon {
17221774
}
17231775
}
17241776

1777+
// Separate local and remote receivers to avoid borrow checker issues
1778+
let mut remote_receivers = Vec::new();
1779+
17251780
// Send one NodeFailed event per receiver node with all affected input IDs
17261781
for (receiver_node_id, affected_input_ids) in affected_by_receiver {
1782+
// Check if receiver is on this daemon (local) or another daemon (remote)
17271783
if let Some(channel) = dataflow.subscribe_channels.get(&receiver_node_id) {
1784+
// Local receiver - send directly
17281785
let event = NodeEvent::NodeFailed {
17291786
affected_input_ids,
17301787
error: error_message.clone(),
17311788
source_node_id: source_node_id.clone(),
17321789
};
17331790
let _ = send_with_timestamp(channel, event, &self.clock);
1791+
} else {
1792+
// Remote receiver - collect for later processing
1793+
remote_receivers.push((receiver_node_id, affected_input_ids));
1794+
}
1795+
}
1796+
1797+
// Now send to remote receivers (after releasing the borrow on dataflow)
1798+
for (receiver_node_id, affected_input_ids) in remote_receivers {
1799+
// Remote receiver - send through coordinator via zenoh
1800+
// The coordinator will forward this to the appropriate daemon
1801+
let output_id = OutputId(source_node_id.clone(), affected_input_ids[0].clone());
1802+
let event = InterDaemonEvent::NodeFailed {
1803+
dataflow_id,
1804+
source_node_id: source_node_id.clone(),
1805+
receiver_node_id: receiver_node_id.clone(),
1806+
affected_input_ids,
1807+
error: error_message.clone(),
1808+
};
1809+
1810+
// Send to remote receivers using the same mechanism as regular outputs
1811+
if let Err(err) = self.send_to_remote_receivers(dataflow_id, &output_id, event).await {
1812+
tracing::warn!(
1813+
"Failed to send error event to remote receiver {}: {err:?}",
1814+
receiver_node_id
1815+
);
17341816
}
17351817
}
17361818

@@ -2130,58 +2212,43 @@ impl Daemon {
21302212
dataflow.cascading_error_causes.error_caused_by(&node_id)
21312213
})
21322214
.cloned();
2133-
let grace_duration_kill = dataflow
2134-
.map(|d| d.grace_duration_kills.contains(&node_id))
2135-
.unwrap_or_default();
21362215

2137-
// Nodes killed by grace duration during stop are not errors
2138-
if grace_duration_kill {
2139-
logger
2216+
let cause = match caused_by_node {
2217+
Some(caused_by_node) => {
2218+
logger
21402219
.log(
2141-
LogLevel::Debug,
2220+
LogLevel::Info,
21422221
Some("daemon".into()),
2143-
format!("node `{node_id}` was stopped by grace duration (not an error)"),
2222+
format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
21442223
)
21452224
.await;
2146-
Ok(())
2147-
} else {
2148-
let cause = match caused_by_node {
2149-
Some(caused_by_node) => {
2150-
logger
2151-
.log(
2152-
LogLevel::Info,
2153-
Some("daemon".into()),
2154-
format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
2155-
)
2156-
.await;
21572225

2158-
NodeErrorCause::Cascading { caused_by_node }
2159-
}
2160-
None => {
2161-
let cause = dataflow
2162-
.and_then(|d| d.node_stderr_most_recent.get(&node_id))
2163-
.map(|queue| {
2164-
let mut s = if queue.is_full() {
2165-
"[...]".into()
2166-
} else {
2167-
String::new()
2168-
};
2169-
while let Some(line) = queue.pop() {
2170-
s += &line;
2171-
}
2172-
s
2173-
})
2174-
.unwrap_or_default();
2175-
2176-
NodeErrorCause::Other { stderr: cause }
2177-
}
2178-
};
2179-
Err(NodeError {
2180-
timestamp: self.clock.new_timestamp(),
2181-
cause,
2182-
exit_status,
2183-
})
2184-
}
2226+
NodeErrorCause::Cascading { caused_by_node }
2227+
}
2228+
None => {
2229+
let cause = dataflow
2230+
.and_then(|d| d.node_stderr_most_recent.get(&node_id))
2231+
.map(|queue| {
2232+
let mut s = if queue.is_full() {
2233+
"[...]".into()
2234+
} else {
2235+
String::new()
2236+
};
2237+
while let Some(line) = queue.pop() {
2238+
s += &line;
2239+
}
2240+
s
2241+
})
2242+
.unwrap_or_default();
2243+
2244+
NodeErrorCause::Other { stderr: cause }
2245+
}
2246+
};
2247+
Err(NodeError {
2248+
timestamp: self.clock.new_timestamp(),
2249+
cause,
2250+
exit_status,
2251+
})
21852252
}
21862253
};
21872254

@@ -2220,19 +2287,17 @@ impl Daemon {
22202287
}
22212288
}
22222289

2223-
// Always send NodeStopped event, even if handle_node_stop fails
2224-
// This ensures the daemon can exit properly
2225-
let stop_result = self
2290+
// Handle node stop (this will send NodeStopped event internally)
2291+
if let Err(err) = self
22262292
.handle_node_stop(dataflow_id, &node_id, dynamic_node)
2227-
.await;
2228-
if let Err(err) = &stop_result {
2293+
.await
2294+
{
22292295
tracing::warn!(
2230-
"Error handling node stop for {}/{}: {err:?}, but NodeStopped event should have been sent",
2296+
"Error handling node stop for {}/{}: {err:?}",
22312297
dataflow_id,
22322298
node_id
22332299
);
22342300
}
2235-
// Don't propagate the error - NodeStopped event is more important for shutdown
22362301
}
22372302
}
22382303
Ok(())

libraries/message/src/daemon_to_daemon.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,11 @@ pub enum InterDaemonEvent {
2121
node_id: NodeId,
2222
output_id: DataId,
2323
},
24+
NodeFailed {
25+
dataflow_id: DataflowId,
26+
source_node_id: NodeId,
27+
receiver_node_id: NodeId,
28+
affected_input_ids: Vec<DataId>,
29+
error: String,
30+
},
2431
}

0 commit comments

Comments
 (0)