Skip to content

Commit b392c87

Browse files
committed
Remove unneeded type parameters and assert pusher length
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 730fcfc commit b392c87

File tree

2 files changed

+18
-23
lines changed

2 files changed

+18
-23
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@ where
8888
{
8989
type Pusher = ExchangePusher<
9090
T,
91-
CB::Container,
9291
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93-
DrainContainerDistributor<CB, T, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>
92+
DrainContainerDistributor<CB, H>
9493
>;
9594
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
9695

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,44 @@ use crate::Container;
1414
/// It needs to uphold progress tracking requirements. The count of the input container
1515
/// must be preserved across the output containers, from the first call to `partition` until the
1616
/// call to `flush` for a specific time stamp.
17-
pub trait Distributor<T, C, P> {
17+
pub trait Distributor<C> {
1818
/// Partition the contents of `container` at `time` into the `pushers`.
19-
fn partition(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
19+
fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
2020
/// Flush any remaining contents into the `pushers` at time `time`.
21-
fn flush(&mut self, time: &T, pushers: &mut [P]);
21+
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
2222
/// Optionally release resources, such as memory.
2323
fn relax(&mut self);
2424
}
2525

2626
/// A distributor creating containers from a drainable container based
2727
/// on a hash function of the container's item.
28-
pub struct DrainContainerDistributor<CB, T, P, H> {
28+
pub struct DrainContainerDistributor<CB, H> {
2929
builders: Vec<CB>,
3030
hash_func: H,
31-
_phantom: std::marker::PhantomData<(T, P)>,
3231
}
3332

34-
impl<CB: Default, T, P, H> DrainContainerDistributor<CB, T, P, H> {
33+
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
3534
/// Constructs a new `DrainContainerDistributor` with the given hash function.
3635
pub fn new(hash_func: H) -> Self {
3736
Self {
3837
builders: Vec::new(),
3938
hash_func,
40-
_phantom: std::marker::PhantomData,
4139
}
4240
}
4341
}
4442

45-
impl<CB, T, P, H> Distributor<T, CB::Container, P> for DrainContainerDistributor<CB, T, P, H>
43+
impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
4644
where
4745
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
48-
CB::Container: Container,
49-
T: Clone,
50-
P: Push<Message<T, CB::Container>>,
5146
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64,
5247
{
53-
fn partition(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
54-
if self.builders.len() != pushers.len() {
48+
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
49+
if self.builders.len() <= pushers.len() {
5550
self.builders.resize_with(pushers.len(), Default::default);
5651
}
52+
else {
53+
debug_assert_eq!(self.builders.len(), pushers.len());
54+
}
5755
if pushers.len().is_power_of_two() {
5856
let mask = (pushers.len() - 1) as u64;
5957
for datum in container.drain() {
@@ -76,7 +74,7 @@ where
7674
}
7775
}
7876

79-
fn flush(&mut self, time: &T, pushers: &mut [P]) {
77+
fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
8078
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
8179
while let Some(container) = builder.finish() {
8280
Message::push_at(container, time.clone(), pusher);
@@ -93,29 +91,27 @@ where
9391

9492
// TODO : Software write combining
9593
/// Distributes records among target pushees according to a distributor.
96-
pub struct Exchange<T, C, P, D> {
94+
pub struct Exchange<T, P, D> {
9795
pushers: Vec<P>,
9896
current: Option<T>,
9997
distributor: D,
100-
_phantom: std::marker::PhantomData<C>,
10198
}
10299

103-
impl<T: Clone, C, P, D> Exchange<T, C, P, D> {
100+
impl<T: Clone, P, D> Exchange<T, P, D> {
104101
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
105-
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, C, P, D> {
102+
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
106103
Exchange {
107104
pushers,
108105
current: None,
109106
distributor,
110-
_phantom: std::marker::PhantomData,
111107
}
112108
}
113109
}
114110

115-
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, C, P, D>
111+
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
116112
where
117113
P: Push<Message<T, C>>,
118-
D: Distributor<T, C, P>,
114+
D: Distributor<C>,
119115
{
120116
#[inline(never)]
121117
fn push(&mut self, message: &mut Option<Message<T, C>>) {

0 commit comments

Comments
 (0)