Skip to content

Commit a609da1

Browse files
committed
Make generic versions canonical
1 parent 235d317 commit a609da1

File tree

4 files changed

+14
-190
lines changed

4 files changed

+14
-190
lines changed

src/trace/implementations/merge_batcher.rs

Lines changed: 5 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,17 @@
1010
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
1111
//! the chunker and the merger, provided their respective output and input types align.
1212
13-
use std::collections::VecDeque;
1413
use std::marker::PhantomData;
1514

1615
use timely::logging::WorkerIdentifier;
1716
use timely::logging_core::Logger;
1817
use timely::progress::frontier::AntichainRef;
1918
use timely::progress::{frontier::Antichain, Timestamp};
20-
use timely::{Container, PartialOrder};
19+
use timely::Container;
2120
use timely::container::{ContainerBuilder, PushInto};
2221

23-
use crate::difference::Semigroup;
2422
use crate::logging::{BatcherEvent, DifferentialEvent};
2523
use crate::trace::{Batcher, Builder, Description};
26-
use crate::Data;
2724

2825
/// Creates batches from containers of unordered tuples.
2926
///
@@ -232,6 +229,8 @@ pub trait Merger: Default {
232229
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
233230
}
234231

232+
pub use container::{VecMerger, ColMerger};
233+
235234
pub mod container {
236235

237236
//! A general purpose `Merger` implementation for arbitrary containers.
@@ -473,7 +472,7 @@ pub mod container {
473472
use super::{ContainerQueue, MergerChunk};
474473

475474
/// A `Merger` implementation backed by vector containers.
476-
pub type VecMerger<K, V, T, R> = super::ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>;
475+
pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
477476

478477
impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
479478
fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
@@ -534,7 +533,7 @@ pub mod container {
534533
use super::{ContainerQueue, MergerChunk};
535534

536535
/// A `Merger` implementation backed by `TimelyStack` containers (columnation).
537-
pub type ColMerger<K, V, T, R> = super::ContainerMerger<TimelyStack<((K,V),T,R)>,TimelyStackQueue<((K,V), T, R)>>;
536+
pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
538537

539538
/// TODO
540539
pub struct TimelyStackQueue<T: Columnation> {
@@ -693,171 +692,3 @@ pub mod container {
693692
}
694693
}
695694
}
696-
697-
/// A merger that knows how to accept and maintain chains of vectors.
698-
pub struct VecMerger<T> {
699-
_marker: PhantomData<T>,
700-
}
701-
702-
impl<T> Default for VecMerger<T> {
703-
fn default() -> Self {
704-
Self { _marker: PhantomData }
705-
}
706-
}
707-
708-
impl<T> VecMerger<T> {
709-
const BUFFER_SIZE_BYTES: usize = 8 << 10;
710-
fn chunk_capacity(&self) -> usize {
711-
let size = ::std::mem::size_of::<T>();
712-
if size == 0 {
713-
Self::BUFFER_SIZE_BYTES
714-
} else if size <= Self::BUFFER_SIZE_BYTES {
715-
Self::BUFFER_SIZE_BYTES / size
716-
} else {
717-
1
718-
}
719-
}
720-
721-
/// Helper to get pre-sized vector from the stash.
722-
#[inline]
723-
fn empty(&self, stash: &mut Vec<Vec<T>>) -> Vec<T> {
724-
stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity()))
725-
}
726-
727-
/// Helper to return a chunk to the stash.
728-
#[inline]
729-
fn recycle(&self, mut chunk: Vec<T>, stash: &mut Vec<Vec<T>>) {
730-
// TODO: Should we limit the size of `stash`?
731-
if chunk.capacity() == self.chunk_capacity() {
732-
chunk.clear();
733-
stash.push(chunk);
734-
}
735-
}
736-
}
737-
738-
impl<D, T, R> Merger for VecMerger<(D, T, R)>
739-
where
740-
D: Data,
741-
T: Ord + PartialOrder + Clone + 'static,
742-
R: Semigroup + 'static,
743-
{
744-
type Time = T;
745-
type Chunk = Vec<(D, T, R)>;
746-
747-
fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
748-
let mut list1 = list1.into_iter();
749-
let mut list2 = list2.into_iter();
750-
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
751-
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());
752-
753-
let mut result = self.empty(stash);
754-
755-
// while we have valid data in each input, merge.
756-
while !head1.is_empty() && !head2.is_empty() {
757-
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
758-
let cmp = {
759-
let x = head1.front().unwrap();
760-
let y = head2.front().unwrap();
761-
(&x.0, &x.1).cmp(&(&y.0, &y.1))
762-
};
763-
use std::cmp::Ordering;
764-
match cmp {
765-
Ordering::Less => result.push(head1.pop_front().unwrap()),
766-
Ordering::Greater => result.push(head2.pop_front().unwrap()),
767-
Ordering::Equal => {
768-
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
769-
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
770-
diff1.plus_equals(&diff2);
771-
if !diff1.is_zero() {
772-
result.push((data1, time1, diff1));
773-
}
774-
}
775-
}
776-
}
777-
778-
if result.capacity() == result.len() {
779-
output.push(result);
780-
result = self.empty(stash);
781-
}
782-
783-
if head1.is_empty() {
784-
let done1 = Vec::from(head1);
785-
self.recycle(done1, stash);
786-
head1 = VecDeque::from(list1.next().unwrap_or_default());
787-
}
788-
if head2.is_empty() {
789-
let done2 = Vec::from(head2);
790-
self.recycle(done2, stash);
791-
head2 = VecDeque::from(list2.next().unwrap_or_default());
792-
}
793-
}
794-
795-
if !result.is_empty() {
796-
output.push(result);
797-
} else {
798-
self.recycle(result, stash);
799-
}
800-
801-
if !head1.is_empty() {
802-
let mut result = self.empty(stash);
803-
for item1 in head1 {
804-
result.push(item1);
805-
}
806-
output.push(result);
807-
}
808-
output.extend(list1);
809-
810-
if !head2.is_empty() {
811-
let mut result = self.empty(stash);
812-
for item2 in head2 {
813-
result.push(item2);
814-
}
815-
output.push(result);
816-
}
817-
output.extend(list2);
818-
}
819-
820-
fn extract(
821-
&mut self,
822-
merged: Vec<Self::Chunk>,
823-
upper: AntichainRef<Self::Time>,
824-
frontier: &mut Antichain<Self::Time>,
825-
readied: &mut Vec<Self::Chunk>,
826-
kept: &mut Vec<Self::Chunk>,
827-
stash: &mut Vec<Self::Chunk>,
828-
) {
829-
let mut keep = self.empty(stash);
830-
let mut ready = self.empty(stash);
831-
832-
for mut buffer in merged {
833-
for (data, time, diff) in buffer.drain(..) {
834-
if upper.less_equal(&time) {
835-
frontier.insert_ref(&time);
836-
if keep.len() == keep.capacity() && !keep.is_empty() {
837-
kept.push(keep);
838-
keep = self.empty(stash);
839-
}
840-
keep.push((data, time, diff));
841-
} else {
842-
if ready.len() == ready.capacity() && !ready.is_empty() {
843-
readied.push(ready);
844-
ready = self.empty(stash);
845-
}
846-
ready.push((data, time, diff));
847-
}
848-
}
849-
// Recycling buffer.
850-
self.recycle(buffer, stash);
851-
}
852-
// Finish the kept data.
853-
if !keep.is_empty() {
854-
kept.push(keep);
855-
}
856-
if !ready.is_empty() {
857-
readied.push(ready);
858-
}
859-
}
860-
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
861-
(chunk.len(), 0, 0, 0)
862-
}
863-
}

src/trace/implementations/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
pub mod spine_fueled;
4242

4343
pub mod merge_batcher;
44-
pub mod merge_batcher_col;
4544
pub mod merge_batcher_flat;
4645
pub mod ord_neu;
4746
pub mod rhh;

src/trace/implementations/ord_neu.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ use timely::container::flatcontainer::{FlatStack, RegionPreference};
1414
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
1515
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
1616
use crate::trace::implementations::spine_fueled::Spine;
17-
use crate::trace::implementations::merge_batcher::{MergeBatcher};
18-
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
17+
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
1918
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
20-
use crate::trace::implementations::merge_batcher::VecMerger;
2119
use crate::trace::rc_blanket_impls::RcBuilder;
2220

2321
use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
@@ -28,8 +26,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
2826
/// A trace implementation using a spine of ordered lists.
2927
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
3028
/// A batcher using ordered lists.
31-
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
32-
// pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>>;
29+
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
3330
/// A builder using ordered lists.
3431
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
3532

@@ -39,7 +36,7 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
3936
/// A trace implementation backed by columnar storage.
4037
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
4138
/// A batcher for columnar storage.
42-
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
39+
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
4340
/// A builder for columnar storage.
4441
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
4542

@@ -64,8 +61,7 @@ pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as Reg
6461
/// A trace implementation using a spine of ordered lists.
6562
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
6663
/// A batcher for ordered lists.
67-
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
68-
// pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, ContainerMerger<Vec<((K, ()), T, R)>, std::collections::VecDeque<((K, ()), T, R)>>>;
64+
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
6965
/// A builder for ordered lists.
7066
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
7167

@@ -75,8 +71,7 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
7571
/// A trace implementation backed by columnar storage.
7672
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
7773
/// A batcher for columnar storage
78-
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
79-
// pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ContainerMerger<TimelyStack<((K,()),T,R)>,TimelyStackQueue<((K,()), T, R)>>>;
74+
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
8075
/// A builder for columnar storage
8176
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
8277

@@ -99,7 +94,7 @@ pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as Region
9994
/// A trace implementation backed by columnar storage.
10095
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
10196
/// A batcher for columnar storage.
102-
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
97+
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
10398
/// A builder for columnar storage.
10499
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
105100

src/trace/implementations/rhh.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ use timely::container::columnation::TimelyStack;
1313

1414
use crate::Hashable;
1515
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
16-
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
17-
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
16+
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
1817
use crate::trace::implementations::spine_fueled::Spine;
1918
use crate::trace::rc_blanket_impls::RcBuilder;
2019

@@ -25,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
2524
/// A trace implementation using a spine of ordered lists.
2625
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
2726
/// A batcher for ordered lists.
28-
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
27+
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
2928
/// A builder for ordered lists.
3029
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
3130

@@ -35,7 +34,7 @@ pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<
3534
/// A trace implementation backed by columnar storage.
3635
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
3736
/// A batcher for columnar storage.
38-
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
37+
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
3938
/// A builder for columnar storage.
4039
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
4140

0 commit comments

Comments
 (0)