Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fnv="1.0.2"
timely = {workspace = true}

[workspace.dependencies]
timely = { version = "0.13", default-features = false }
timely = { version = "0.14", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[features]
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ where
let arrangement_stream = arrangement.stream;

let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

Expand All @@ -167,10 +166,9 @@ where

// drain the first input, stashing requests.
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
.extend(data.drain(..))
});

// Drain input batches; although we do not observe them, we want access to the input
Expand Down
5 changes: 1 addition & 4 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ where
let mut logic1 = key_selector.clone();
let mut logic2 = key_selector.clone();

let mut buffer = Vec::new();

let mut key: K = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
Expand All @@ -64,10 +62,9 @@ where

// drain the first input, stashing requests.
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
.extend(data.drain(..))
});

// Drain input batches; although we do not observe them, we want access to the input
Expand Down
11 changes: 2 additions & 9 deletions interactive/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ where
let (mut park_out, park) = demux.new_output();
let (mut text_out, text) = demux.new_output();

let mut demux_buffer = Vec::new();

demux.build(move |_capability| {

move |_frontiers| {
Expand All @@ -71,8 +69,6 @@ where

input.for_each(|time, data| {

data.swap(&mut demux_buffer);

let mut operates_session = operates.session(&time);
let mut channels_session = channels.session(&time);
let mut schedule_session = schedule.session(&time);
Expand All @@ -81,7 +77,7 @@ where
let mut park_session = park.session(&time);
let mut text_session = text.session(&time);

for (time, _worker, datum) in demux_buffer.drain(..) {
for (time, _worker, datum) in data.drain(..) {

// Round time up to next multiple of `granularity_ns`.
let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
Expand Down Expand Up @@ -235,8 +231,6 @@ where
let (mut batch_out, batch) = demux.new_output();
let (mut merge_out, merge) = demux.new_output();

let mut demux_buffer = Vec::new();

demux.build(move |_capability| {

move |_frontiers| {
Expand All @@ -246,11 +240,10 @@ where

input.for_each(|time, data| {

data.swap(&mut demux_buffer);
let mut batch_session = batch.session(&time);
let mut merge_session = merge.session(&time);

for (time, _worker, datum) in demux_buffer.drain(..) {
for (time, _worker, datum) in data.drain(..) {

// Round time up to next multiple of `granularity_ns`.
let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
Expand Down
2 changes: 1 addition & 1 deletion src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ pub mod sink {
}

// Now record the update to the writer.
send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
send_queue.push_back(Message::Updates(std::mem::take(updates)));

// Transmit timestamp counts downstream.
output
Expand Down
6 changes: 2 additions & 4 deletions src/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ where
let (mut output, stream) = builder.new_output();
let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);

let mut vector = Default::default();
builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
input.for_each(|cap, data| {
data.swap(&mut vector);
let mut new_time = cap.time().clone();
let mut vec = std::mem::take(&mut new_time.inner).into_vec();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time);
for (_data, time, _diff) in vector.iter_mut() {
for (_data, time, _diff) in data.iter_mut() {
let mut vec = std::mem::take(&mut time.inner).into_vec();
vec.truncate(level - 1);
time.inner = PointStamp::new(vec);
}
output.session(&new_cap).give_container(&mut vector);
output.session(&new_cap).give_container(data);
});
});

Expand Down
4 changes: 1 addition & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ where

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
Expand All @@ -186,8 +185,7 @@ where
// Stash capabilities and associated data (ordered by time).
input.for_each(|cap, data| {
capabilities.insert(cap.retain());
data.swap(&mut buffer);
for (key, val, time) in buffer.drain(..) {
for (key, val, time) in data.drain(..) {
priority_queue.push(std::cmp::Reverse((time, key, val)))
}
});
Expand Down
4 changes: 1 addition & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,9 @@ where
self.inner
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {

let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
output.session_with_builder(&time).give_iterator(vector.drain(..));
output.session_with_builder(&time).give_iterator(data.drain(..));
})
}
})
Expand Down
4 changes: 1 addition & 3 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ where
fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, T1::Diff), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

Expand All @@ -87,8 +86,7 @@ where
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
for batch in buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
10 changes: 2 additions & 8 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,6 @@ where
let mut trace1_option = Some(trace1);
let mut trace2_option = Some(trace2);

// Swappable buffers for input extraction.
let mut input1_buffer = Vec::new();
let mut input2_buffer = Vec::new();

move |input1, input2, output| {

// 1. Consuming input.
Expand All @@ -468,8 +464,7 @@ where
// This test *should* always pass, as we only drop a trace in response to the other input emptying.
if let Some(ref mut trace2) = trace2_option {
let capability = capability.retain();
data.swap(&mut input1_buffer);
for batch1 in input1_buffer.drain(..) {
for batch1 in data.drain(..) {
// Ignore any pre-loaded data.
if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
if !batch1.is_empty() {
Expand All @@ -496,8 +491,7 @@ where
// This test *should* always pass, as we only drop a trace in response to the other input emptying.
if let Some(ref mut trace1) = trace1_option {
let capability = capability.retain();
data.swap(&mut input2_buffer);
for batch2 in input2_buffer.drain(..) {
for batch2 in data.drain(..) {
// Ignore any pre-loaded data.
if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
if !batch2.is_empty() {
Expand Down
5 changes: 1 addition & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,6 @@ where
let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

let mut input_buffer = Vec::new();

let id = trace.stream.scope().index();

move |input, output| {
Expand Down Expand Up @@ -409,8 +407,7 @@ where
// times in the batch.
input.for_each(|capability, batches| {

batches.swap(&mut input_buffer);
for batch in input_buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper());
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
4 changes: 1 addition & 3 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ where
{

let mut trace = self.trace.clone();
let mut buffer = Vec::new();

self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

Expand All @@ -134,8 +133,7 @@ where
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
for batch in buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
81 changes: 19 additions & 62 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Organize streams of data into sorted chunks.

use std::collections::VecDeque;
use timely::communication::message::RefOrMut;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
Expand Down Expand Up @@ -64,14 +63,14 @@ where
}
}

impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
where
K: Ord + Clone,
V: Ord + Clone,
T: Ord + Clone,
R: Semigroup + Clone,
{
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
// Important: Consolidation requires `pending` to have twice the chunk capacity to
Expand All @@ -80,27 +79,11 @@ where
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
let mut slice = &vec[..];
while !slice.is_empty() {
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
let mut drain = container.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
Expand Down Expand Up @@ -196,41 +179,25 @@ where
}
}

impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
where
K: Columnation + Ord + Clone,
V: Columnation + Ord + Clone,
T: Columnation + Ord + Clone,
R: Columnation + Semigroup + Clone,
{
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
let mut slice = &vec[..];
while !slice.is_empty() {
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
let mut drain = container.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
Expand Down Expand Up @@ -288,15 +255,15 @@ where
}
}

impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
where
Input: Container,
Output: SizableContainer
+ ConsolidateLayout
+ PushInto<Input::Item<'a>>
+ PushInto<Input::ItemRef<'a>>,
{
fn push_into(&mut self, container: RefOrMut<'a, Input>) {
fn push_into(&mut self, container: &'a mut Input) {
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
Expand All @@ -313,19 +280,9 @@ where
}
}
};
match container {
RefOrMut::Ref(container) => {
for item in container.iter() {
self.pending.push(item);
form_batch(self);
}
}
RefOrMut::Mut(container) => {
for item in container.drain() {
self.pending.push(item);
form_batch(self);
}
}
for item in container.drain() {
self.pending.push(item);
form_batch(self);
}
}
}
Expand Down
Loading