Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ mod distributor {
C: Accountable + ContainerBytes + Send + 'static,
D: Distributor<C> + 'static,
{
type Pusher = Exchange<
T,
LogPusher<Box<dyn Push<Message<T, C>>>>,
D
>;
type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, sorry. But seems great! :D

type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
Expand Down
11 changes: 4 additions & 7 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! A wrapper which counts the number of records pushed past and updates a shared count map.

use std::marker::PhantomData;
use std::rc::Rc;
use std::cell::RefCell;

Expand All @@ -11,13 +10,12 @@ use crate::Accountable;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T, C, P: Push<Message<T, C>>> {
pub struct Counter<T, P> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<C>,
}

impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
#[inline]
fn push(&mut self, message: &mut Option<Message<T, C>>) {
if let Some(message) = message {
Expand All @@ -31,13 +29,12 @@ impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, C, P> w
}
}

impl<T, C, P: Push<Message<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> Counter<T, C, P> {
pub fn new(pushee: P) -> Counter<T, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: PhantomData,
}
}
/// A references to shared changes in counts, for cloning or draining.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for


struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
targets: Counter<TInner, TContainer, Tee<TInner, TContainer>>,
targets: Counter<TInner, Tee<TInner, TContainer>>,
phantom: ::std::marker::PhantomData<TOuter>,
activator: crate::scheduling::Activator,
active: bool,
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
pub struct Handle<T: Timestamp, CB: ContainerBuilder> {
activate: Vec<Activator>,
progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
pushers: Vec<Counter<T, CB::Container, Tee<T, CB::Container>>>,
pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
builder: CB,
buffer: CB::Container,
now_at: T,
Expand Down Expand Up @@ -341,7 +341,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {

fn register(
&mut self,
pusher: Counter<T, CB::Container, Tee<T, CB::Container>>,
pusher: Counter<T, Tee<T, CB::Container>>,
progress: Rc<RefCell<ChangeBatch<T>>>,
) {
// flush current contents, so new registrant does not see existing data.
Expand Down Expand Up @@ -380,7 +380,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
fn send_container(
container: &mut CB::Container,
buffer: &mut CB::Container,
pushers: &mut [Counter<T, CB::Container, Tee<T, CB::Container>>],
pushers: &mut [Counter<T, Tee<T, CB::Container>>],
now_at: &T
) {
for index in 0 .. pushers.len() {
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,27 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation.
#[derive(Debug)]
pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
buffer: PushBuffer<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>,
buffer: PushBuffer<T, CB, Counter<T, Tee<T, CB::Container>>>,
}

impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
fn new(pusher: Counter<T, CB::Container, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
fn new(pusher: Counter<T, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
UnorderedHandle {
buffer: PushBuffer::new(pusher),
}
}

/// Allocates a new automatically flushing session based on the supplied capability.
#[inline]
pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>> {
pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, Tee<T, CB::Container>>>> {
ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations))
}
}

impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> {
/// Allocates a new automatically flushing session based on the supplied capability.
#[inline]
pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, Tee<T, C>>>> {
self.session_with_builder(cap)
}
}
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
/// pusher is flushed (via the `cease` method) once it is no longer used.
#[derive(Debug)]
pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
push_buffer: Buffer<T, CB, PushCounter<T, P>>,
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
port: usize,
}

impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
/// Creates a new output wrapper from a push buffer.
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
OutputWrapper {
push_buffer,
internal_buffer,
Expand All @@ -188,7 +188,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out

/// Handle to an operator's output stream.
pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, P>>,
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
port: usize,
}
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>>
/// });
/// });
/// ```
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, P>> where 'a: 'b {
debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
self.push_buffer.session_with_builder(cap.time())
}
Expand Down Expand Up @@ -255,7 +255,7 @@ impl<'a, T: Timestamp, C: Container, P: Push<Message<T, C>>> OutputHandleCore<'a
/// });
/// ```
#[inline]
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, P>> where 'a: 'b {
self.session_with_builder(cap)
}
}
Expand Down
Loading