
Conversation

@guptapratykshh (Contributor) commented Dec 6, 2025

This PR implements error event propagation (#1232), allowing nodes to send errors to downstream nodes instead of crashing. The feature provides a node.send_error(output_id, error) API method that propagates error events to all affected downstream consumers, which receive InputError events containing the error message, the source node ID, and the affected input ID for graceful error handling.

The implementation extends the message protocol with NodeEvent::InputError and DaemonRequest::SendError types, adds daemon-side error propagation logic that finds and notifies all downstream receivers, and includes a complete working example demonstrating producer/consumer error handling patterns. All changes are backward compatible, with no breaking changes to existing functionality.
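For illustration, here is a minimal sketch of the consumer-side pattern described above, with the event written out as a plain Rust enum. The field names follow the PR description (they change later in the review), and the types are simplified placeholders rather than the actual dora API:

```rust
// Hypothetical event shape mirroring the PR description; not the real dora types.
#[allow(dead_code)]
enum NodeEvent {
    Input { id: String, data: Vec<u8> },
    InputError { id: String, source_node_id: String, error: String },
}

fn handle_event(event: NodeEvent) {
    match event {
        NodeEvent::Input { id, data } => {
            println!("input `{id}`: {} bytes", data.len());
        }
        NodeEvent::InputError { id, source_node_id, error } => {
            // Handle the error gracefully instead of crashing, e.g. fall back
            // to cached data or a backup source.
            eprintln!("input `{id}` failed upstream in `{source_node_id}`: {error}");
        }
    }
}

fn main() {
    handle_event(NodeEvent::InputError {
        id: "image".into(),
        source_node_id: "camera".into(),
        error: "sensor read failed".into(),
    });
}
```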

Previous PR #1250

@phil-opp (Collaborator) left a comment

Thanks for the PR!

We discussed this and would like to propose the following change: Instead of providing an explicit send_error function in the API, the error should be created by the daemon when a node exits with a non-zero exit code. This way, we can notify downstream nodes of failed nodes.

If nodes want to send errors they can still do that using normal outputs.

So please remove the send_error method from the API (and also the modifications to node_to_daemon).
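For reference, a rough sketch of the proposed behavior; the function and parameter names are placeholders, not actual dora daemon internals:

```rust
use std::process::{Command, ExitStatus};

// Placeholder for the daemon's actual event-delivery mechanism.
fn notify_downstream(receiver: &str, error: &str) {
    eprintln!("notify `{receiver}`: {error}");
}

// Sketch: the daemon, not the node, creates the failure event when a node exits
// with a non-zero exit code and forwards it to all downstream receivers.
fn on_node_exit(node_id: &str, status: ExitStatus, downstream: &[String]) {
    if !status.success() {
        let error = format!("node `{node_id}` exited with {status}");
        for receiver in downstream {
            notify_downstream(receiver, &error);
        }
    }
}

fn main() -> std::io::Result<()> {
    // Simulate a failing node process (`false` exits with code 1 on Unix).
    let status = Command::new("false").status()?;
    on_node_exit("camera", status, &["plot".to_string()]);
    Ok(())
}
```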

},
/// An error occurred in an upstream node.
///
/// Instead of crashing, the upstream node sent an error event to allow
Collaborator

The sending node crashed because of this error. (It might restart once we merge #1245.)

/// Instead of crashing, the upstream node sent an error event to allow
/// downstream nodes to handle the error gracefully (e.g., use cached data,
/// switch to backup source, log and continue).
InputError {
Collaborator

Let's rename this variant to NodeFailed or similar.

/// switch to backup source, log and continue).
InputError {
/// The ID of the input that had an error, as specified in the YAML file.
id: DataId,
Collaborator

The node error might affect multiple inputs, so let's change this to affected_input_ids: Vec<DataId>.
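Combined with the rename suggested above, the variant might look roughly like this (types simplified to String; the real code uses DataId/NodeId):

```rust
// Illustrative only; the field names match what appears later in the diff
// (affected_input_ids, source_node_id, error), with simplified types.
#[allow(dead_code)]
enum NodeEvent {
    NodeFailed {
        /// All inputs fed by the failed upstream node, as specified in the YAML file.
        affected_input_ids: Vec<String>,
        /// The upstream node that failed.
        source_node_id: String,
        /// Human-readable error message.
        error: String,
    },
}
```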

if let Err(err) = tcp_send(&mut daemon_connection.stream, &message).await {
// If we can't send the message, the daemon might have already closed the connection
// This can happen during shutdown. Log a debug message but don't fail.
tracing::debug!(
Collaborator

This should not happen typically, so I would prefer to keep this an error. The code in destroy_daemons will still proceed to send destroy commands to the other daemons after an error.

Or is there any specific reason why this can now happen more commonly?

Comment on lines 1510 to 1544
// wait for reply
let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to receive destroy reply from daemon")?;
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize destroy reply from daemon")?
{
DaemonCoordinatorReply::DestroyResult { result, .. } => result
let reply_raw = match tcp_receive(&mut daemon_connection.stream).await {
Ok(raw) => raw,
Err(err) => {
// If we can't receive the reply, the daemon might have already closed the connection
// This can happen during shutdown. Log a debug message but don't fail.
tracing::debug!(
"failed to receive destroy reply from daemon `{daemon_id}`: {err}, \
assuming daemon is already shut down"
);
return Ok(());
}
};

// Try to deserialize the reply. If it fails, treat it as a shutdown.
match serde_json::from_slice(&reply_raw) {
Ok(DaemonCoordinatorReply::DestroyResult { result, .. }) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to destroy dataflow")?,
other => bail!("unexpected reply after sending `destroy`: {other:?}"),
Ok(other) => {
tracing::debug!(
"unexpected reply after sending `destroy` to daemon `{daemon_id}`: {other:?}, \
assuming daemon is already shut down"
);
return Ok(());
}
Err(err) => {
// If deserialization fails, the daemon might have sent a partial reply before closing
// This can happen during shutdown. Log a debug message but don't fail.
tracing::debug!(
"failed to deserialize destroy reply from daemon `{daemon_id}`: {err}, \
assuming daemon is already shut down"
);
return Ok(());
}
Collaborator

same as above: Please keep these as errors, unless there is a good reason to change this.

Comment on lines 77 to 162
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonRequest")
.map(Some)
match bincode::deserialize(&raw) {
Ok(message) => Ok(Some(message)),
Err(err) => {
// If deserialization fails, treat it as a connection error
// This can happen if the connection was closed mid-transmission
// or if there's a version mismatch
// Log the raw data length and first few bytes to help diagnose the issue
let preview = if raw.len() > 16 {
format!("{:?}", &raw[..16])
} else {
format!("{:?}", raw)
};
// Log hex dump of first 32 bytes to help diagnose the issue
let hex_preview = if raw.len() > 32 {
format!("{:02x?}", &raw[..32])
} else {
format!("{:02x?}", raw)
};
// Try to identify what variant this might be
// Note: socket_stream_receive returns the message body (without length prefix)
// So the first 4 bytes should be the bincode enum variant index
let variant_hint = if raw.len() >= 4 {
let variant_idx = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]);
let variant_name = match variant_idx {
0 => "Register",
1 => "Subscribe",
2 => "SendMessage",
3 => "CloseOutputs",
4 => "OutputsDone",
5 => "NextEvent",
6 => "ReportDropTokens",
7 => "SubscribeDrop",
8 => "NextFinishedDropTokens",
9 => "EventStreamDropped",
10 => "NodeConfig",
_ => "Unknown",
};
format!(
" (variant index: {} = {}, expected 8 for SubscribeDrop)",
variant_idx, variant_name
)
} else {
String::new()
};

// Workaround for sync/async I/O mismatch: if we receive variant 7 (ReportDropTokens)
// with a 28-byte message (which matches SubscribeDrop size), it's likely a corrupted
// SubscribeDrop message due to sync/async I/O mismatch. Try to fix it.
let mut fixed_raw = raw.clone();
if raw.len() == 28 && raw.len() >= 4 {
let variant_idx = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]);
if variant_idx == 7 {
// Change variant index from 7 (ReportDropTokens) to 8 (SubscribeDrop)
tracing::debug!(
"attempting to fix corrupted SubscribeDrop message (variant 7 -> 8) due to sync/async I/O mismatch"
);
fixed_raw[0] = 8;
fixed_raw[1] = 0;
fixed_raw[2] = 0;
fixed_raw[3] = 0;
// Try to deserialize the fixed message
if let Ok(message) =
bincode::deserialize::<Timestamped<DaemonRequest>>(&fixed_raw)
{
if matches!(message.inner, DaemonRequest::SubscribeDrop) {
tracing::debug!(
"successfully fixed corrupted SubscribeDrop message"
);
return Ok(Some(message));
}
}
}
}

tracing::warn!(
"failed to deserialize DaemonRequest: {err}, raw data length: {}, preview: {}, hex: {}{}, treating as disconnect",
raw.len(),
preview,
hex_preview,
variant_hint
);
Ok(None)
}
}
}

Collaborator

In general, there should not be any deserialization errors because we use reliable channels. Could you clarify why you think that this change is needed?

Either way, this change is not really related to this PR. So let's move it to a separate one if there is good motivation to do this.

Comment on lines 75 to 86
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonRequest")
.map(Some)
match bincode::deserialize(&raw) {
Ok(message) => Ok(Some(message)),
Err(err) => {
// If deserialization fails, treat it as a connection error
// This can happen if the connection was closed mid-transmission
// or if there's a version mismatch
tracing::debug!(
"failed to deserialize DaemonRequest: {err}, treating as disconnect"
);
Ok(None)
}
}
Collaborator

same as above (change unrelated to this PR)

Comment on lines 144 to 153
match bincode::deserialize(&raw) {
Ok(message) => Ok(Some(message)),
Err(err) => {
// If deserialization fails, treat it as a connection error
// This can happen if the connection was closed mid-transmission
// or if there's a version mismatch
tracing::debug!("failed to deserialize DaemonRequest: {err}, treating as disconnect");
Ok(None)
}
}
Collaborator

Same as above (change unrelated to this PR)

InputClosed {
id: DataId,
},
InputError {
Collaborator

Please apply the same adjustments as for the other InputError variant here.

Contributor Author

All requested changes have been addressed.

Comment on lines 103 to 107
handle_dataflow_result(result, Some(uuid))
// For stop commands, we consider it successful if we successfully stopped the dataflow,
// even if the dataflow was already in a failed state
handle_dataflow_result_with_options(result, Some(uuid), true)
Collaborator

I don't think that we should hide errors here. A stop command should lead to normal stopping of all nodes. If that doesn't work, we should report it.

Comment on lines 130 to 136
handle_dataflow_result(result, Some(uuid))
// For stop commands, we consider it successful if we successfully stopped the dataflow,
// even if the dataflow was already in a failed state
handle_dataflow_result_with_options(result, Some(uuid), true)
Collaborator

same as above (don't hide error)

Comment on lines 35 to 38
} else if ignore_failures {
// For stop commands, we consider it successful if we successfully stopped the dataflow,
// even if the dataflow was already in a failed state
Ok(())
Collaborator

Why should we ignore errors on stop?

.wrap_err(format!(
"failed to send destroy message to daemon `{daemon_id}`"
))?;
.wrap_err("failed to send destroy message to daemon")?;
Collaborator

What is this change about?

Comment on lines +297 to +313
// Deserialization failed or connection closed
// The node has likely sent a request and is waiting for a reply
// Try to send an error reply before closing so the node doesn't hang
// We check subscribed_drop_events to determine if it's likely a SubscribeDrop request
let error_msg = if self.subscribed_drop_events.is_none() {
// Node hasn't subscribed to drop stream yet, likely waiting for SubscribeDrop reply
"failed to deserialize SubscribeDrop request: connection error or version mismatch"
} else {
// Node has already subscribed, might be a different request
"failed to deserialize request: connection error or version mismatch"
};
let error_reply = DaemonReply::Result(Err(error_msg.into()));
if let Err(err) = self.send_reply(error_reply, &mut connection).await {
tracing::debug!(
"failed to send error reply after deserialization failure: {err:?}"
);
}
Collaborator

What is this change about? It does not seem related to this PR.

break;
}
}
// Exit when all dataflows are finished after Ctrl-C
Collaborator

We normally don't exit the daemon on ctrl-c. The exit_when_all_finished flag is set only in some specific scenarios, e.g. when running dora run.

Comment on lines 545 to 551
// Also exit in example mode when all dataflows finish, even if exit_when_done
// tracking didn't catch all nodes (e.g., if nodes finish before being tracked)
if self.exit_when_done.is_some() && self.running.is_empty() {
tracing::info!(
"exiting daemon because all dataflows are finished (example mode)"
);
break;
Collaborator

What is this change about? Is it related to this PR?

Comment on lines 2137 to 2146
// Nodes killed by grace duration during stop are not errors
if grace_duration_kill {
logger
.log(
LogLevel::Debug,
Some("daemon".into()),
format!("node `{node_id}` was stopped by grace duration (not an error)"),
)
.await;
Ok(())
Collaborator

Why not? If nodes are unresponsive and need to be killed, that seems like an error. For example, there might be some important cleanup logic in the node that was skipped because of the kill. So I think it's better to report this error than to hide it silently.

Comment on lines 2223 to 2235
// Always send NodeStopped event, even if handle_node_stop fails
// This ensures the daemon can exit properly
let stop_result = self
.handle_node_stop(dataflow_id, &node_id, dynamic_node)
.await;
if let Err(err) = &stop_result {
tracing::warn!(
"Error handling node stop for {}/{}: {err:?}, but NodeStopped event should have been sent",
dataflow_id,
node_id
);
}
// Don't propagate the error - NodeStopped event is more important for shutdown
Collaborator

These comments and the warn message do not seem to match the code.

We added a new `NodeEvent` variant, which is not backwards compatible (yet, see dora-rs#1273). We also added a new `InterDaemonEvent` variant that is not backwards compatible either.
@phil-opp force-pushed the feature/error-event-propagation branch from 466f384 to 615788b on December 20, 2025 13:43
Comment on lines 27 to 28
receiver_node_id: NodeId,
affected_input_ids: Vec<DataId>,
Collaborator

We don't really need these two fields in the message, the receiver can figure it out on its own. This has the advantage that we only need to send a single message.
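For clarity, the slimmed-down variant suggested here would look roughly like this (types simplified; dataflow_id is a Uuid and the node ID is a NodeId in the real code):

```rust
// Illustrative shape only: with receiver_node_id and affected_input_ids removed,
// one NodeFailed message per failed node is enough, and each receiving daemon
// derives the affected receivers and inputs from its own dataflow mappings.
#[allow(dead_code)]
enum InterDaemonEvent {
    NodeFailed {
        dataflow_id: String,    // Uuid in the real code
        source_node_id: String, // NodeId in the real code
        error: String,
    },
}
```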

Comment on lines 1797 to 1815
// Now send to remote receivers (after releasing the borrow on dataflow)
for (receiver_node_id, affected_input_ids) in remote_receivers {
// Remote receiver - send through coordinator via zenoh
// The coordinator will forward this to the appropriate daemon
let output_id = OutputId(source_node_id.clone(), affected_input_ids[0].clone());
let event = InterDaemonEvent::NodeFailed {
dataflow_id,
source_node_id: source_node_id.clone(),
receiver_node_id: receiver_node_id.clone(),
affected_input_ids,
error: error_message.clone(),
};

// Send to remote receivers using the same mechanism as regular outputs
if let Err(err) = self.send_to_remote_receivers(dataflow_id, &output_id, event).await {
tracing::warn!(
"Failed to send error event to remote receiver {}: {err:?}",
receiver_node_id
);
Collaborator

We can simplify this by removing the receiver_node_id and affected_input_ids fields of InterDaemonEvent::NodeFailed. Then we only need to send a single message here.

@phil-opp (Collaborator)

I pushed a commit to bump the dora-message version so that we don't forget about it.

Comment on lines 567 to 572
// Also exit in example mode when all dataflows finish, even if exit_when_done
// tracking didn't catch all nodes (e.g., if nodes finish before being tracked)
if self.exit_when_done.is_some() && self.running.is_empty() {
tracing::info!(
"exiting daemon because all dataflows are finished (example mode)"
);
Collaborator

Why is this needed?

Comment on lines 1875 to 1914
// Get all outputs of the failed node
let outputs: Vec<DataId> = dataflow
.mappings
.keys()
.filter(|m| &m.0 == &source_node_id)
.map(|m| m.1.clone())
.collect();

// Group affected inputs by receiver node
let mut affected_by_receiver: BTreeMap<NodeId, Vec<DataId>> = BTreeMap::new();

for output_id in &outputs {
let output_key = OutputId(source_node_id.clone(), output_id.clone());
if let Some(receivers) = dataflow.mappings.get(&output_key) {
for (receiver_node_id, input_id) in receivers {
affected_by_receiver
.entry(receiver_node_id.clone())
.or_insert_with(Vec::new)
.push(input_id.clone());
}
}
}

// Check if there are any remote receivers (nodes not on this daemon)
let has_remote_receivers = affected_by_receiver
.keys()
.any(|receiver_node_id| !dataflow.subscribe_channels.contains_key(receiver_node_id));

// Send NodeFailed event to each local receiver
for (receiver_node_id, affected_input_ids) in affected_by_receiver {
if let Some(channel) = dataflow.subscribe_channels.get(&receiver_node_id) {
// Local receiver - send directly
let event = NodeEvent::NodeFailed {
affected_input_ids,
error: error_message.clone(),
source_node_id: source_node_id.clone(),
};
let _ = send_with_timestamp(channel, event, &self.clock);
}
}
Collaborator

This looks like the same code as when handling InterDaemonEvent::NodeFailed. Why not move it to a separate function?
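One possible shape for such a shared helper, sketched with simplified types (String instead of NodeId/DataId) rather than the actual daemon structures:

```rust
use std::collections::BTreeMap;

// Groups the inputs affected by a failed node per receiving node. The same helper
// could back both the local path and the InterDaemonEvent::NodeFailed path; only
// the delivery step would differ between the two callers.
fn group_affected_inputs(
    // (source node, output) -> list of (receiver node, input) pairs
    mappings: &BTreeMap<(String, String), Vec<(String, String)>>,
    source_node_id: &str,
) -> BTreeMap<String, Vec<String>> {
    let mut affected_by_receiver: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for ((source, _output), receivers) in mappings {
        if source.as_str() == source_node_id {
            for (receiver, input) in receivers {
                affected_by_receiver
                    .entry(receiver.clone())
                    .or_default()
                    .push(input.clone());
            }
        }
    }
    affected_by_receiver
}
```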

if has_remote_receivers {
// Use the first output as the routing key (any output will work)
let first_output = outputs.first().ok_or_else(|| {
eyre!("Failed node has no outputs, cannot send error to remote receivers")
Collaborator

Instead of erroring, we could just do nothing (if node has no outputs, no remote receivers are affected)
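In other words, roughly (illustrative only, not the actual code):

```rust
// Pick a routing output only if the failed node has any outputs; returning None
// lets the caller skip remote notification entirely instead of reporting an error.
fn pick_routing_output(outputs: &[String]) -> Option<&String> {
    outputs.first()
}
```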


// Send to remote receivers using the same mechanism as regular outputs
if let Err(err) = self
.send_to_remote_receivers(dataflow_id, &output_id, event)
Collaborator

I forgot that daemons subscribe to a specific output topic. So just sending the message to the first output is not enough unfortunately (as some daemon might only be subscribed to the second or third output).

So I guess we have two options:

  • send the message to all outputs and deduplicate it on the receiver side (i.e. remember which nodes failed already to avoid sending duplicated events to nodes)
  • add a new node-level subscription topic for such messages that all affected daemons subscribe to

What do you prefer?
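For reference, a minimal sketch of the first option (broadcast the failure on every output topic and deduplicate on the receiving daemon); the struct and field names are placeholders, not existing daemon state:

```rust
use std::collections::HashSet;

// Per-dataflow state on the receiving daemon: remembers which upstream nodes have
// already been reported as failed, so duplicate NodeFailed events arriving via
// other output topics are dropped before local nodes are notified again.
struct FailureDedup {
    reported: HashSet<String>, // IDs of nodes whose failure was already forwarded
}

impl FailureDedup {
    fn new() -> Self {
        Self { reported: HashSet::new() }
    }

    // Returns true only the first time a given node's failure is seen.
    fn should_forward(&mut self, failed_node_id: &str) -> bool {
        self.reported.insert(failed_node_id.to_string())
    }
}

fn main() {
    let mut dedup = FailureDedup::new();
    assert!(dedup.should_forward("camera")); // first event: forward to local nodes
    assert!(!dedup.should_forward("camera")); // duplicate via another topic: drop
}
```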

Contributor Author

done

@guptapratykshh (Contributor Author)

Can you please review this PR? The changes are done; the CI is failing because of the version difference.

@guptapratykshh (Contributor Author)

Any update on this? It is ready for review @phil-opp.
