Skip to content

Commit 04aefdd

Browse files
Remove Container argument from Counter (#714)
1 parent a263ae1 commit 04aefdd

File tree

6 files changed

+18
-25
lines changed

6 files changed

+18
-25
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,7 @@ mod distributor {
110110
C: Accountable + ContainerBytes + Send + 'static,
111111
D: Distributor<C> + 'static,
112112
{
113-
type Pusher = Exchange<
114-
T,
115-
LogPusher<Box<dyn Push<Message<T, C>>>>,
116-
D
117-
>;
113+
type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
118114
type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
119115
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
120116
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! A wrapper which counts the number of records pushed past and updates a shared count map.
22
3-
use std::marker::PhantomData;
43
use std::rc::Rc;
54
use std::cell::RefCell;
65

@@ -11,13 +10,12 @@ use crate::Accountable;
1110

1211
/// A wrapper which updates shared `produced` based on the number of records pushed.
1312
#[derive(Debug)]
14-
pub struct Counter<T, C, P: Push<Message<T, C>>> {
13+
pub struct Counter<T, P> {
1514
pushee: P,
1615
produced: Rc<RefCell<ChangeBatch<T>>>,
17-
phantom: PhantomData<C>,
1816
}
1917

20-
impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
18+
impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
2119
#[inline]
2220
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2321
if let Some(message) = message {
@@ -31,13 +29,12 @@ impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, C, P> w
3129
}
3230
}
3331

34-
impl<T, C, P: Push<Message<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
32+
impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
3533
/// Allocates a new `Counter` from a pushee and shared counts.
36-
pub fn new(pushee: P) -> Counter<T, C, P> {
34+
pub fn new(pushee: P) -> Counter<T, P> {
3735
Counter {
3836
pushee,
3937
produced: Rc::new(RefCell::new(ChangeBatch::new())),
40-
phantom: PhantomData,
4138
}
4239
}
4340
/// A references to shared changes in counts, for cloning or draining.

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for
131131

132132

133133
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
134-
targets: Counter<TInner, TContainer, Tee<TInner, TContainer>>,
134+
targets: Counter<TInner, Tee<TInner, TContainer>>,
135135
phantom: ::std::marker::PhantomData<TOuter>,
136136
activator: crate::scheduling::Activator,
137137
active: bool,

timely/src/dataflow/operators/core/input.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
218218
pub struct Handle<T: Timestamp, CB: ContainerBuilder> {
219219
activate: Vec<Activator>,
220220
progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
221-
pushers: Vec<Counter<T, CB::Container, Tee<T, CB::Container>>>,
221+
pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
222222
builder: CB,
223223
buffer: CB::Container,
224224
now_at: T,
@@ -341,7 +341,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
341341

342342
fn register(
343343
&mut self,
344-
pusher: Counter<T, CB::Container, Tee<T, CB::Container>>,
344+
pusher: Counter<T, Tee<T, CB::Container>>,
345345
progress: Rc<RefCell<ChangeBatch<T>>>,
346346
) {
347347
// flush current contents, so new registrant does not see existing data.
@@ -380,7 +380,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
380380
fn send_container(
381381
container: &mut CB::Container,
382382
buffer: &mut CB::Container,
383-
pushers: &mut [Counter<T, CB::Container, Tee<T, CB::Container>>],
383+
pushers: &mut [Counter<T, Tee<T, CB::Container>>],
384384
now_at: &T
385385
) {
386386
for index in 0 .. pushers.len() {

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,27 +148,27 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
148148
/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation.
149149
#[derive(Debug)]
150150
pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
151-
buffer: PushBuffer<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>,
151+
buffer: PushBuffer<T, CB, Counter<T, Tee<T, CB::Container>>>,
152152
}
153153

154154
impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
155-
fn new(pusher: Counter<T, CB::Container, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
155+
fn new(pusher: Counter<T, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
156156
UnorderedHandle {
157157
buffer: PushBuffer::new(pusher),
158158
}
159159
}
160160

161161
/// Allocates a new automatically flushing session based on the supplied capability.
162162
#[inline]
163-
pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>> {
163+
pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, Tee<T, CB::Container>>>> {
164164
ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations))
165165
}
166166
}
167167

168168
impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> {
169169
/// Allocates a new automatically flushing session based on the supplied capability.
170170
#[inline]
171-
pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
171+
pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, Tee<T, C>>>> {
172172
self.session_with_builder(cap)
173173
}
174174
}

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
159159
/// pusher is flushed (via the `cease` method) once it is no longer used.
160160
#[derive(Debug)]
161161
pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
162-
push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
162+
push_buffer: Buffer<T, CB, PushCounter<T, P>>,
163163
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
164164
port: usize,
165165
}
166166

167167
impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
168168
/// Creates a new output wrapper from a push buffer.
169-
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
169+
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
170170
OutputWrapper {
171171
push_buffer,
172172
internal_buffer,
@@ -188,7 +188,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out
188188

189189
/// Handle to an operator's output stream.
190190
pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
191-
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
191+
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, P>>,
192192
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
193193
port: usize,
194194
}
@@ -220,7 +220,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>>
220220
/// });
221221
/// });
222222
/// ```
223-
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
223+
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, P>> where 'a: 'b {
224224
debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
225225
self.push_buffer.session_with_builder(cap.time())
226226
}
@@ -255,7 +255,7 @@ impl<'a, T: Timestamp, C: Container, P: Push<Message<T, C>>> OutputHandleCore<'a
255255
/// });
256256
/// ```
257257
#[inline]
258-
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
258+
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, P>> where 'a: 'b {
259259
self.session_with_builder(cap)
260260
}
261261
}

0 commit comments

Comments
 (0)