Skip to content

Commit 7c792f3

Browse files
committed
Switch some dataflow fragments to columnar
Change the dataflow fragment for `specialized_arrange` and in the linear join preparation phase to use columnar data on edges. Depends on: * TimelyDataflow/differential-dataflow#562 * TimelyDataflow/timely-dataflow#630 Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 9b87cb6 commit 7c792f3

File tree

17 files changed

+218
-125
lines changed

17 files changed

+218
-125
lines changed

Cargo.lock

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ incremental = true
285285
# merged), after which point it becomes impossible to build that historical
286286
# version of Materialize.
287287
[patch.crates-io]
288+
differential-dataflow = { path = "../differential-dataflow" }
289+
timely = { path = "../timely-dataflow/timely" }
288290
# Waiting on https://github.com/sfackler/rust-postgres/pull/752.
289291
postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
290292
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }

src/compute-types/src/dyncfgs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
4141
/// Enable lgalloc for columnation.
4242
pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
4343
"enable_columnation_lgalloc",
44-
false,
44+
true,
4545
"Enable allocating regions from lgalloc.",
4646
);
4747

src/compute/src/render.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ use std::rc::{Rc, Weak};
110110
use std::sync::Arc;
111111
use std::task::Poll;
112112

113+
use columnar::Columnar;
113114
use differential_dataflow::dynamic::pointstamp::PointStamp;
114115
use differential_dataflow::lattice::Lattice;
115116
use differential_dataflow::operators::arrange::Arranged;
@@ -498,6 +499,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
498499
where
499500
G: Scope<Timestamp = mz_repr::Timestamp>,
500501
T: Refines<G::Timestamp> + RenderTimestamp,
502+
<T as Columnar>::Container: Clone + Send,
501503
{
502504
pub(crate) fn import_index(
503505
&mut self,
@@ -647,6 +649,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
647649
where
648650
G: Scope<Timestamp = mz_repr::Timestamp>,
649651
T: RenderTimestamp,
652+
<T as Columnar>::Container: Clone + Send,
650653
{
651654
pub(crate) fn export_index_iterative(
652655
&self,
@@ -889,6 +892,8 @@ impl<G> Context<G>
889892
where
890893
G: Scope,
891894
G::Timestamp: RenderTimestamp,
895+
<G::Timestamp as Columnar>::Container: Clone + Send,
896+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
892897
{
893898
/// Renders a non-recursive plan to a differential dataflow, producing the collection of
894899
/// results.
@@ -1293,7 +1298,11 @@ where
12931298

12941299
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
12951300
/// A timestamp type that can be used for operations within MZ's dataflow layer.
1296-
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
1301+
pub trait RenderTimestamp:
1302+
Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
1303+
where
1304+
<Self as Columnar>::Container: Clone + Send,
1305+
{
12971306
/// The system timestamp component of the timestamp.
12981307
///
12991308
/// This is useful for manipulating the system time, as when delaying
@@ -1429,6 +1438,7 @@ impl<S> WithStartSignal for MzArrangementImport<S>
14291438
where
14301439
S: Scope,
14311440
S::Timestamp: RenderTimestamp,
1441+
<S::Timestamp as Columnar>::Container: Clone + Send,
14321442
{
14331443
fn with_start_signal(self, signal: StartSignal) -> Self {
14341444
match self {
@@ -1443,6 +1453,7 @@ impl<S, Tr> WithStartSignal for Arranged<S, Tr>
14431453
where
14441454
S: Scope,
14451455
S::Timestamp: RenderTimestamp,
1456+
<S::Timestamp as Columnar>::Container: Clone + Send,
14461457
Tr: TraceReader + Clone,
14471458
{
14481459
fn with_start_signal(self, signal: StartSignal) -> Self {

src/compute/src/render/context.rs

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,37 +14,40 @@ use std::collections::BTreeMap;
1414
use std::rc::Weak;
1515
use std::sync::mpsc;
1616

17+
use columnar::Columnar;
1718
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
1819
use differential_dataflow::lattice::Lattice;
1920
use differential_dataflow::operators::arrange::Arranged;
2021
use differential_dataflow::trace::cursor::IntoOwned;
2122
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
22-
use differential_dataflow::{Collection, Data};
23+
use differential_dataflow::{AsCollection, Collection, Data};
2324
use mz_compute_types::dataflows::DataflowDescription;
2425
use mz_compute_types::plan::{AvailableCollections, LirId};
2526
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26-
use mz_repr::fixed_length::{FromDatumIter, ToDatumIter};
27+
use mz_repr::fixed_length::ToDatumIter;
2728
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
2829
use mz_storage_types::controller::CollectionMetadata;
2930
use mz_storage_types::errors::DataflowError;
31+
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
3032
use mz_timely_util::operator::{CollectionExt, StreamExt};
3133
use timely::container::columnation::Columnation;
3234
use timely::container::CapacityContainerBuilder;
33-
use timely::dataflow::channels::pact::Pipeline;
35+
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
3436
use timely::dataflow::operators::generic::OutputHandleCore;
3537
use timely::dataflow::operators::Capability;
3638
use timely::dataflow::scopes::Child;
3739
use timely::dataflow::{Scope, ScopeParent};
3840
use timely::progress::timestamp::Refines;
3941
use timely::progress::{Antichain, Timestamp};
42+
use timely::Container;
4043
use tracing::error;
4144

4245
use crate::arrangement::manager::SpecializedTraceHandle;
4346
use crate::compute_state::{ComputeState, HydrationEvent};
44-
use crate::extensions::arrange::{KeyCollection, MzArrange};
47+
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
4548
use crate::render::errors::ErrorLogger;
4649
use crate::render::{LinearJoinSpec, RenderTimestamp};
47-
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
50+
use crate::row_spine::RowRowBuilder;
4851
use crate::typedefs::{
4952
ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
5053
};
@@ -919,10 +922,11 @@ where
919922

920923
impl<S, T> CollectionBundle<S, T>
921924
where
922-
T: timely::progress::Timestamp + Lattice + Columnation,
925+
T: Timestamp + Lattice + Columnation,
923926
S: Scope,
924-
S::Timestamp:
925-
Refines<T> + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp,
927+
S::Timestamp: Refines<T> + RenderTimestamp,
928+
<S::Timestamp as Columnar>::Container: Clone + Send,
929+
for<'a> <S::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
926930
{
927931
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
928932
///
@@ -1046,7 +1050,8 @@ where
10461050
.collection
10471051
.clone()
10481052
.expect("Collection constructed above");
1049-
let (oks, errs_keyed) = Self::specialized_arrange(&name, oks, &key, &thinning);
1053+
let (oks, errs_keyed) =
1054+
Self::specialized_arrange(&name, oks, key.clone(), thinning.clone());
10501055
let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
10511056
let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
10521057
&format!("{}-errors", name),
@@ -1064,44 +1069,48 @@ where
10641069
fn specialized_arrange(
10651070
name: &String,
10661071
oks: Collection<S, Row, i64>,
1067-
key: &Vec<MirScalarExpr>,
1068-
thinning: &Vec<usize>,
1072+
key: Vec<MirScalarExpr>,
1073+
thinning: Vec<usize>,
10691074
) -> (MzArrangement<S>, Collection<S, DataflowError, i64>) {
1070-
// Catch-all: Just use RowRow.
1071-
let (oks, errs) = oks
1072-
.map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1073-
"FormArrangementKey",
1074-
specialized_arrangement_key(key.clone(), thinning.clone()),
1075+
let (oks, errs) =
1076+
oks.inner
1077+
.unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, i64)>, _, _, _>(
1078+
Pipeline,
1079+
"FormArrangementKey",
1080+
move |_, _| {
1081+
Box::new(move |input, ok, err| {
1082+
let mut key_buf = Row::default();
1083+
let mut val_buf = Row::default();
1084+
let mut datums = DatumVec::new();
1085+
let temp_storage = RowArena::new();
1086+
while let Some((time, data)) = input.next() {
1087+
let mut ok_session = ok.session_with_builder(&time);
1088+
let mut err_session = err.session(&time);
1089+
for (row, time, diff) in data.iter() {
1090+
temp_storage.clear();
1091+
let datums = datums.borrow_with(&row);
1092+
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1093+
match key_buf.packer().try_extend(
1094+
key.iter().map(|k| k.eval(&datums, &temp_storage)),
1095+
) {
1096+
Ok(()) => {
1097+
val_buf.packer().extend(val_datum_iter);
1098+
ok_session.give(((&*key_buf, &*val_buf), time, diff));
1099+
}
1100+
Err(e) => {
1101+
err_session.give((e.into(), time.clone(), *diff));
1102+
}
1103+
}
1104+
}
1105+
}
1106+
})
1107+
},
1108+
);
1109+
let oks = oks
1110+
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1111+
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
10751112
);
1076-
let oks =
1077-
oks.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name);
1078-
(MzArrangement::RowRow(oks), errs)
1079-
}
1080-
}
1081-
1082-
/// Obtains a function that maps input rows to (key, value) pairs according to
1083-
/// the given key and thinning expressions. This function allows for specialization
1084-
/// of key and value types and is intended to use to form arrangement keys.
1085-
fn specialized_arrangement_key<K, V>(
1086-
key: Vec<MirScalarExpr>,
1087-
thinning: Vec<usize>,
1088-
) -> impl FnMut(Row) -> Result<(K, V), DataflowError>
1089-
where
1090-
K: Columnation + Data + FromDatumIter,
1091-
V: Columnation + Data + FromDatumIter,
1092-
{
1093-
let mut key_buf = K::default();
1094-
let mut val_buf = V::default();
1095-
let mut datums = DatumVec::new();
1096-
move |row| {
1097-
// TODO: Consider reusing the `row` allocation; probably in *next* invocation.
1098-
let datums = datums.borrow_with(&row);
1099-
let temp_storage = RowArena::new();
1100-
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1101-
Ok::<(K, V), DataflowError>((
1102-
key_buf.try_from_datum_iter(key.iter().map(|k| k.eval(&datums, &temp_storage)))?,
1103-
val_buf.from_datum_iter(val_datum_iter),
1104-
))
1113+
(MzArrangement::RowRow(oks), errs.as_collection())
11051114
}
11061115
}
11071116

src/compute/src/render/flat_map.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
use columnar::Columnar;
1011
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
1112
use mz_expr::MfpPlan;
1213
use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
@@ -26,6 +27,7 @@ impl<G> Context<G>
2627
where
2728
G: Scope,
2829
G::Timestamp: crate::render::RenderTimestamp,
30+
<G::Timestamp as Columnar>::Container: Clone + Send,
2931
{
3032
/// Applies a `TableFunc` to every row, followed by an `mfp`.
3133
pub fn render_flat_map(
@@ -130,6 +132,7 @@ fn drain_through_mfp<T>(
130132
>,
131133
) where
132134
T: crate::render::RenderTimestamp,
135+
<T as Columnar>::Container: Clone + Send,
133136
{
134137
let temp_storage = RowArena::new();
135138
let binding = SharedRow::get();

src/compute/src/render/join/delta_join.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use std::collections::{BTreeMap, BTreeSet};
1717

18+
use columnar::Columnar;
1819
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
1920
use differential_dataflow::lattice::Lattice;
2021
use differential_dataflow::operators::arrange::Arranged;
@@ -45,7 +46,9 @@ use crate::typedefs::{RowRowAgent, RowRowEnter};
4546
impl<G> Context<G>
4647
where
4748
G: Scope,
48-
G::Timestamp: crate::render::RenderTimestamp,
49+
G::Timestamp: RenderTimestamp,
50+
<G::Timestamp as Columnar>::Container: Clone + Send,
51+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
4952
{
5053
/// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows.
5154
///
@@ -326,7 +329,9 @@ fn dispatch_build_halfjoin_local<G, CF>(
326329
)
327330
where
328331
G: Scope,
329-
G::Timestamp: crate::render::RenderTimestamp,
332+
G::Timestamp: RenderTimestamp,
333+
<G::Timestamp as Columnar>::Container: Clone + Send,
334+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
330335
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
331336
{
332337
match trace {
@@ -358,7 +363,9 @@ fn dispatch_build_halfjoin_trace<G, T, CF>(
358363
where
359364
G: Scope,
360365
T: Timestamp + Lattice + Columnation,
361-
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
366+
G::Timestamp: RenderTimestamp + Refines<T>,
367+
<G::Timestamp as Columnar>::Container: Clone + Send,
368+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
362369
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
363370
{
364371
match trace {
@@ -398,7 +405,9 @@ fn build_halfjoin<G, Tr, K, CF>(
398405
)
399406
where
400407
G: Scope,
401-
G::Timestamp: crate::render::RenderTimestamp,
408+
G::Timestamp: RenderTimestamp,
409+
<G::Timestamp as Columnar>::Container: Clone + Send,
410+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
402411
Tr: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
403412
K: ExchangeData + Hashable + Default + FromDatumIter + ToDatumIter,
404413
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
@@ -531,7 +540,9 @@ fn dispatch_build_update_stream_local<G>(
531540
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
532541
where
533542
G: Scope,
534-
G::Timestamp: crate::render::RenderTimestamp,
543+
G::Timestamp: RenderTimestamp,
544+
<G::Timestamp as Columnar>::Container: Clone + Send,
545+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
535546
{
536547
match trace {
537548
MzArrangement::RowRow(inner) => build_update_stream::<_, RowRowAgent<_, _>>(
@@ -553,7 +564,9 @@ fn dispatch_build_update_stream_trace<G, T>(
553564
where
554565
G: Scope,
555566
T: Timestamp + Lattice + Columnation,
556-
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
567+
G::Timestamp: Lattice + RenderTimestamp + Refines<T>,
568+
<G::Timestamp as Columnar>::Container: Clone + Send,
569+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
557570
{
558571
match trace {
559572
MzArrangementImport::RowRow(inner) => build_update_stream::<_, RowRowEnter<_, _, _>>(
@@ -578,7 +591,9 @@ fn build_update_stream<G, Tr>(
578591
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
579592
where
580593
G: Scope,
581-
G::Timestamp: crate::render::RenderTimestamp,
594+
G::Timestamp: RenderTimestamp,
595+
<G::Timestamp as Columnar>::Container: Clone + Send,
596+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
582597
for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
583598
Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
584599
for<'a> Tr::Key<'a>: ToDatumIter,

0 commit comments

Comments
 (0)