Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
454cea3
feat: add error event propagation to downstream nodes
guptapratykshh Dec 5, 2025
359bca6
fix: treat nodes killed by grace duration as successful stops
guptapratykshh Dec 6, 2025
7d7aad7
fix: gracefully handle connection errors during daemon shutdown
guptapratykshh Dec 11, 2025
49707c0
fixed the formatting
guptapratykshh Dec 11, 2025
fb05aa4
Fixed CLI command failures (dora stop/destroy) that were occurring
guptapratykshh Dec 11, 2025
21d8546
feature/error-event-propagation
guptapratykshh Dec 11, 2025
321590c
refactor: replace explicit send_error API with automatic error propag…
guptapratykshh Dec 11, 2025
52e6be1
fixed the formatting issues
guptapratykshh Dec 11, 2025
a4f8435
refactor: rename InputError to NodeFailed and support multiple affect…
guptapratykshh Dec 12, 2025
e61e585
refactor: rename InputError to NodeFailed and support multiple affect…
guptapratykshh Dec 12, 2025
5440b99
refactor: rename InputError to NodeFailed and support multiple affect…
guptapratykshh Dec 12, 2025
e97523e
fix: address mentor feedback on error propagation PR
guptapratykshh Dec 16, 2025
c1b0445
fix: apply cargo fmt formatting fixes
guptapratykshh Dec 16, 2025
615788b
Bump dora-message version to 0.7.0-alpha
phil-opp Dec 20, 2025
295d048
Refactor error propagation to send single NodeFailed message, receive…
guptapratykshh Dec 20, 2025
51dfa4e
Merge remote-tracking branch 'upstream/main' into feature/error-event…
guptapratykshh Dec 20, 2025
2f0979a
Add NodeFailed pattern match to topic info command
guptapratykshh Dec 20, 2025
170c69c
fix: address reviewer feedback on error propagation PR (#1252)
guptapratykshh Dec 27, 2025
afa9c14
Merge branch 'main' into feature/error-event-propagation
guptapratykshh Jan 9, 2026
2435f00
Merge branch 'main' into feature/error-event-propagation
phil-opp Jan 28, 2026
3f5a188
Bump dora-message version to 0.8.0-alpha
phil-opp Jan 28, 2026
22d4536
Undo unrelated change (and use `wrap_err_with` instead)
phil-opp Jan 28, 2026
fd32593
Undo more unrelated changes
phil-opp Jan 28, 2026
ba55ab5
Fix: Make new example crates part of workspace
phil-opp Jan 28, 2026
bdbbfa7
Add build keys to dataflow and use correct paths
phil-opp Jan 28, 2026
fc3d61d
Improve println outputs in examples
phil-opp Jan 28, 2026
fb78c72
Fix build instruction in README and remove unnecessary/incorrect sect…
phil-opp Jan 28, 2026
f2350b1
Run new error event example on CI
phil-opp Jan 28, 2026
669256b
Add `rust-version` field to new example
phil-opp Jan 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ dora-ros2-bridge = { version = "0.3.13", path = "libraries/extensions/ros2-bridg
dora-ros2-bridge-msg-gen = { version = "0.3.13", path = "libraries/extensions/ros2-bridge/msg-gen" }
dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" }
# versioned independently from the other dora crates
dora-message = { version = "0.6.0", path = "libraries/message" }
dora-message = { version = "0.7.0-alpha", path = "libraries/message" }
arrow = { version = "54.2.1" }
arrow-schema = { version = "54.2.1" }
arrow-data = { version = "54.2.1" }
Expand Down
18 changes: 17 additions & 1 deletion apis/rust/node/src/event_stream/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dora_arrow_convert::ArrowData;
use dora_core::config::{DataId, OperatorId};
use dora_core::config::{DataId, NodeId, OperatorId};
use dora_message::metadata::Metadata;

/// Represents an incoming Dora event.
Expand Down Expand Up @@ -41,6 +41,22 @@ pub enum Event {
/// assigned to the input in the YAML file.
id: DataId,
},
/// A node failed and exited with a non-zero exit code.
///
/// The daemon automatically creates this event when a node exits with a non-zero exit code.
/// This allows downstream nodes to handle the error gracefully (e.g., use cached data,
/// switch to backup source, log and continue).
NodeFailed {
/// The IDs of the inputs that are affected by the node failure, as specified in the YAML file.
///
/// A node failure can affect multiple inputs if the failed node produced multiple outputs
/// that are consumed by this node.
affected_input_ids: Vec<DataId>,
/// The error message describing the failure.
error: String,
/// The ID of the node that failed.
source_node_id: NodeId,
},
/// Notification that the event stream is about to close.
///
/// The [`StopCause`] field contains the reason for the event stream closure.
Expand Down
27 changes: 27 additions & 0 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,24 @@ impl EventStream {
});
Some(event_json)
}
NodeEvent::NodeFailed {
affected_input_ids,
error,
source_node_id,
} => {
let time_offset = self
.clock
.new_timestamp()
.get_diff_duration(&self.start_timestamp);
let event_json = serde_json::json!({
"type": "NodeFailed",
"affected_input_ids": affected_input_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
"error": error,
"source_node_id": source_node_id.to_string(),
"time_offset_secs": time_offset.as_secs_f64(),
});
Some(event_json)
}
NodeEvent::AllInputsClosed => {
let time_offset = self
.clock
Expand Down Expand Up @@ -486,6 +504,15 @@ impl EventStream {
NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::NodeFailed {
affected_input_ids,
error,
source_node_id,
} => Event::NodeFailed {
affected_input_ids,
error,
source_node_id,
},
NodeEvent::Input { id, metadata, data } => {
let data = data_to_arrow_array(data, &metadata, ack_channel);
match data {
Expand Down
4 changes: 4 additions & 0 deletions binaries/cli/src/command/topic/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ async fn log_to_terminal(
eprintln!("Output {node_id}/{output_id} closed");
break;
}
InterDaemonEvent::NodeFailed { .. } => {
// NodeFailed events are not relevant for topic echo
continue;
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions binaries/cli/src/command/topic/hz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ async fn subscribe_output(
InterDaemonEvent::OutputClosed { .. } => {
break;
}
InterDaemonEvent::NodeFailed { .. } => {
// NodeFailed events are not relevant for topic hz
continue;
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions binaries/cli/src/command/topic/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ fn info(
InterDaemonEvent::OutputClosed { .. } => {
break;
}
InterDaemonEvent::NodeFailed { .. } => {
// NodeFailed events are not relevant for topic info
continue;
}
}
}
Err(_) => break,
Expand Down
4 changes: 1 addition & 3 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,9 +1553,7 @@ async fn destroy_daemon(

tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err(format!(
"failed to send destroy message to daemon `{daemon_id}`"
))?;
.wrap_err("failed to send destroy message to daemon")?;

// wait for reply
let reply_raw = tcp_receive(&mut daemon_connection.stream)
Expand Down
Loading
Loading