Skip to content

Commit c90b92e

Browse files
Changes to track timely master (#542)
* Update bounds to track timely master * Update around container traits * Point at timely on crates Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]> Co-authored-by: Moritz Hoffmann <[email protected]>
1 parent a3bf1db commit c90b92e

File tree

5 files changed

+13
-13
lines changed

5 files changed

+13
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fnv="1.0.2"
4646
timely = {workspace = true}
4747

4848
[workspace.dependencies]
49-
timely = { version = "0.14", default-features = false }
49+
timely = { version = "0.15", default-features = false }
5050
#timely = { path = "../timely-dataflow/timely/", default-features = false }
5151

5252
[features]

src/collection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
6060
Collection { inner: stream, phantom: std::marker::PhantomData }
6161
}
6262
}
63-
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
63+
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
6464
/// Creates a new collection accumulating the contents of the two collections.
6565
///
6666
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
@@ -654,7 +654,7 @@ where
654654
G: Scope,
655655
D: Data,
656656
R: Semigroup+'static,
657-
C: Container,
657+
C: Container + Clone + 'static,
658658
I: IntoIterator<Item=Collection<G, D, R, C>>,
659659
{
660660
scope

src/consolidation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
use std::cmp::Ordering;
1414
use std::collections::VecDeque;
1515
use timely::Container;
16-
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
16+
use timely::container::{ContainerBuilder, PushInto};
1717
use timely::container::flatcontainer::{FlatStack, Push, Region};
1818
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
1919
use crate::Data;
@@ -156,7 +156,7 @@ where
156156
// TODO: Can we replace `multiple` by a bool?
157157
#[cold]
158158
fn consolidate_and_flush_through(&mut self, multiple: usize) {
159-
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
159+
let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
160160
consolidate_updates(&mut self.current);
161161
let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
162162
while drain.peek().is_some() {
@@ -180,7 +180,7 @@ where
180180
/// Precondition: `current` is not allocated or has space for at least one element.
181181
#[inline]
182182
fn push_into(&mut self, item: P) {
183-
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
183+
let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
184184
if self.current.capacity() < preferred_capacity * 2 {
185185
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
186186
}

src/operators/arrange/arrangement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ where
406406
G::Timestamp: Lattice,
407407
P: ParallelizationContract<G::Timestamp, Ba::Input>,
408408
Ba: Batcher<Time=G::Timestamp> + 'static,
409-
Ba::Input: Container,
409+
Ba::Input: Container + Clone + 'static,
410410
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
411411
Tr: Trace<Time=G::Timestamp>+'static,
412412
Tr::Batch: Batch,

src/trace/implementations/chunker.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,15 @@ where
264264
+ PushInto<Input::ItemRef<'a>>,
265265
{
266266
fn push_into(&mut self, container: &'a mut Input) {
267-
if self.pending.capacity() < Output::preferred_capacity() {
268-
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
269-
}
267+
self.pending.ensure_capacity(&mut None);
268+
270269
let form_batch = |this: &mut Self| {
271-
if this.pending.len() == this.pending.capacity() {
270+
if this.pending.at_capacity() {
271+
let starting_len = this.pending.len();
272272
consolidate_container(&mut this.pending, &mut this.empty);
273273
std::mem::swap(&mut this.pending, &mut this.empty);
274274
this.empty.clear();
275-
if this.pending.len() > this.pending.capacity() / 2 {
275+
if this.pending.len() > starting_len / 2 {
276276
// Note that we're pushing non-full containers, which is a deviation from
277277
// other implementation. The reason for this is that we cannot extract
278278
// partial data from `this.pending`. We should revisit this in the future.
@@ -289,7 +289,7 @@ where
289289

290290
impl<Output> ContainerBuilder for ContainerChunker<Output>
291291
where
292-
Output: SizableContainer + ConsolidateLayout,
292+
Output: SizableContainer + ConsolidateLayout + Clone + 'static,
293293
{
294294
type Container = Output;
295295

0 commit comments

Comments
 (0)