diff --git a/Cargo.toml b/Cargo.toml index 57a6c3ddf..b880f1530 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,11 +39,13 @@ byteorder="1" itertools="^0.13" serde_json = "1.0" graph_map = "0.1" +bytemuck = "1.18.0" [dependencies] serde = { version = "1.0", features = ["derive"] } fnv="1.0.2" timely = {workspace = true} +columnar = "0.2" [workspace.dependencies] timely = { version = "0.15", default-features = false } diff --git a/examples/columnar.rs b/examples/columnar.rs new file mode 100644 index 000000000..81c39eec8 --- /dev/null +++ b/examples/columnar.rs @@ -0,0 +1,455 @@ +//! Wordcount based on `columnar`. + +use { + timely::container::CapacityContainerBuilder, + timely::dataflow::channels::pact::ExchangeCore, + timely::dataflow::InputHandleCore, + timely::dataflow::ProbeHandle, +}; + + +use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder; +use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; + +use differential_dataflow::operators::arrange::arrangement::arrange_core; + +fn main() { + + type WordCount = ((String, ()), u64, i64); + type Container = Column; + + let _config = timely::Config { + communication: timely::CommunicationConfig::ProcessBinary(3), + worker: timely::WorkerConfig::default(), + }; + + let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); + + let timer1 = ::std::time::Instant::now(); + let timer2 = timer1.clone(); + + // initializes and runs a timely dataflow. + // timely::execute(_config, move |worker| { + timely::execute_from_args(std::env::args(), move |worker| { + + let mut data_input = >>::new(); + let mut keys_input = >>::new(); + let mut probe = ProbeHandle::new(); + + // create a new input, exchange data, and inspect its output + worker.dataflow::(|scope| { + + let data = data_input.to_stream(scope); + let keys = keys_input.to_stream(scope); + + let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); + let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); + + let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + + }); + + // Load up data in batches. 
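+        // Each round introduces `size` new records, striped across workers by `worker.index()`
+        // and `worker.peers()`, then advances both inputs and steps the worker until the probe
+        // catches up with the new input time.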
+ let mut counter = 0; + while counter < 10 * keys { + let mut i = worker.index(); + let time = *data_input.time(); + while i < size { + let val = (counter + i) % keys; + data_input.send(((&format!("{:?}", val), ()), time, 1)); + i += worker.peers(); + } + counter += size; + data_input.advance_to(data_input.time() + 1); + keys_input.advance_to(keys_input.time() + 1); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + println!("{:?}\tloading complete", timer1.elapsed()); + + let mut queries = 0; + + while queries < 10 * keys { + let mut i = worker.index(); + let time = *data_input.time(); + while i < size { + let val = (queries + i) % keys; + data_input.send(((&format!("{:?}", val), ()), time, 1)); + i += worker.peers(); + } + queries += size; + data_input.advance_to(data_input.time() + 1); + keys_input.advance_to(keys_input.time() + 1); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + + println!("{:?}\tqueries complete", timer1.elapsed()); + + + }) + .unwrap(); + + println!("{:?}\tshut down", timer2.elapsed()); +} + + +pub use container::Column; +mod container { + + use columnar::Columnar; + use columnar::Container as FooBozzle; + + use timely::bytes::arc::Bytes; + + /// A container based on a columnar store, encoded in aligned bytes. + pub enum Column { + /// The typed variant of the container. + Typed(C::Container), + /// The binary variant of the container. + Bytes(Bytes), + /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. + /// + /// Reasons could include misalignment, cloning of data, or wanting + /// to release the `Bytes` as a scarce resource. + Align(Box<[u64]>), + } + + impl Default for Column { + fn default() -> Self { Self::Typed(Default::default()) } + } + + impl Clone for Column where C::Container: Clone { + fn clone(&self) -> Self { + match self { + Column::Typed(t) => Column::Typed(t.clone()), + Column::Bytes(b) => { + assert!(b.len() % 8 == 0); + let mut alloc: Vec = vec![0; b.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); + Self::Align(alloc.into()) + }, + Column::Align(a) => Column::Align(a.clone()), + } + } + } + + use columnar::{Clear, Len, Index, AsBytes, FromBytes}; + use columnar::bytes::serialization::decode; + use columnar::common::IterOwn; + + use timely::Container; + impl Container for Column { + fn len(&self) -> usize { + match self { + Column::Typed(t) => t.len(), + Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(), + Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(), + } + } + // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. 
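+        // The `Align` variant is handled the same way: clearing discards the serialized form
+        // rather than trying to reuse its allocation.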
+ fn clear(&mut self) { + match self { + Column::Typed(t) => t.clear(), + Column::Bytes(_) => *self = Column::Typed(Default::default()), + Column::Align(_) => *self = Column::Typed(Default::default()), + } + } + + type ItemRef<'a> = C::Ref<'a>; + type Iter<'a> = IterOwn<>::Borrowed<'a>>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { + match self { + Column::Typed(t) => t.borrow().into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + } + } + + type Item<'a> = C::Ref<'a>; + type DrainIter<'a> = IterOwn<>::Borrowed<'a>>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + match self { + Column::Typed(t) => t.borrow().into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + } + } + } + + use timely::container::SizableContainer; + impl SizableContainer for Column { + fn at_capacity(&self) -> bool { + match self { + Self::Typed(t) => { + let length_in_bytes = t.borrow().length_in_words() * 8; + length_in_bytes >= (1 << 20) + }, + Self::Bytes(_) => true, + Self::Align(_) => true, + } + } + fn ensure_capacity(&mut self, _stash: &mut Option) { } + } + + use timely::container::PushInto; + impl PushInto for Column where C::Container: columnar::Push { + #[inline] + fn push_into(&mut self, item: T) { + use columnar::Push; + match self { + Column::Typed(t) => t.push(item), + Column::Align(_) | Column::Bytes(_) => { + // We really oughtn't be calling this in this case. + // We could convert to owned, but need more constraints on `C`. + unimplemented!("Pushing into Column::Bytes without first clearing"); + } + } + } + } + + use timely::dataflow::channels::ContainerBytes; + impl ContainerBytes for Column { + fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { + // Our expectation / hope is that `bytes` is `u64` aligned and sized. + // If the alignment is borked, we can relocate. IF the size is borked, + // not sure what we do in that case. + assert!(bytes.len() % 8 == 0); + if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + Self::Bytes(bytes) + } + else { + // println!("Re-locating bytes for alignment reasons"); + let mut alloc: Vec = vec![0; bytes.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]); + Self::Align(alloc.into()) + } + } + + fn length_in_bytes(&self) -> usize { + match self { + // We'll need one u64 for the length, then the length rounded up to a multiple of 8. + Column::Typed(t) => 8 * t.borrow().length_in_words(), + Column::Bytes(b) => b.len(), + Column::Align(a) => 8 * a.len(), + } + } + + fn into_bytes(&self, writer: &mut W) { + match self { + Column::Typed(t) => { + use columnar::Container; + // Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice + // serialize as first its length in bytes, and then as many `u64` values as needed. + // Padding should be added, but only for alignment; no specific values are required. 
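+                    // For example, a 13-byte slice is written as the u64 value 13, then the
+                    // 13 bytes themselves, then 3 zero bytes of padding to reach the next
+                    // u64 boundary.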
+ for (align, bytes) in t.borrow().as_bytes() { + assert!(align <= 8); + let length: u64 = bytes.len().try_into().unwrap(); + writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap(); + writer.write_all(bytes).unwrap(); + let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap(); + writer.write_all(&[0; 8][..padding]).unwrap(); + } + }, + Column::Bytes(b) => writer.write_all(b).unwrap(), + Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), + } + } + } +} + + +use builder::ColumnBuilder; +mod builder { + + use std::collections::VecDeque; + use columnar::{Columnar, Clear, Len, AsBytes, Push}; + use super::Column; + + /// A container builder for `Column`. + pub struct ColumnBuilder { + /// Container that we're writing to. + current: C::Container, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, + } + + use timely::container::PushInto; + impl PushInto for ColumnBuilder where C::Container: columnar::Push { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + // If there is less than 10% slop with 2MB backing allocations, mint a container. + use columnar::Container; + let words = self.current.borrow().length_in_words(); + let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); + if round - words < round / 10 { + let mut alloc = Vec::with_capacity(round); + columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); + self.pending.push_back(Column::Align(alloc.into_boxed_slice())); + self.current.clear(); + } + } + } + + impl Default for ColumnBuilder { + fn default() -> Self { + ColumnBuilder { + current: Default::default(), + empty: None, + pending: Default::default(), + } + } + } + + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + impl ContainerBuilder for ColumnBuilder where C::Container: Clone { + type Container = Column; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + self.pending.push_back(Column::Typed(std::mem::take(&mut self.current))); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() + } + } + + impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone { } +} + + +use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; +use differential_dataflow::trace::implementations::merge_batcher::ColMerger; +use timely::container::columnation::TimelyStack; + +/// A batcher for columnar storage. +pub type Col2ValBatcher = MergeBatcher, batcher::Chunker>, ColMerger<(K,V),T,R>>; +pub type Col2KeyBatcher = Col2ValBatcher; + +/// Types for consolidating, merging, and extracting columnar update collections. +pub mod batcher { + + use std::collections::VecDeque; + use columnar::Columnar; + use timely::Container; + use timely::container::{ContainerBuilder, PushInto}; + use differential_dataflow::difference::Semigroup; + use crate::Column; + + // First draft: build a "chunker" and a "merger". + + #[derive(Default)] + pub struct Chunker { + /// Buffer into which we'll consolidate. + /// + /// Also the buffer where we'll stage responses to `extract` and `finish`. + /// When these calls return, the buffer is available for reuse. + empty: C, + /// Consolidated buffers ready to go. 
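+        /// Chunks are drained from the front of this queue by `extract` (and thus by `finish`).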
+ ready: VecDeque, + } + + impl ContainerBuilder for Chunker { + type Container = C; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = ready; + Some(&mut self.empty) + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + self.extract() + } + } + + impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker + where + D: Columnar, + for<'b> D::Ref<'b>: Ord + Copy, + T: Columnar, + for<'b> T::Ref<'b>: Ord + Copy, + R: Columnar + Semigroup + for<'b> Semigroup>, + for<'b> R::Ref<'b>: Ord, + C2: Container + for<'b> PushInto<&'b (D, T, R)>, + { + fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { + + // Scoped to let borrow through `permutation` drop. + { + // Sort input data + // TODO: consider `Vec` that we retain, containing indexes. + let mut permutation = Vec::with_capacity(container.len()); + permutation.extend(container.drain()); + permutation.sort(); + + self.empty.clear(); + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some((data, time, diff)) = iter.next() { + + let mut owned_data = D::into_owned(data); + let mut owned_time = T::into_owned(time); + + let mut prev_data = data; + let mut prev_time = time; + let mut prev_diff = ::into_owned(diff); + + for (data, time, diff) in iter { + if (&prev_data, &prev_time) == (&data, &time) { + prev_diff.plus_equals(&diff); + } + else { + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.empty.push_into(&tuple); + owned_data = tuple.0; + owned_time = tuple.1; + } + prev_data = data; + prev_time = time; + prev_diff = ::into_owned(diff); + } + } + + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.empty.push_into(&tuple); + } + } + } + + if !self.empty.is_empty() { + self.ready.push_back(std::mem::take(&mut self.empty)); + } + } + } +} diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 159aee4a1..7923ba3e7 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,43 +1,50 @@ -//! A general purpose `Batcher` implementation based on radix sort. +//! A `Batcher` implementation based on merge sort. +//! +//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". +//! The chunker receives input batches and consolidates them, producing sorted output +//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). +//! The merger implements the [`Merger`] trait, and provides hooks for manipulating +//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also +//! splitting them apart based on time. +//! +//! Implementations of `MergeBatcher` can be instantiated through the choice of both +//! the chunker and the merger, provided their respective output and input types align. 
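+//!
+//! For example, in `ord_neu` the `ColValBatcher` alias pairs a `ColumnationChunker` with a
+//! `ColMerger` over `TimelyStack` chunks, while `OrdValBatcher` pairs a `VecChunker` with a `VecMerger`.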
-use std::collections::VecDeque; use std::marker::PhantomData; use timely::logging_core::Logger; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; -use timely::{Container, PartialOrder}; +use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder, Description}; -use crate::Data; -/// Creates batches from unordered tuples. -pub struct MergeBatcher -where - C: ContainerBuilder, - M: Merger, -{ - /// each power-of-two length list of allocations. - /// Do not push/pop directly but use the corresponding functions - /// ([`Self::chain_push`]/[`Self::chain_pop`]). +/// Creates batches from containers of unordered tuples. +/// +/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, +/// and must produce outputs of type `M::Chunk`. +pub struct MergeBatcher { + /// Transforms input streams to chunks of sorted, consolidated data. + chunker: C, + /// A sequence of power-of-two length lists of sorted, consolidated containers. + /// + /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). chains: Vec>, - /// Stash of empty chunks + /// Stash of empty chunks, recycled through the merging process. stash: Vec, - /// Chunker to transform input streams to chunks of data. - chunker: C, - /// Thing to accept data, merge chains, and talk to the builder. + /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time. merger: M, - /// Logger for size accounting. - logger: Option>, - /// Timely operator ID. - operator_id: usize, /// Current lower frontier, we sealed up to here. lower: Antichain, /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, + /// Logger for size accounting. + logger: Option>, + /// Timely operator ID. + operator_id: usize, + /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. _marker: PhantomData, } @@ -123,7 +130,6 @@ where impl MergeBatcher where - C: ContainerBuilder + Default, M: Merger, { /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered @@ -191,7 +197,6 @@ where impl Drop for MergeBatcher where - C: ContainerBuilder + Default, M: Merger, { fn drop(&mut self) { @@ -223,170 +228,473 @@ pub trait Merger: Default { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } -/// A merger that knows how to accept and maintain chains of vectors. -pub struct VecMerger { - _marker: PhantomData, -} - -impl Default for VecMerger { - fn default() -> Self { - Self { _marker: PhantomData } +pub use container::{VecMerger, ColMerger}; + +pub mod container { + + //! A general purpose `Merger` implementation for arbitrary containers. + //! + //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`. + //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as + //! well as the ability to return the container when iteration is complete. + //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container + //! items should be interpreted with respect to times, and with respect to differences. + //! These two traits exist instead of a stack of constraints on the structure of the associated items + //! 
of the containers, allowing them to perform their functions without destructuring their guts. + //! + //! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules. + + use std::cmp::Ordering; + use std::marker::PhantomData; + use timely::{Container, container::{PushInto, SizableContainer}}; + use timely::progress::frontier::{Antichain, AntichainRef}; + use timely::{Data, PartialOrder}; + + use crate::trace::implementations::merge_batcher::Merger; + + /// An abstraction for a container that can be iterated over, and conclude by returning itself. + pub trait ContainerQueue { + /// Returns either the next item in the container, or the container itself. + fn next_or_alloc(&mut self) -> Result, C>; + /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`. + fn is_empty(&self) -> bool; + /// Compare the heads of two queues, where empty queues come last. + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering; + /// Create a new queue from an existing container. + fn from(container: C) -> Self; } -} -impl VecMerger { - const BUFFER_SIZE_BYTES: usize = 8 << 10; - fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 + /// Behavior to dissect items of chunks in the merge batcher + pub trait MergerChunk : SizableContainer { + /// An owned time type. + /// + /// This type is provided so that users can maintain antichains of something, in order to track + /// the forward movement of time and extract intervals from chains of updates. + type TimeOwned; + /// The owned diff type. + /// + /// This type is provided so that users can provide an owned instance to the `push_and_add` method, + /// to act as a scratch space when the type is substantial and could otherwise require allocations. + type DiffOwned: Default; + + /// Relates a borrowed time to antichains of owned times. + /// + /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`. + fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool; + + /// Push an entry that adds together two diffs. + /// + /// This is only called when two items are deemed mergeable by the container queue. + /// If the two diffs added together is zero do not push anything. + fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned); + + /// Account the allocations behind the chunk. + // TODO: Find a more universal home for this: `Container`? + fn account(&self) -> (usize, usize, usize, usize) { + let (size, capacity, allocations) = (0, 0, 0); + (self.len(), size, capacity, allocations) } } + + /// A merger for arbitrary containers. + /// + /// `MC` is a [`Container`] that implements [`MergerChunk`]. + /// `CQ` is a [`ContainerQueue`] supporting `MC`. + pub struct ContainerMerger { + _marker: PhantomData<(MC, CQ)>, + } - /// Helper to get pre-sized vector from the stash. - #[inline] - fn empty(&self, stash: &mut Vec>) -> Vec { - stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity())) + impl Default for ContainerMerger { + fn default() -> Self { + Self { _marker: PhantomData, } + } } - /// Helper to return a chunk to the stash. - #[inline] - fn recycle(&self, mut chunk: Vec, stash: &mut Vec>) { - // TODO: Should we limit the size of `stash`? 
- if chunk.capacity() == self.chunk_capacity() { + impl ContainerMerger { + /// Helper to get pre-sized vector from the stash. + #[inline] + fn empty(&self, stash: &mut Vec) -> MC { + stash.pop().unwrap_or_else(|| { + let mut container = MC::default(); + container.ensure_capacity(&mut None); + container + }) + } + /// Helper to return a chunk to the stash. + #[inline] + fn recycle(&self, mut chunk: MC, stash: &mut Vec) { + // TODO: Should we only retain correctly sized containers? chunk.clear(); stash.push(chunk); } } -} -impl Merger for VecMerger<(D, T, R)> -where - D: Data, - T: Ord + PartialOrder + Clone + 'static, - R: Semigroup + 'static, -{ - type Time = T; - type Chunk = Vec<(D, T, R)>; - - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); - let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); - - let mut result = self.empty(stash); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { - let x = head1.front().unwrap(); - let y = head2.front().unwrap(); - (&x.0, &x.1).cmp(&(&y.0, &y.1)) - }; - use std::cmp::Ordering; - match cmp { - Ordering::Less => result.push(head1.pop_front().unwrap()), - Ordering::Greater => result.push(head2.pop_front().unwrap()), - Ordering::Equal => { - let (data1, time1, mut diff1) = head1.pop_front().unwrap(); - let (_data2, _time2, diff2) = head2.pop_front().unwrap(); - diff1.plus_equals(&diff2); - if !diff1.is_zero() { - result.push((data1, time1, diff1)); + impl Merger for ContainerMerger + where + for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, + for<'a> MC::TimeOwned: Ord + PartialOrder + Data, + CQ: ContainerQueue, + { + type Time = MC::TimeOwned; + type Chunk = MC; + + // TODO: Consider integrating with `ConsolidateLayout`. + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut head1 = CQ::from(list1.next().unwrap_or_default()); + let mut head2 = CQ::from(list2.next().unwrap_or_default()); + + let mut result = self.empty(stash); + + let mut diff_owned = Default::default(); + + // while we have valid data in each input, merge. + while !head1.is_empty() && !head2.is_empty() { + while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() { + let cmp = head1.cmp_heads(&head2); + // TODO: The following less/greater branches could plausibly be a good moment for + // `copy_range`, on account of runs of records that might benefit more from a + // `memcpy`. 
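+                    // For now items are moved one at a time; equal heads are combined through
+                    // `push_and_add`, which skips the update entirely if the summed diff is zero.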
+ match cmp { + Ordering::Less => { + result.push_into(head1.next_or_alloc().ok().unwrap()); } + Ordering::Greater => { + result.push_into(head2.next_or_alloc().ok().unwrap()); + } + Ordering::Equal => { + let item1 = head1.next_or_alloc().ok().unwrap(); + let item2 = head2.next_or_alloc().ok().unwrap(); + result.push_and_add(item1, item2, &mut diff_owned); + } } } + + if result.at_capacity() { + output.push_into(result); + result = self.empty(stash); + } + + if head1.is_empty() { + self.recycle(head1.next_or_alloc().err().unwrap(), stash); + head1 = CQ::from(list1.next().unwrap_or_default()); + } + if head2.is_empty() { + self.recycle(head2.next_or_alloc().err().unwrap(), stash); + head2 = CQ::from(list2.next().unwrap_or_default()); + } } - if result.capacity() == result.len() { - output.push(result); + // TODO: recycle `head1` rather than discarding. + while let Ok(next) = head1.next_or_alloc() { + result.push_into(next); + if result.at_capacity() { + output.push_into(result); + result = self.empty(stash); + } + } + if !result.is_empty() { + output.push_into(result); result = self.empty(stash); } + output.extend(list1); + + // TODO: recycle `head2` rather than discarding. + while let Ok(next) = head2.next_or_alloc() { + result.push_into(next); + if result.at_capacity() { + output.push(result); + result = self.empty(stash); + } + } + if !result.is_empty() { + output.push_into(result); + // result = self.empty(stash); + } + output.extend(list2); + } - if head1.is_empty() { - let done1 = Vec::from(head1); - self.recycle(done1, stash); - head1 = VecDeque::from(list1.next().unwrap_or_default()); + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + readied: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut buffer in merged { + for item in buffer.drain() { + if MC::time_kept(&item, &upper, frontier) { + if keep.at_capacity() && !keep.is_empty() { + kept.push(keep); + keep = self.empty(stash); + } + keep.push_into(item); + } else { + if ready.at_capacity() && !ready.is_empty() { + readied.push(ready); + ready = self.empty(stash); + } + ready.push_into(item); + } + } + // Recycling buffer. + self.recycle(buffer, stash); + } + // Finish the kept data. + if !keep.is_empty() { + kept.push(keep); } - if head2.is_empty() { - let done2 = Vec::from(head2); - self.recycle(done2, stash); - head2 = VecDeque::from(list2.next().unwrap_or_default()); + if !ready.is_empty() { + readied.push(ready); } } - if !result.is_empty() { - output.push(result); - } else { - self.recycle(result, stash); + /// Account the allocations behind the chunk. + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + chunk.account() } + } - if !head1.is_empty() { - let mut result = self.empty(stash); - for item1 in head1 { - result.push(item1); + pub use vec::VecMerger; + /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers. + pub mod vec { + + use std::collections::VecDeque; + use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::difference::Semigroup; + use super::{ContainerQueue, MergerChunk}; + + /// A `Merger` implementation backed by vector containers. 
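+        /// Chunks are `Vec<(D, T, R)>` containers, iterated through a `VecDeque<(D, T, R)>` queue.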
+ pub type VecMerger = super::ContainerMerger, std::collections::VecDeque<(D, T, R)>>; + + impl ContainerQueue> for VecDeque<(D, T, R)> { + fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> { + if self.is_empty() { + Err(Vec::from(std::mem::take(self))) + } + else { + Ok(self.pop_front().unwrap()) + } + } + fn is_empty(&self) -> bool { + self.is_empty() + } + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + let (data1, time1, _) = self.front().unwrap(); + let (data2, time2, _) = other.front().unwrap(); + (data1, time1).cmp(&(data2, time2)) + } + fn from(list: Vec<(D, T, R)>) -> Self { + >::from(list) } - output.push(result); } - output.extend(list1); - - if !head2.is_empty() { - let mut result = self.empty(stash); - for item2 in head2 { - result.push(item2); + + impl MergerChunk for Vec<(D, T, R)> { + type TimeOwned = T; + type DiffOwned = (); + + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { + if upper.less_equal(time) { + frontier.insert_with(&time, |time| time.clone()); + true + } + else { false } + } + fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { + let (data, time, mut diff1) = item1; + let (_data, _time, diff2) = item2; + diff1.plus_equals(&diff2); + if !diff1.is_zero() { + self.push((data, time, diff1)); + } + } + fn account(&self) -> (usize, usize, usize, usize) { + let (size, capacity, allocations) = (0, 0, 0); + (self.len(), size, capacity, allocations) } - output.push(result); } - output.extend(list2); } - fn extract( - &mut self, - merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - readied: &mut Vec, - kept: &mut Vec, - stash: &mut Vec, - ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); - - for mut buffer in merged { - for (data, time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { - frontier.insert_ref(&time); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } - keep.push((data, time, diff)); - } else { - if ready.len() == ready.capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.push((data, time, diff)); + pub use columnation::ColMerger; + /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation). + pub mod columnation { + + use timely::progress::{Antichain, frontier::AntichainRef}; + use timely::container::columnation::TimelyStack; + use timely::container::columnation::Columnation; + use crate::difference::Semigroup; + use super::{ContainerQueue, MergerChunk}; + + /// A `Merger` implementation backed by `TimelyStack` containers (columnation). + pub type ColMerger = super::ContainerMerger,TimelyStackQueue<(D, T, R)>>; + + /// TODO + pub struct TimelyStackQueue { + list: TimelyStack, + head: usize, + } + + impl ContainerQueue> for TimelyStackQueue<(D, T, R)> { + fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> { + if self.is_empty() { + Err(std::mem::take(&mut self.list)) + } + else { + Ok(self.pop()) } } - // Recycling buffer. 
- self.recycle(buffer, stash); + fn is_empty(&self) -> bool { + self.head == self.list[..].len() + } + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + let (data1, time1, _) = self.peek(); + let (data2, time2, _) = other.peek(); + (data1, time1).cmp(&(data2, time2)) + } + fn from(list: TimelyStack<(D, T, R)>) -> Self { + TimelyStackQueue { list, head: 0 } + } } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); + + impl TimelyStackQueue { + fn pop(&mut self) -> &T { + self.head += 1; + &self.list[self.head - 1] + } + + fn peek(&self) -> &T { + &self.list[self.head] + } } - if !ready.is_empty() { - readied.push(ready); + + impl MergerChunk for TimelyStack<(D, T, R)> { + type TimeOwned = T; + type DiffOwned = R; + + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { + if upper.less_equal(time) { + frontier.insert_with(&time, |time| time.clone()); + true + } + else { false } + } + fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { + let (data, time, diff1) = item1; + let (_data, _time, diff2) = item2; + stash.clone_from(diff1); + stash.plus_equals(&diff2); + if !stash.is_zero() { + self.copy_destructured(data, time, stash); + } + } + fn account(&self) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + self.heap_size(cb); + (self.len(), size, capacity, allocations) + } } } - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - (chunk.len(), 0, 0, 0) + + pub use flat_container::FlatMerger; + /// Implementations of `ContainerQueue` and `MergerChunk` for `FlatStack` containers (flat_container). + /// + /// This is currently non-functional, while we try and sort out some missing constraints that seem to + /// allow the direct implementation to work, but the corresponding implementation here to not compile. + pub mod flat_container { + + use timely::progress::{Antichain, frontier::AntichainRef}; + use timely::container::flatcontainer::{FlatStack, Region}; + use timely::container::flatcontainer::impls::tuple::TupleABCRegion; + use timely::container::flatcontainer::Push; + use crate::difference::{IsZero, Semigroup}; + use super::{ContainerQueue, MergerChunk}; + + /// A `Merger` implementation backed by `FlatStack` containers (flat_container). + pub type FlatMerger = super::ContainerMerger,FlatStackQueue<((K,V), T, R)>>; + + /// A queue implementation over a flat stack. 
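+        /// Keeps a `head` index into the backing `FlatStack`, so items are read in place rather than removed.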
+ pub struct FlatStackQueue { + list: FlatStack, + head: usize, + } + + impl ContainerQueue> for FlatStackQueue + where + for<'a> R::ReadItem<'a>: Ord, + { + fn next_or_alloc(&mut self) -> Result, FlatStack> { + if self.is_empty() { + Err(std::mem::take(&mut self.list)) + } + else { + Ok(self.pop()) + } + } + fn is_empty(&self) -> bool { + self.head >= self.list.len() + } + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + self.peek().cmp(&other.peek()) + } + fn from(list: FlatStack) -> Self { + FlatStackQueue { list, head: 0 } + } + } + + impl FlatStackQueue { + + fn pop(&mut self) -> R::ReadItem<'_> { + self.head += 1; + self.list.get(self.head - 1) + } + + fn peek(&self) -> R::ReadItem<'_> { + self.list.get(self.head) + } + } + + impl MergerChunk for FlatStack> + where + D: Region, + for<'a> D::ReadItem<'a>: Ord, + T: Region, + for<'a> T::ReadItem<'a>: Ord, + R: Region, + R::Owned: Default + IsZero + for<'a> Semigroup>, + TupleABCRegion: for<'a,'b> Push<(D::ReadItem<'a>, T::ReadItem<'a>, &'b R::Owned)>, + { + type TimeOwned = T::Owned; + type DiffOwned = R::Owned; + + fn time_kept(_time: &Self::Item<'_>, _upper: &AntichainRef, _frontier: &mut Antichain) -> bool { + unimplemented!() + } + fn push_and_add<'a>(&mut self, _item1: as Region>::ReadItem<'a>, _item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { + // let (_, _, _) = _item1; + unimplemented!() + } + fn account(&self) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + self.heap_size(cb); + (self.len(), size, capacity, allocations) + } + } } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs deleted file mode 100644 index c70863070..000000000 --- a/src/trace/implementations/merge_batcher_col.rs +++ /dev/null @@ -1,232 +0,0 @@ -//! A general purpose `Batcher` implementation based on radix sort for TimelyStack. - -use std::cmp::Ordering; -use std::marker::PhantomData; -use timely::container::columnation::{Columnation, TimelyStack}; -use timely::progress::frontier::{Antichain, AntichainRef}; -use timely::{Container, Data, PartialOrder}; - -use crate::difference::Semigroup; -use crate::trace::implementations::merge_batcher::Merger; - -/// A merger for timely stacks -pub struct ColumnationMerger { - _marker: PhantomData, -} - -impl Default for ColumnationMerger { - fn default() -> Self { - Self { - _marker: PhantomData, - } - } -} - -impl ColumnationMerger { - const BUFFER_SIZE_BYTES: usize = 64 << 10; - fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Helper to get pre-sized vector from the stash. - #[inline] - fn empty(&self, stash: &mut Vec>) -> TimelyStack { - stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(self.chunk_capacity())) - } - - /// Helper to return a chunk to the stash. - #[inline] - fn recycle(&self, mut chunk: TimelyStack, stash: &mut Vec>) { - // TODO: Should we limit the size of `stash`? 
- if chunk.capacity() == self.chunk_capacity() { - chunk.clear(); - stash.push(chunk); - } - } -} - -impl Merger for ColumnationMerger<(D, T, R)> -where - D: Columnation + Ord + Data, - T: Columnation + Ord + PartialOrder + Data, - R: Columnation + Semigroup + 'static, -{ - type Time = T; - type Chunk = TimelyStack<(D, T, R)>; - - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - - let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - - let mut result = self.empty(stash); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { - let x = head1.peek(); - let y = head2.peek(); - (&x.0, &x.1).cmp(&(&y.0, &y.1)) - }; - match cmp { - Ordering::Less => { - result.copy(head1.pop()); - } - Ordering::Greater => { - result.copy(head2.pop()); - } - Ordering::Equal => { - let (data1, time1, diff1) = head1.pop(); - let (_data2, _time2, diff2) = head2.pop(); - let mut diff1 = diff1.clone(); - diff1.plus_equals(diff2); - if !diff1.is_zero() { - result.copy_destructured(data1, time1, &diff1); - } - } - } - } - - if result.capacity() == result.len() { - output.push(result); - result = self.empty(stash); - } - - if head1.is_empty() { - self.recycle(head1.done(), stash); - head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - } - if head2.is_empty() { - self.recycle(head2.done(), stash); - head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - } - } - - if result.len() > 0 { - output.push(result); - } else { - self.recycle(result, stash); - } - - if !head1.is_empty() { - let mut result = self.empty(stash); - result.reserve_items(head1.iter()); - for item in head1.iter() { - result.copy(item); - } - output.push(result); - } - output.extend(list1); - - if !head2.is_empty() { - let mut result = self.empty(stash); - result.reserve_items(head2.iter()); - for item in head2.iter() { - result.copy(item); - } - output.push(result); - } - output.extend(list2); - } - - fn extract( - &mut self, - merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - readied: &mut Vec, - kept: &mut Vec, - stash: &mut Vec, - ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); - - for buffer in merged { - for d @ (_data, time, _diff) in buffer.iter() { - if upper.less_equal(time) { - frontier.insert_ref(time); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } - keep.copy(d); - } else { - if ready.len() == ready.capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.copy(d); - } - } - // Recycling buffer. - self.recycle(buffer, stash); - } - // Finish the kept data. 
- if !keep.is_empty() { - kept.push(keep); - } - if !ready.is_empty() { - readied.push(ready); - } - } - - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - chunk.heap_size(cb); - (chunk.len(), size, capacity, allocations) - } -} - -struct TimelyStackQueue { - list: TimelyStack, - head: usize, -} - -impl Default for TimelyStackQueue { - fn default() -> Self { - Self::from(Default::default()) - } -} - -impl TimelyStackQueue { - fn pop(&mut self) -> &T { - self.head += 1; - &self.list[self.head - 1] - } - - fn peek(&self) -> &T { - &self.list[self.head] - } - - fn from(list: TimelyStack) -> Self { - TimelyStackQueue { list, head: 0 } - } - - fn done(self) -> TimelyStack { - self.list - } - - fn is_empty(&self) -> bool { - self.head == self.list[..].len() - } - - /// Return an iterator over the remaining elements. - fn iter(&self) -> impl Iterator + Clone { - self.list[self.head..].iter() - } -} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 31bd4c628..472293447 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -41,7 +41,6 @@ pub mod spine_fueled; pub mod merge_batcher; -pub mod merge_batcher_col; pub mod merge_batcher_flat; pub mod ord_neu; pub mod rhh; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index f0c18a3c4..df4e9d68a 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -13,10 +13,8 @@ use timely::container::columnation::{TimelyStack}; use timely::container::flatcontainer::{FlatStack, RegionPreference}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker}; - use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; use crate::trace::rc_blanket_impls::RcBuilder; @@ -28,7 +26,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>; +pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -38,7 +36,7 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>; +pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; @@ -63,7 +61,7 @@ pub type FlatValBuilderDefault = FlatValBuilder = Spine>>>; /// A batcher for ordered lists. 
-pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>; +pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -73,7 +71,7 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>; +pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; @@ -96,7 +94,7 @@ pub type FlatKeyBuilderDefault = FlatKeyBuilder = Spine>>>; /// A batcher for columnar storage. -pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColumnationMerger<((::Owned,::Owned),T,R)>>; +pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; /// A builder for columnar storage. pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 4c1ac9a11..d2160b0b6 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -13,8 +13,7 @@ use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; @@ -25,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>; +pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -35,7 +34,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>; +pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>;
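Illustrative sketch, not part of the patch: the new module documentation describes a `MergeBatcher` as the pairing of a chunker with a merger whose output and input types align. Spelled out with the pieces this diff introduces (module paths assumed from the diff, and the alias name is made up), a columnation-backed key-only batcher composes as follows.

    // Sketch only: mirrors the `ColKeyBatcher` alias in ord_neu.rs.
    use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
    use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, ColMerger};

    /// Accepts `Vec<((K, ()), T, R)>` inputs, consolidates them with `ColumnationChunker`,
    /// and merges chains of `TimelyStack` chunks with `ColMerger`.
    type ColKeyBatcherSketch<K, T, R> = MergeBatcher<
        Vec<((K, ()), T, R)>,
        ColumnationChunker<((K, ()), T, R)>,
        ColMerger<(K, ()), T, R>,
    >;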