Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ edition = "2021"

[workspace.dependencies]
columnar = "0.6"
serde = { version = "1.0", features = ["derive", "rc"] }

[workspace.lints.clippy]
type_complexity = "allow"
Expand Down
2 changes: 1 addition & 1 deletion communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ default = ["getopts"]
columnar = { workspace = true }
getopts = { version = "0.2.21", optional = true }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde = { workspace = true }
timely_bytes = { path = "../bytes", version = "0.13" }
timely_container = { path = "../container", version = "0.15.1" }
timely_logging = { path = "../logging", version = "0.13" }
Expand Down
2 changes: 1 addition & 1 deletion timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ columnation = "0.1"
getopts = { version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde = { workspace = true }
timely_bytes = { path = "../bytes", version = "0.13" }
timely_logging = { path = "../logging", version = "0.13" }
timely_communication = { path = "../communication", version = "0.19", default-features = false }
Expand Down
14 changes: 14 additions & 0 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ mod implementations {
}
}

impl<T: ContainerBytes> ContainerBytes for std::sync::Arc<T> {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
std::sync::Arc::new(T::from_bytes(bytes))
}

fn length_in_bytes(&self) -> usize {
self.as_ref().length_in_bytes()
}

fn into_bytes<W: Write>(&self, writer: &mut W) {
self.as_ref().into_bytes(writer);
}
}

use write_counter::WriteCounter;
/// A `Write` wrapper that counts the bytes written.
mod write_counter {
Expand Down
18 changes: 18 additions & 0 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipel
}
}

/// A broadcast connection
#[derive(Debug)]
pub struct Broadcast;

impl<T, C> ParallelizationContract<T, C> for Broadcast
where
T: Timestamp,
C: Container + Data + Send + crate::dataflow::channels::ContainerBytes,
{
type Pusher = LogPusher<T, C, Box<dyn Push<Message<T, C>>>>;
type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.broadcast::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
}

/// An exchange between multiple observers by data
pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }

Expand Down
35 changes: 0 additions & 35 deletions timely/src/dataflow/operators/broadcast.rs

This file was deleted.

61 changes: 61 additions & 0 deletions timely/src/dataflow/operators/core/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! Broadcast records to all workers.

use crate::ExchangeData;
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::channels::ContainerBytes;
use crate::dataflow::operators::Operator;

/// Broadcast records to all workers.
pub trait Broadcast<D: ExchangeData> {
/// Broadcast records to all workers.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .broadcast()
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn broadcast(&self) -> Self;
}

impl<G: Scope, C: crate::Container + Send + ExchangeData + ContainerBytes> Broadcast<C> for StreamCore<G, C> {
fn broadcast(&self) -> StreamCore<G, C> {
self.unary(crate::dataflow::channels::pact::Broadcast, "Broadcast", |_, _| {
// This operator does not need to do anything, as the
// broadcast pact will handle the distribution of data.
move |input, output| {
input.for_each(|time, data| {
output.session(&time).give_container(data);
});
}
})
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::Operator;

#[test]
fn test_broadcast() {
use crate::dataflow::operators::{ToStream, Broadcast, Inspect};
crate::example(|scope| {
(0..10).to_stream(scope)
.unary(Pipeline, "ToArc", |_, _| {
move |input, output| {
input.for_each(|time, data| {
output.session(&time).give_container(&mut Arc::new(std::mem::take(data)));
});
}
})
.broadcast()
.inspect(|x| println!("seen: {:?}", x));
});
}
}
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Extension traits for `StreamCore` implementing various operators that
//! are independent of specific container types.

pub mod broadcast;
pub mod capture;
pub mod concat;
pub mod enterleave;
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub use self::core::inspect;
pub mod filter;
pub mod delay;
pub use self::core::exchange;
pub mod broadcast;
// Backwards compatibility with old names.
pub use core::broadcast;
pub use self::core::probe::{self, Probe};
pub mod to_stream;
pub use self::core::capture::{self, Capture};
Expand Down
Loading