Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn unoptimized() {

let value_flow_next =
value_flow_next
.arrange::<ValBatcher<_,_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
57 changes: 2 additions & 55 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ where
T: Timestamp,
{
type Input = Input;
type Output = M::Output;
type Time = T;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Self {
Expand Down Expand Up @@ -109,7 +109,7 @@ where

self.stash.clear();

let seal = M::seal::<B>(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
self.lower = upper;
seal
}
Expand Down Expand Up @@ -204,10 +204,6 @@ where
pub trait Merger: Default {
/// The internal representation of chunks of data.
type Chunk: Container;
/// The output type
/// TODO: This should be replaced by `Chunk` or another container once the builder understands
/// building from a complete chain.
type Output;
/// The type of time in frontiers to extract updates.
type Time;
/// Merge chains into an output chain.
Expand All @@ -223,15 +219,6 @@ pub trait Merger: Default {
stash: &mut Vec<Self::Chunk>,
);

/// Build from a chain
/// TODO: We can move this entirely to `MergeBatcher` once builders can accepts chains.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output;

/// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
Expand Down Expand Up @@ -286,7 +273,6 @@ where
{
type Time = T;
type Chunk = Vec<((K, V), T, R)>;
type Output = Vec<((K, V), T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -401,45 +387,6 @@ where
readied.push(ready);
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in chain.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);

for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
(chunk.len(), 0, 0, 0)
}
Expand Down
39 changes: 0 additions & 39 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use timely::{Container, Data, PartialOrder};

use crate::difference::Semigroup;
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;

/// A merger for timely stacks
pub struct ColumnationMerger<T> {
Expand Down Expand Up @@ -62,7 +61,6 @@ where
{
type Time = T;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = TimelyStack<((K, V), T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -183,43 +181,6 @@ where
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in chain.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
let cb = |siz, cap| {
Expand Down
45 changes: 1 addition & 44 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
use std::cmp::Ordering;
use std::marker::PhantomData;
use timely::progress::frontier::{Antichain, AntichainRef};
use timely::{Container, Data, PartialOrder};
use timely::{Data, PartialOrder};
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};

use crate::difference::{IsZero, Semigroup};
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;
use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks.
Expand Down Expand Up @@ -110,7 +109,6 @@ where
{
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -242,47 +240,6 @@ where
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
debug_assert!(p_key != key || p_val <= val);
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
let cb = |siz, cap| {
Expand Down
78 changes: 78 additions & 0 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {

/// Test that the value equals a key in the layout's value container.
fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;

/// Count the number of distinct keys, (key, val) pairs, and total updates.
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}

impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
Expand Down Expand Up @@ -372,6 +375,31 @@ where
fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == this
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}

impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
Expand Down Expand Up @@ -401,6 +429,31 @@ where
fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
V::reborrow(other) == *this
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}

mod flatcontainer {
Expand Down Expand Up @@ -483,6 +536,31 @@ mod flatcontainer {
fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == V::reborrow(*this)
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}
}

Expand Down
31 changes: 30 additions & 1 deletion src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,22 @@ mod val_batch {
description: Description::new(lower, upper, since),
}
}
}

fn seal(
chain: &mut Vec<Self::Input>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}
}
}

mod key_batch {
Expand Down Expand Up @@ -1167,6 +1181,21 @@ mod key_batch {
description: Description::new(lower, upper, since),
}
}

fn seal(
chain: &mut Vec<Self::Input>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}
}

}
Loading
Loading