Skip to content

Commit dfc9012

Browse files
committed
Merge remote-tracking branch 'upstream/master' into container_rework
2 parents 153f3d8 + d5c66b5 commit dfc9012

File tree

10 files changed

+149
-101
lines changed

10 files changed

+149
-101
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.23.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.22.0...timely-v0.23.0) - 2025-08-28
11+
12+
### Other
13+
14+
- Remove some quadratic behavior from propagate_all
15+
- Remove T, C parameters from LogPuller/Pusher
16+
1017
## [0.22.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.21.5...timely-v0.22.0) - 2025-08-15
1118

1219
### Other

bytes/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "timely_bytes"
3-
version = "0.22.0"
3+
version = "0.23.0"
44
authors = ["Frank McSherry <[email protected]>"]
55
edition = "2018"
66

communication/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "timely_communication"
3-
version = "0.22.0"
3+
version = "0.23.0"
44
authors = ["Frank McSherry <[email protected]>"]
55
description = "Communication layer for timely dataflow"
66
edition.workspace = true
@@ -24,9 +24,9 @@ columnar = { workspace = true }
2424
getopts = { version = "0.2.21", optional = true }
2525
byteorder = "1.5"
2626
serde = { version = "1.0", features = ["derive"] }
27-
timely_bytes = { path = "../bytes", version = "0.22" }
28-
timely_container = { path = "../container", version = "0.22.0" }
29-
timely_logging = { path = "../logging", version = "0.22" }
27+
timely_bytes = { path = "../bytes", version = "0.23" }
28+
timely_container = { path = "../container", version = "0.23.0" }
29+
timely_logging = { path = "../logging", version = "0.23" }
3030

3131
# Lgalloc only supports linux and macos, don't depend on any other OS.
3232
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]

container/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "timely_container"
3-
version = "0.22.0"
3+
version = "0.23.0"
44
description = "Container abstractions for Timely"
55
license = "MIT"
66
edition.workspace = true

container/src/lib.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,20 +113,6 @@ pub trait ContainerBuilder: Default + 'static {
113113
/// be called repeatedly until it returns `None`.
114114
#[must_use]
115115
fn finish(&mut self) -> Option<&mut Self::Container>;
116-
/// Partitions `container` among `builders`, using the function `index` to direct items.
117-
/// Drains the container. The container is left in an undefined state.
118-
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
119-
where
120-
Self::Container: DrainContainer,
121-
Self: for<'a> PushInto<<Self::Container as DrainContainer>::Item<'a>>,
122-
I: for<'a> FnMut(&<Self::Container as DrainContainer>::Item<'a>) -> usize,
123-
{
124-
for datum in container.drain() {
125-
let index = index(&datum);
126-
builders[index].push_into(datum);
127-
}
128-
}
129-
130116
/// Indicates a good moment to release resources.
131117
///
132118
/// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]

logging/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "timely_logging"
3-
version = "0.22.0"
3+
version = "0.23.0"
44
authors = ["Frank McSherry <[email protected]>"]
55
description = "Common timely logging infrastructure"
66
edition.workspace = true
@@ -15,4 +15,4 @@ license = "MIT"
1515
workspace = true
1616

1717
[dependencies]
18-
timely_container = { version = "0.22.0", path = "../container" }
18+
timely_container = { version = "0.23.0", path = "../container" }

timely/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22

33
name = "timely"
4-
version = "0.22.0"
4+
version = "0.23.0"
55
authors = ["Frank McSherry <[email protected]>"]
66
readme = "../README.md"
77
edition.workspace = true
@@ -27,11 +27,12 @@ columnation = "0.1"
2727
getopts = { version = "0.2.21", optional = true }
2828
bincode = { version = "1.0" }
2929
byteorder = "1.5"
30+
itertools = "0.14.0"
3031
serde = { version = "1.0", features = ["derive"] }
31-
timely_bytes = { path = "../bytes", version = "0.22" }
32-
timely_logging = { path = "../logging", version = "0.22" }
33-
timely_communication = { path = "../communication", version = "0.22", default-features = false }
34-
timely_container = { path = "../container", version = "0.22" }
32+
timely_bytes = { path = "../bytes", version = "0.23" }
33+
timely_logging = { path = "../logging", version = "0.23" }
34+
timely_communication = { path = "../communication", version = "0.23", default-features = false }
35+
timely_container = { path = "../container", version = "0.23" }
3536
smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }
3637

3738
[dev-dependencies]

timely/src/dataflow/channels/pact.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContain
1515
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1616
use crate::communication::{Push, Pull};
1717
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
18+
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
1819
use crate::dataflow::channels::Message;
1920
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
2021
use crate::progress::Timestamp;
@@ -80,21 +81,24 @@ where
8081
}
8182

8283
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
83-
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
84+
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8485
where
85-
CB: ContainerBuilder,
86-
CB::Container: DrainContainer,
87-
CB: for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
86+
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
8887
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
89-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
88+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
9089
{
91-
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
90+
type Pusher = ExchangePusher<
91+
T,
92+
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93+
DrainContainerDistributor<CB, H>
94+
>;
9295
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
9396

9497
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
9598
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
9699
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
97-
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
100+
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
101+
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
98102
}
99103
}
100104

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 105 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,115 @@
11
//! The exchange pattern distributes pushed data between many target pushees.
22
33
use crate::communication::Push;
4-
use crate::container::{ContainerBuilder, PushInto};
4+
use crate::container::{ContainerBuilder, DrainContainer, PushInto};
55
use crate::dataflow::channels::Message;
6-
use crate::Data;
7-
use crate::container::DrainContainer;
86

9-
// TODO : Software write combining
10-
/// Distributes records among target pushees according to a distribution function.
11-
pub struct Exchange<T, CB, P, H>
12-
where
13-
CB: ContainerBuilder,
14-
CB::Container: DrainContainer,
15-
P: Push<Message<T, CB::Container>>,
16-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
17-
{
18-
pushers: Vec<P>,
7+
/// Distribute containers to several pushers.
8+
///
9+
/// A distributor sits behind an exchange pusher, and partitions containers at a given time
10+
/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired
11+
/// pusher.
12+
///
13+
/// It needs to uphold progress tracking requirements. The count of the input container
14+
/// must be preserved across the output containers, from the first call to `partition` until the
15+
/// call to `flush` for a specific time stamp.
16+
pub trait Distributor<C> {
17+
/// Partition the contents of `container` at `time` into the `pushers`.
18+
fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
19+
/// Flush any remaining contents into the `pushers` at time `time`.
20+
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
21+
/// Optionally release resources, such as memory.
22+
fn relax(&mut self);
23+
}
24+
25+
/// A distributor creating containers from a drainable container based
26+
/// on a hash function of the container's item.
27+
pub struct DrainContainerDistributor<CB, H> {
1928
builders: Vec<CB>,
20-
current: Option<T>,
2129
hash_func: H,
2230
}
2331

24-
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
32+
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
33+
/// Constructs a new `DrainContainerDistributor` with the given hash function for a number of
34+
/// peers.
35+
pub fn new(hash_func: H, peers: usize) -> Self {
36+
Self {
37+
builders: std::iter::repeat_with(Default::default).take(peers).collect(),
38+
hash_func,
39+
}
40+
}
41+
}
42+
43+
impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
2544
where
26-
CB: ContainerBuilder,
27-
CB::Container: DrainContainer,
28-
P: Push<Message<T, CB::Container>>,
29-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
45+
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
46+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
3047
{
31-
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
32-
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
33-
let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect();
34-
Exchange {
35-
pushers,
36-
hash_func: key,
37-
builders,
38-
current: None,
48+
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
49+
debug_assert_eq!(self.builders.len(), pushers.len());
50+
if pushers.len().is_power_of_two() {
51+
let mask = (pushers.len() - 1) as u64;
52+
for datum in container.drain() {
53+
let index = ((self.hash_func)(&datum) & mask) as usize;
54+
self.builders[index].push_into(datum);
55+
while let Some(produced) = self.builders[index].extract() {
56+
Message::push_at(produced, time.clone(), &mut pushers[index]);
57+
}
58+
}
59+
}
60+
else {
61+
let num_pushers = pushers.len() as u64;
62+
for datum in container.drain() {
63+
let index = ((self.hash_func)(&datum) % num_pushers) as usize;
64+
self.builders[index].push_into(datum);
65+
while let Some(produced) = self.builders[index].extract() {
66+
Message::push_at(produced, time.clone(), &mut pushers[index]);
67+
}
68+
}
3969
}
4070
}
41-
#[inline]
42-
fn flush(&mut self, index: usize) {
43-
while let Some(container) = self.builders[index].finish() {
44-
if let Some(ref time) = self.current {
45-
Message::push_at(container, time.clone(), &mut self.pushers[index]);
71+
72+
fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
73+
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
74+
while let Some(container) = builder.finish() {
75+
Message::push_at(container, time.clone(), pusher);
4676
}
4777
}
4878
}
79+
80+
fn relax(&mut self) {
81+
for builder in &mut self.builders {
82+
builder.relax();
83+
}
84+
}
4985
}
5086

51-
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
87+
// TODO : Software write combining
88+
/// Distributes records among target pushees according to a distributor.
89+
pub struct Exchange<T, P, D> {
90+
pushers: Vec<P>,
91+
current: Option<T>,
92+
distributor: D,
93+
}
94+
95+
impl<T: Clone, P, D> Exchange<T, P, D> {
96+
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
97+
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
98+
Exchange {
99+
pushers,
100+
current: None,
101+
distributor,
102+
}
103+
}
104+
}
105+
106+
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
52107
where
53-
CB: ContainerBuilder,
54-
CB::Container: DrainContainer,
55-
CB: for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
56-
P: Push<Message<T, CB::Container>>,
57-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
108+
P: Push<Message<T, C>>,
109+
D: Distributor<C>,
58110
{
59111
#[inline(never)]
60-
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {
112+
fn push(&mut self, message: &mut Option<Message<T, C>>) {
61113
// if only one pusher, no exchange
62114
if self.pushers.len() == 1 {
63115
self.pushers[0].push(message);
@@ -68,36 +120,27 @@ where
68120
let data = &mut message.data;
69121

70122
// if the time isn't right, flush everything.
71-
if self.current.as_ref().is_some_and(|x| x != time) {
72-
for index in 0..self.pushers.len() {
73-
self.flush(index);
123+
match self.current.as_ref() {
124+
// We have a current time, and it is different from the new time.
125+
Some(current_time) if current_time != time => {
126+
self.distributor.flush(current_time, &mut self.pushers);
127+
self.current = Some(time.clone());
74128
}
129+
// We had no time before, or flushed.
130+
None => self.current = Some(time.clone()),
131+
// Time didn't change since last call.
132+
_ => {}
75133
}
76-
self.current = Some(time.clone());
77-
78-
let hash_func = &mut self.hash_func;
79134

80-
// if the number of pushers is a power of two, use a mask
81-
if self.pushers.len().is_power_of_two() {
82-
let mask = (self.pushers.len() - 1) as u64;
83-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize);
84-
}
85-
// as a last resort, use mod (%)
86-
else {
87-
let num_pushers = self.pushers.len() as u64;
88-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize);
89-
}
90-
for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) {
91-
while let Some(container) = buffer.extract() {
92-
Message::push_at(container, time.clone(), pusher);
93-
}
94-
}
135+
self.distributor.partition(data, time, &mut self.pushers);
95136
}
96137
else {
97138
// flush
139+
if let Some(time) = self.current.take() {
140+
self.distributor.flush(&time, &mut self.pushers);
141+
}
142+
self.distributor.relax();
98143
for index in 0..self.pushers.len() {
99-
self.flush(index);
100-
self.builders[index].relax();
101144
self.pushers[index].push(&mut None);
102145
}
103146
}

timely/src/progress/reachability.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -591,10 +591,14 @@ impl<T:Timestamp> Tracker<T> {
591591
// By filtering the changes through `self.pointstamps` we react only to discrete
592592
// changes in the frontier, rather than changes in the pointstamp counts that
593593
// witness that frontier.
594-
for ((target, time), diff) in self.target_changes.drain() {
594+
use itertools::Itertools;
595+
let mut target_changes = self.target_changes.drain().peekable();
596+
while let Some(((target, _), _)) = target_changes.peek() {
595597

598+
let target = *target;
596599
let operator = &mut self.per_operator[target.node].targets[target.port];
597-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
600+
let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff));
601+
let changes = operator.pointstamps.update_iter(target_updates);
598602

599603
for (time, diff) in changes {
600604
self.total_counts += diff;
@@ -610,10 +614,13 @@ impl<T:Timestamp> Tracker<T> {
610614
}
611615
}
612616

613-
for ((source, time), diff) in self.source_changes.drain() {
617+
let mut source_changes = self.source_changes.drain().peekable();
618+
while let Some(((source, _), _)) = source_changes.peek() {
614619

620+
let source = *source;
615621
let operator = &mut self.per_operator[source.node].sources[source.port];
616-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
622+
let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
623+
let changes = operator.pointstamps.update_iter(source_updates);
617624

618625
for (time, diff) in changes {
619626
self.total_counts += diff;
@@ -761,7 +768,7 @@ fn summarize_outputs<T: Timestamp>(
761768
}
762769
}
763770
}
764-
771+
765772
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
766773
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
767774

0 commit comments

Comments
 (0)