diff --git a/Cargo.toml b/Cargo.toml index 3d989ee4d..57a6c3ddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ fnv="1.0.2" timely = {workspace = true} [workspace.dependencies] -timely = { version = "0.14", default-features = false } +timely = { version = "0.15", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [features] diff --git a/src/collection.rs b/src/collection.rs index 678193df7..392f9001c 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -60,7 +60,7 @@ impl Collection { Collection { inner: stream, phantom: std::marker::PhantomData } } } -impl Collection { +impl Collection { /// Creates a new collection accumulating the contents of the two collections. /// /// Despite the name, differential dataflow collections are unordered. This method is so named because the @@ -654,7 +654,7 @@ where G: Scope, D: Data, R: Semigroup+'static, - C: Container, + C: Container + Clone + 'static, I: IntoIterator>, { scope diff --git a/src/consolidation.rs b/src/consolidation.rs index 310628f4a..deb859fa5 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -13,7 +13,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; -use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::{ContainerBuilder, PushInto}; use timely::container::flatcontainer::{FlatStack, Push, Region}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::Data; @@ -156,7 +156,7 @@ where // TODO: Can we replace `multiple` by a bool? #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { @@ -180,7 +180,7 @@ where /// Precondition: `current` is not allocated or has space for at least one element. #[inline] fn push_into(&mut self, item: P) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 7d10d3dfc..16d4c16be 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -406,7 +406,7 @@ where G::Timestamp: Lattice, P: ParallelizationContract, Ba: Batcher + 'static, - Ba::Input: Container, + Ba::Input: Container + Clone + 'static, Bu: Builder, Tr: Trace+'static, Tr::Batch: Batch, diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 478480803..40553897c 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -264,15 +264,15 @@ where + PushInto>, { fn push_into(&mut self, container: &'a mut Input) { - if self.pending.capacity() < Output::preferred_capacity() { - self.pending.reserve(Output::preferred_capacity() - self.pending.len()); - } + self.pending.ensure_capacity(&mut None); + let form_batch = |this: &mut Self| { - if this.pending.len() == this.pending.capacity() { + if this.pending.at_capacity() { + let starting_len = this.pending.len(); consolidate_container(&mut this.pending, &mut this.empty); std::mem::swap(&mut this.pending, &mut this.empty); this.empty.clear(); - if this.pending.len() > this.pending.capacity() / 2 { + if this.pending.len() > starting_len / 2 { // Note that we're pushing non-full containers, which is a deviation from // other implementation. The reason for this is that we cannot extract // partial data from `this.pending`. We should revisit this in the future. @@ -289,7 +289,7 @@ where impl ContainerBuilder for ContainerChunker where - Output: SizableContainer + ConsolidateLayout, + Output: SizableContainer + ConsolidateLayout + Clone + 'static, { type Container = Output;