Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions communication/src/message.rs

This file was deleted.

29 changes: 23 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ pub mod pullers;
/// Parallelization contracts, describing how data must be exchanged between operators.
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type Bundle<T, C> = crate::Message<Message<T, C>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Serialize, Deserialize)]
pub struct Message<T, C> {
Expand Down Expand Up @@ -44,17 +41,37 @@ impl<T, C: Container> Message<T, C> {
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element. The buffer is cleared.
#[inline]
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(Bundle::from_typed(message));
let mut bundle = Some(message);

pusher.push(&mut bundle);

if let Some(message) = bundle {
*buffer = message.payload.data;
*buffer = message.data;
buffer.clear();
}
}
}

// Instructions for serialization of `Message`.
// Intended to swap out the constraint on `C` for `C: Bytesable`.
impl<T, C> crate::communication::Bytesable for Message<T, C>
where
T: Serialize + for<'a> Deserialize<'a>,
C: Serialize + for<'a> Deserialize<'a>,
{
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
}

fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
}
}
38 changes: 19 additions & 19 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::container::PushPartitioned;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::Bundle;
use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;
Expand All @@ -24,9 +24,9 @@ use crate::ExchangeData;
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, C> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<Bundle<T, C>>+'static;
type Pusher: Push<Message<T, C>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
type Puller: Pull<Message<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
Expand All @@ -36,10 +36,10 @@ pub trait ParallelizationContract<T, C> {
pub struct Pipeline;

impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Bundle<T, C>>(identifier, address);
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand Down Expand Up @@ -71,11 +71,11 @@ where
C: ExchangeData + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Bundle<T, C>>(identifier, address);
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
}
Expand All @@ -89,7 +89,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
pusher: P,
channel: usize,
counter: usize,
Expand All @@ -99,7 +99,7 @@ pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
logging: Option<Logger>,
}

impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -114,16 +114,16 @@ impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
}
}

impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T, C, P> {
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
#[inline]
fn push(&mut self, pair: &mut Option<Bundle<T, C>>) {
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
if let Some(bundle) = pair {
self.counter += 1;

// Stamp the sequence number and source.
// FIXME: Awkward moment/logic.
bundle.payload.seq = self.counter - 1;
bundle.payload.from = self.source;
bundle.seq = self.counter - 1;
bundle.from = self.source;

if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
Expand All @@ -143,15 +143,15 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T,

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, C, P: Pull<Bundle<T, C>>> {
pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -164,9 +164,9 @@ impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
}
}

impl<T, C: Container, P: Pull<Bundle<T, C>>> Pull<Bundle<T, C>> for LogPuller<T, C, P> {
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
#[inline]
fn pull(&mut self) -> &mut Option<Bundle<T, C>> {
fn pull(&mut self) -> &mut Option<Message<T, C>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::dataflow::channels::Bundle;
use crate::dataflow::channels::Message;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<C>,
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
pub fn next(&mut self) -> Option<&mut Message<T, C>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P>
}
}

impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
24 changes: 12 additions & 12 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::channels::Message;
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::Container;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
}
}

impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
impl<T, C: Container, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
/// Returns a `Session`, which accepts data to send at the associated time
#[inline]
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
Expand All @@ -66,7 +66,7 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
/// Returns a `Session`, which accepts data to send at the associated time
pub fn session_with_builder(&mut self, time: &T) -> Session<T, CB, P> {
if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
Expand All @@ -85,7 +85,7 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
/// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
pub fn cease(&mut self) {
self.flush();
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
where
T: Eq+Clone,
CB: ContainerBuilder + PushInto<D>,
P: Push<Bundle<T, CB::Container>>
P: Push<Message<T, CB::Container>>
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -136,7 +136,7 @@ pub struct Session<'a, T, CB, P> {
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
P: Push<Bundle<T, C>> + 'a,
P: Push<Message<T, C>> + 'a,
{
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut C) {
Expand All @@ -148,7 +148,7 @@ impl<'a, T, CB, P> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
P: Push<Message<T, CB::Container>> + 'a
{
/// Access the builder. Immutable access to prevent races with flushing
/// the underlying buffer.
Expand Down Expand Up @@ -179,7 +179,7 @@ impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -192,7 +192,7 @@ pub struct AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// A reference to the underlying buffer.
buffer: &'a mut Buffer<T, CB, P>,
Expand All @@ -204,7 +204,7 @@ impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// Transmits a single record.
#[inline]
Expand All @@ -231,7 +231,7 @@ impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -243,7 +243,7 @@ impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
fn drop(&mut self) {
self.buffer.cease();
Expand Down
Loading