Skip to content

Commit 2081554

Browse files
authored
Allow event iterators to surface owned data (#627)
* Allow event iterators to surface owned data Previously, event iterators were forced to handing out references to data, even if they could return owned data. This is not great because it requires the replay operator to clone the data to send it downstream. With this change, the event iterator can surface either owned or shared data, which allows the `Rc<EventLink>` to surface owned data when it is uniquely owned, and references when it is shared. This avoids cloning data when there is only a single replay operator attached to an event link. Signed-off-by: Moritz Hoffmann <[email protected]> * Cow instead of Result Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 09994a8 commit 2081554

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

timely/src/dataflow/operators/core/capture/event.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ pub enum Event<T, C> {
1818

1919
/// Iterates over contained `Event<T, C>`.
2020
///
21-
/// The `EventIterator` trait describes types that can iterate over references to events,
21+
/// The `EventIterator` trait describes types that can iterate over `Cow`s of events,
2222
/// and which can be used to replay a stream into a new timely dataflow computation.
2323
///
2424
/// This method is not simply an iterator because of the lifetime in the result.
25-
pub trait EventIterator<T, C> {
26-
/// Iterates over references to `Event<T, C>` elements.
27-
fn next(&mut self) -> Option<&Event<T, C>>;
25+
pub trait EventIterator<T: Clone, C: Clone> {
26+
/// Iterates over `Cow<Event<T, C>>` elements.
27+
fn next(&mut self) -> Option<std::borrow::Cow<Event<T, C>>>;
2828
}
2929

3030
/// Receives `Event<T, C>` events.
@@ -45,6 +45,7 @@ impl<T, C> EventPusher<T, C> for ::std::sync::mpsc::Sender<Event<T, C>> {
4545
/// A linked-list event pusher and iterator.
4646
pub mod link {
4747

48+
use std::borrow::Cow;
4849
use std::rc::Rc;
4950
use std::cell::RefCell;
5051

@@ -77,13 +78,18 @@ pub mod link {
7778
}
7879
}
7980

80-
impl<T, C> EventIterator<T, C> for Rc<EventLink<T, C>> {
81-
fn next(&mut self) -> Option<&Event<T, C>> {
81+
impl<T: Clone, C: Clone> EventIterator<T, C> for Rc<EventLink<T, C>> {
82+
fn next(&mut self) -> Option<Cow<Event<T, C>>> {
8283
let is_some = self.next.borrow().is_some();
8384
if is_some {
8485
let next = self.next.borrow().as_ref().unwrap().clone();
8586
*self = next;
86-
self.event.as_ref()
87+
if let Some(this) = Rc::get_mut(self) {
88+
this.event.take().map(Cow::Owned)
89+
}
90+
else {
91+
self.event.as_ref().map(Cow::Borrowed)
92+
}
8793
}
8894
else {
8995
None
@@ -121,6 +127,8 @@ pub mod link {
121127
/// A binary event pusher and iterator.
122128
pub mod binary {
123129

130+
use std::borrow::Cow;
131+
124132
use serde::{de::DeserializeOwned, Serialize};
125133

126134
use super::{Event, EventPusher, EventIterator};
@@ -164,10 +172,10 @@ pub mod binary {
164172
}
165173
}
166174

167-
impl<T: DeserializeOwned, C: DeserializeOwned, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
168-
fn next(&mut self) -> Option<&Event<T, C>> {
175+
impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
176+
fn next(&mut self) -> Option<Cow<Event<T, C>>> {
169177
self.decoded = ::bincode::deserialize_from(&mut self.reader).ok();
170-
self.decoded.as_ref()
178+
self.decoded.take().map(Cow::Owned)
171179
}
172180
}
173181
}

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,18 @@ where
9494

9595
for event_stream in event_streams.iter_mut() {
9696
while let Some(event) = event_stream.next() {
97+
use std::borrow::Cow::*;
9798
match event {
98-
Event::Progress(vec) => {
99+
Owned(Event::Progress(vec)) => {
100+
progress.internals[0].extend(vec.into_iter());
101+
},
102+
Owned(Event::Messages(time, mut data)) => {
103+
output.session(&time).give_container(&mut data);
104+
}
105+
Borrowed(Event::Progress(vec)) => {
99106
progress.internals[0].extend(vec.iter().cloned());
100107
},
101-
Event::Messages(ref time, data) => {
108+
Borrowed(Event::Messages(time, data)) => {
102109
allocation.clone_from(data);
103110
output.session(time).give_container(&mut allocation);
104111
}

0 commit comments

Comments
 (0)