Skip to content

Commit 235d317

Browse files
committed
Document and organize MergeBatcher
1 parent 9d2676b commit 235d317

File tree

2 files changed

+50
-31
lines changed

2 files changed

+50
-31
lines changed

src/trace/implementations/merge_batcher.rs

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
1-
//! A general purpose `Batcher` implementation based on radix sort.
1+
//! A `Batcher` implementation based on merge sort.
2+
//!
3+
//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4+
//! The chunker receives input batches and consolidates them, producing sorted output
5+
//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6+
//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7+
//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8+
//! splitting them apart based on time.
9+
//!
10+
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11+
//! the chunker and the merger, provided their respective output and input types align.
212
313
use std::collections::VecDeque;
414
use std::marker::PhantomData;
@@ -15,30 +25,30 @@ use crate::logging::{BatcherEvent, DifferentialEvent};
1525
use crate::trace::{Batcher, Builder, Description};
1626
use crate::Data;
1727

18-
/// Creates batches from unordered tuples.
19-
pub struct MergeBatcher<Input, C, M>
20-
where
21-
C: ContainerBuilder<Container=M::Chunk>,
22-
M: Merger,
23-
{
24-
/// each power-of-two length list of allocations.
25-
/// Do not push/pop directly but use the corresponding functions
26-
/// ([`Self::chain_push`]/[`Self::chain_pop`]).
28+
/// Creates batches from containers of unordered tuples.
29+
///
30+
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
31+
/// and must produce outputs of type `M::Chunk`.
32+
pub struct MergeBatcher<Input, C, M: Merger> {
33+
/// Transforms input streams to chunks of sorted, consolidated data.
34+
chunker: C,
35+
/// A sequence of power-of-two length lists of sorted, consolidated containers.
36+
///
37+
/// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
2738
chains: Vec<Vec<M::Chunk>>,
28-
/// Stash of empty chunks
39+
/// Stash of empty chunks, recycled through the merging process.
2940
stash: Vec<M::Chunk>,
30-
/// Chunker to transform input streams to chunks of data.
31-
chunker: C,
32-
/// Thing to accept data, merge chains, and talk to the builder.
41+
/// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
3342
merger: M,
34-
/// Logger for size accounting.
35-
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
36-
/// Timely operator ID.
37-
operator_id: usize,
3843
/// Current lower frontier, we sealed up to here.
3944
lower: Antichain<M::Time>,
4045
/// The lower-bound frontier of the data, after the last call to seal.
4146
frontier: Antichain<M::Time>,
47+
/// Logger for size accounting.
48+
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
49+
/// Timely operator ID.
50+
operator_id: usize,
51+
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
4252
_marker: PhantomData<Input>,
4353
}
4454

@@ -124,7 +134,6 @@ where
124134

125135
impl<Input, C, M> MergeBatcher<Input, C, M>
126136
where
127-
C: ContainerBuilder<Container=M::Chunk> + Default,
128137
M: Merger,
129138
{
130139
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
@@ -192,7 +201,6 @@ where
192201

193202
impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
194203
where
195-
C: ContainerBuilder<Container=M::Chunk> + Default,
196204
M: Merger,
197205
{
198206
fn drop(&mut self) {
@@ -226,7 +234,17 @@ pub trait Merger: Default {
226234

227235
pub mod container {
228236

229-
//! A general purpose `Batcher` implementation for arbitrary containers.
237+
//! A general purpose `Merger` implementation for arbitrary containers.
238+
//!
239+
//! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
240+
//! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
241+
//! well as the ability to return the container when iteration is complete.
242+
//! The `MergerChukn` trait is meant to be implemented by containers, and it explains how container
243+
//! items should be interpreted with respect to times, and with respect to differences.
244+
//! These two traits exist instead of a stack of constraints on the structure of the associated items
245+
//! of the containers, allowing them to perform their functions without destructuring their guts.
246+
//!
247+
//! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules.
230248
231249
use std::cmp::Ordering;
232250
use std::marker::PhantomData;
@@ -250,20 +268,26 @@ pub mod container {
250268

251269
/// Behavior to dissect items of chunks in the merge batcher
252270
pub trait MergerChunk : SizableContainer {
253-
/// The owned time type.
271+
/// An owned time type.
272+
///
273+
/// This type is provided so that users can maintain antichains of something, in order to track
274+
/// the forward movement of time and extract intervals from chains of updates.
254275
type TimeOwned;
255276
/// The owned diff type.
277+
///
278+
/// This type is provided so that users can provide an owned instance to the `push_and_add` method,
279+
/// to act as a scratch space when the type is substantial and could otherwise require allocations.
256280
type DiffOwned: Default;
257281

258-
/// Compares a borrowed time to an antichain of owned times.
282+
/// Relates a borrowed time to antichains of owned times.
259283
///
260-
/// If `upper` is less or equal to `time`, the method returns true and ensures that `frontier` reflects `time`.
284+
/// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
261285
fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
262286

263287
/// Push an entry that adds together two diffs.
264288
///
265289
/// This is only called when two items are deemed mergeable by the container queue.
266-
/// If the two diffs would cancel, do not push anything.
290+
/// If the two diffs added together is zero do not push anything.
267291
fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
268292

269293
/// Account the allocations behind the chunk.
@@ -277,6 +301,7 @@ pub mod container {
277301
/// A merger for arbitrary containers.
278302
///
279303
/// `MC` is a [`Container`] that implements [`MergerChunk`].
304+
/// `CQ` is a [`ContainerQueue`] supporting `MC`.
280305
pub struct ContainerMerger<MC, CQ> {
281306
_marker: PhantomData<(MC, CQ)>,
282307
}
@@ -288,7 +313,6 @@ pub mod container {
288313
}
289314

290315
impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
291-
292316
/// Helper to get pre-sized vector from the stash.
293317
#[inline]
294318
fn empty(&self, stash: &mut Vec<MC>) -> MC {
@@ -298,14 +322,12 @@ pub mod container {
298322
container
299323
})
300324
}
301-
302325
/// Helper to return a chunk to the stash.
303326
#[inline]
304327
fn recycle(&self, chunk: MC, stash: &mut Vec<MC>) {
305328
// TODO: Should we only retain correctly sized containers?
306329
stash.push(chunk);
307330
}
308-
309331
}
310332

311333
impl<MC, CQ> Merger for ContainerMerger<MC, CQ>

src/trace/implementations/ord_neu.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@ use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegi
1515
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
1616
use crate::trace::implementations::spine_fueled::Spine;
1717
use crate::trace::implementations::merge_batcher::{MergeBatcher};
18-
use crate::trace::implementations::merge_batcher::container::ContainerMerger;
1918
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
2019
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
2120
use crate::trace::implementations::merge_batcher::VecMerger;
2221
use crate::trace::rc_blanket_impls::RcBuilder;
2322

24-
use crate::trace::implementations::merge_batcher::container::columnation::TimelyStackQueue;
25-
2623
use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
2724

2825
pub use self::val_batch::{OrdValBatch, OrdValBuilder};

0 commit comments

Comments
 (0)