Skip to content

Commit c06a057

Browse files
Merge pull request #698 from antiguru/log_push_pull
Remove T, C parameters from LogPuller/Pusher
2 parents 3d33001 + 825dd8e commit c06a057

File tree

1 file changed

+10
-14
lines changed
  • timely/src/dataflow/channels

1 file changed

+10
-14
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub trait ParallelizationContract<T, C> {
3535
pub struct Pipeline;
3636

3737
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38-
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
39-
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
38+
type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
39+
type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
4040
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
4141
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
4242
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
@@ -86,8 +86,8 @@ where
8686
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
8787
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
8888
{
89-
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
90-
type Puller = LogPuller<T, CB::Container, Box<dyn Pull<Message<T, CB::Container>>>>;
89+
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
90+
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
9191

9292
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
9393
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
@@ -104,17 +104,16 @@ impl<C, F> Debug for ExchangeCore<C, F> {
104104

105105
/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
106106
#[derive(Debug)]
107-
pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
107+
pub struct LogPusher<P> {
108108
pusher: P,
109109
channel: usize,
110110
counter: usize,
111111
source: usize,
112112
target: usize,
113-
phantom: PhantomData<(T, C)>,
114113
logging: Option<Logger>,
115114
}
116115

117-
impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
116+
impl<P> LogPusher<P> {
118117
/// Allocates a new pusher.
119118
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
120119
LogPusher {
@@ -123,13 +122,12 @@ impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
123122
counter: 0,
124123
source,
125124
target,
126-
phantom: PhantomData,
127125
logging,
128126
}
129127
}
130128
}
131129

132-
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
130+
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
133131
#[inline]
134132
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135133
if let Some(bundle) = pair {
@@ -158,28 +156,26 @@ impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<
158156

159157
/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
160158
#[derive(Debug)]
161-
pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
159+
pub struct LogPuller<P> {
162160
puller: P,
163161
channel: usize,
164162
index: usize,
165-
phantom: PhantomData<(T, C)>,
166163
logging: Option<Logger>,
167164
}
168165

169-
impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
166+
impl<P> LogPuller<P> {
170167
/// Allocates a new `Puller`.
171168
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
172169
LogPuller {
173170
puller,
174171
channel,
175172
index,
176-
phantom: PhantomData,
177173
logging,
178174
}
179175
}
180176
}
181177

182-
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
178+
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
183179
#[inline]
184180
fn pull(&mut self) -> &mut Option<Message<T, C>> {
185181
let result = self.puller.pull();

0 commit comments

Comments
 (0)