diff --git a/Cargo.toml b/Cargo.toml index 2d0b3b84e..f2633be76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ edition = "2021" [workspace.dependencies] columnar = "0.6" +serde = { version = "1.0", features = ["derive", "rc"] } [workspace.lints.clippy] type_complexity = "allow" diff --git a/communication/Cargo.toml b/communication/Cargo.toml index 925ffd9f0..40f189f2b 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -23,7 +23,7 @@ default = ["getopts"] columnar = { workspace = true } getopts = { version = "0.2.21", optional = true } byteorder = "1.5" -serde = { version = "1.0", features = ["derive"] } +serde = { workspace = true } timely_bytes = { path = "../bytes", version = "0.13" } timely_container = { path = "../container", version = "0.15.1" } timely_logging = { path = "../logging", version = "0.13" } diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 6b589738a..379764d3b 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -27,7 +27,7 @@ columnation = "0.1" getopts = { version = "0.2.21", optional = true } bincode = { version = "1.0" } byteorder = "1.5" -serde = { version = "1.0", features = ["derive"] } +serde = { workspace = true } timely_bytes = { path = "../bytes", version = "0.13" } timely_logging = { path = "../logging", version = "0.13" } timely_communication = { path = "../communication", version = "0.19", default-features = false } diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 6a351753b..694ca440b 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -136,6 +136,20 @@ mod implementations { } } + impl ContainerBytes for std::sync::Arc { + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { + std::sync::Arc::new(T::from_bytes(bytes)) + } + + fn length_in_bytes(&self) -> usize { + self.as_ref().length_in_bytes() + } + + fn into_bytes(&self, writer: &mut W) { + self.as_ref().into_bytes(writer); + } + } + use 
write_counter::WriteCounter; /// A `Write` wrapper that counts the bytes written. mod write_counter { diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 55404015c..cdb6f2ef5 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -44,6 +44,24 @@ impl ParallelizationContract for Pipel } } +/// A broadcast connection, over which data are distributed to all workers. +#[derive(Debug)] +pub struct Broadcast; + +impl ParallelizationContract for Broadcast +where + T: Timestamp, + C: Container + Data + Send + crate::dataflow::channels::ContainerBytes, +{ + type Pusher = LogPusher>>>; + type Puller = LogPuller>>>; + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (pusher, puller) = allocator.broadcast::>(identifier, address); + (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), + LogPuller::new(puller, allocator.index(), identifier, logging)) + } +} + /// An exchange between multiple observers by data pub struct ExchangeCore { hash_func: F, phantom: PhantomData } diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs deleted file mode 100644 index 21d80d6da..000000000 --- a/timely/src/dataflow/operators/broadcast.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! Broadcast records to all workers. - -use crate::ExchangeData; -use crate::dataflow::{Stream, Scope}; -use crate::dataflow::operators::{Map, Exchange}; - -/// Broadcast records to all workers. -pub trait Broadcast { - /// Broadcast records to all workers. 
- /// - /// # Examples - /// ``` - /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect}; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .broadcast() - /// .inspect(|x| println!("seen: {:?}", x)); - /// }); - /// ``` - fn broadcast(&self) -> Self; -} - -impl Broadcast for Stream { - fn broadcast(&self) -> Stream { - - // NOTE: Simplified implementation due to underlying motion - // in timely dataflow internals. Optimize once they have - // settled down. - let peers = self.scope().peers() as u64; - self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone()))) - .exchange(|ix| ix.0) - .map(|(_i,x)| x) - } -} diff --git a/timely/src/dataflow/operators/core/broadcast.rs b/timely/src/dataflow/operators/core/broadcast.rs new file mode 100644 index 000000000..7394e8a66 --- /dev/null +++ b/timely/src/dataflow/operators/core/broadcast.rs @@ -0,0 +1,61 @@ +//! Broadcast records to all workers. + +use crate::ExchangeData; +use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::channels::ContainerBytes; +use crate::dataflow::operators::Operator; + +/// Broadcast records to all workers. +pub trait Broadcast { + /// Broadcast records to all workers. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .broadcast() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn broadcast(&self) -> Self; +} + +impl Broadcast for StreamCore { + fn broadcast(&self) -> StreamCore { + self.unary(crate::dataflow::channels::pact::Broadcast, "Broadcast", |_, _| { + // This operator does not need to do anything, as the + // broadcast pact will handle the distribution of data. 
+ move |input, output| { + input.for_each(|time, data| { + output.session(&time).give_container(data); + }); + } + }) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + use crate::dataflow::channels::pact::Pipeline; + use crate::dataflow::operators::Operator; + + #[test] + fn test_broadcast() { + use crate::dataflow::operators::{ToStream, Broadcast, Inspect}; + crate::example(|scope| { + (0..10).to_stream(scope) + .unary(Pipeline, "ToArc", |_, _| { + move |input, output| { + input.for_each(|time, data| { + output.session(&time).give_container(&mut Arc::new(std::mem::take(data))); + }); + } + }) + .broadcast() + .inspect(|x| println!("seen: {:?}", x)); + }); + } +} diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 3674fc49e..3c502f95c 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,6 +1,7 @@ //! Extension traits for `StreamCore` implementing various operators that //! are independent of specific container types. +pub mod broadcast; pub mod capture; pub mod concat; pub mod enterleave; diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 2644e7ce0..2d1aa1a6f 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -42,7 +42,8 @@ pub use self::core::inspect; pub mod filter; pub mod delay; pub use self::core::exchange; -pub mod broadcast; +// Backwards compatibility with old names. +pub use self::core::broadcast; pub use self::core::probe::{self, Probe}; pub mod to_stream; pub use self::core::capture::{self, Capture};