Skip to content

Commit 561a319

Browse files
authored
Expose AllInputClosed message as a Stop message (dora-rs#1026)
This PR makes it possible for nodes to stop when all inputs are closed. This is necessary because, when merging two different event streams, we want to stop once all events from dora have finished, and with this change we can rely on the Stop message to do so. It also reduces the complexity of the event stream code.
2 parents c2f14e6 + d155ee7 commit 561a319

File tree

21 files changed

+48
-36
lines changed

21 files changed

+48
-36
lines changed

apis/c++/node/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ pub struct DoraEvent(Option<Event>);
144144
fn event_type(event: &DoraEvent) -> ffi::DoraEventType {
145145
match &event.0 {
146146
Some(event) => match event {
147-
Event::Stop => ffi::DoraEventType::Stop,
147+
Event::Stop(_) => ffi::DoraEventType::Stop,
148148
Event::Input { .. } => ffi::DoraEventType::Input,
149149
Event::InputClosed { .. } => ffi::DoraEventType::InputClosed,
150150
Event::Error(_) => ffi::DoraEventType::Error,

apis/c/node/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void {
9191
pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType {
9292
let event: &Event = unsafe { &*event.cast() };
9393
match event {
94-
Event::Stop => EventType::Stop,
94+
Event::Stop(_) => EventType::Stop,
9595
Event::Input { .. } => EventType::Input,
9696
Event::InputClosed { .. } => EventType::InputClosed,
9797
Event::Error(_) => EventType::Error,

apis/python/operator/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
use arrow::pyarrow::ToPyArrow;
77
use dora_node_api::{
88
merged::{MergeExternalSend, MergedEvent},
9-
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter,
9+
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause,
1010
};
1111
use eyre::{Context, Result};
1212
use futures::{Stream, StreamExt};
@@ -146,7 +146,7 @@ impl PyEvent {
146146

147147
fn ty(event: &Event) -> &str {
148148
match event {
149-
Event::Stop => "STOP",
149+
Event::Stop(_) => "STOP",
150150
Event::Input { .. } => "INPUT",
151151
Event::InputClosed { .. } => "INPUT_CLOSED",
152152
Event::Error(_) => "ERROR",
@@ -158,6 +158,11 @@ impl PyEvent {
158158
match event {
159159
Event::Input { id, .. } => Some(id),
160160
Event::InputClosed { id } => Some(id),
161+
Event::Stop(cause) => match cause {
162+
StopCause::Manual => Some("MANUAL"),
163+
StopCause::AllInputsClosed => Some("ALL_INPUTS_CLOSED"),
164+
&_ => None,
165+
},
161166
_ => None,
162167
}
163168
}

apis/rust/node/src/event_stream/event.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use shared_memory_extended::{Shmem, ShmemConf};
1010
#[derive(Debug)]
1111
#[non_exhaustive]
1212
pub enum Event {
13-
Stop,
13+
Stop(StopCause),
1414
Reload {
1515
operator_id: Option<OperatorId>,
1616
},
@@ -25,6 +25,13 @@ pub enum Event {
2525
Error(String),
2626
}
2727

28+
#[derive(Debug, Clone)]
29+
#[non_exhaustive]
30+
pub enum StopCause {
31+
Manual,
32+
AllInputsClosed,
33+
}
34+
2835
pub enum RawData {
2936
Empty,
3037
Vec(AVec<u8, ConstAlign<128>>),

apis/rust/node/src/event_stream/mod.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use dora_message::{
1111
node_to_daemon::{DaemonRequest, Timestamped},
1212
DataflowId,
1313
};
14-
pub use event::{Event, MappedInputData, RawData};
14+
pub use event::{Event, MappedInputData, RawData, StopCause};
1515
use futures::{
1616
future::{select, Either},
1717
Stream, StreamExt,
@@ -199,7 +199,7 @@ impl EventStream {
199199
fn convert_event_item(item: EventItem) -> Event {
200200
match item {
201201
EventItem::NodeEvent { event, ack_channel } => match event {
202-
NodeEvent::Stop => Event::Stop,
202+
NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
203203
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
204204
NodeEvent::InputClosed { id } => Event::InputClosed { id },
205205
NodeEvent::Input { id, metadata, data } => {
@@ -234,13 +234,7 @@ impl EventStream {
234234
Err(err) => Event::Error(format!("{err:?}")),
235235
}
236236
}
237-
NodeEvent::AllInputsClosed => {
238-
let err = eyre!(
239-
"received `AllInputsClosed` event, which should be handled by background task"
240-
);
241-
tracing::error!("{err:?}");
242-
Event::Error(err.wrap_err("internal error").to_string())
243-
}
237+
NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
244238
},
245239

246240
EventItem::FatalError(err) => {

apis/rust/node/src/event_stream/thread.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ fn event_stream_loop(
9292
clock: Arc<uhlc::HLC>,
9393
) {
9494
let mut tx = Some(tx);
95+
let mut close_tx = false;
9596
let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new();
9697
let mut drop_tokens = Vec::new();
9798

@@ -135,10 +136,8 @@ fn event_stream_loop(
135136
data: Some(data), ..
136137
} => data.drop_token(),
137138
NodeEvent::AllInputsClosed => {
138-
// close the event stream
139-
tx = None;
140-
// skip this internal event
141-
continue;
139+
close_tx = true;
140+
None
142141
}
143142
_ => None,
144143
};
@@ -166,6 +165,10 @@ fn event_stream_loop(
166165
} else {
167166
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
168167
}
168+
169+
if close_tx {
170+
tx = None;
171+
};
169172
}
170173
};
171174
if let Err(err) = result {

apis/rust/node/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub use dora_message::{
2020
metadata::{Metadata, MetadataParameters, Parameter},
2121
DataflowId,
2222
};
23-
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData};
23+
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause};
2424
pub use flume::Receiver;
2525
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};
2626

binaries/daemon/src/spawn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ pub async fn spawn_node(
540540
// If log is an output, we're sending the logs to the dataflow
541541
if let Some(stdout_output_name) = &send_stdout_to {
542542
// Convert logs to DataMessage
543-
let array = message.into_arrow();
543+
let array = message.as_str().into_arrow();
544544

545545
let array: ArrayData = array.into();
546546
let total_len = required_data_size(&array);

binaries/runtime/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,10 @@ async fn run(
232232
}
233233
}
234234
}
235-
RuntimeEvent::Event(Event::Stop) => {
235+
RuntimeEvent::Event(Event::Stop(cause)) => {
236236
// forward stop event to all operators and close the event channels
237237
for (_, channel) in operator_channels.drain() {
238-
let _ = channel.send_async(Event::Stop).await;
238+
let _ = channel.send_async(Event::Stop(cause.clone())).await;
239239
}
240240
}
241241
RuntimeEvent::Event(Event::Reload {

binaries/runtime/src/operator/shared_lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
182182
}
183183

184184
let mut operator_event = match event {
185-
Event::Stop => dora_operator_api_types::RawEvent {
185+
Event::Stop(_) => dora_operator_api_types::RawEvent {
186186
input: None,
187187
input_closed: None,
188188
stop: true,

0 commit comments

Comments
 (0)