Skip to content

Commit e452849

Browse files
committed
Switch some dataflow fragments to columnar
Change the dataflow fragment for `specialized_arrange` and in the linear join preparation phase to use columnar data on edges. Depends on: * TimelyDataflow/differential-dataflow#562 * TimelyDataflow/timely-dataflow#630 Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 2ed27fe commit e452849

File tree

42 files changed

+268
-165
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+268
-165
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter-types/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
1515
mz-repr = { path = "../repr" }
1616
mz-storage-types = { path = "../storage-types" }
1717
serde = "1.0.152"
18-
timely = "0.17.0"
18+
timely = "0.17.1"
1919
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
2020

2121
[package.metadata.cargo-udeps.ignore]

src/adapter/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ bytesize = "1.1.0"
1818
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
1919
dec = "0.4.8"
2020
derivative = "2.2.0"
21-
differential-dataflow = "0.13.4"
21+
differential-dataflow = "0.13.5"
2222
enum-kinds = "0.5.1"
2323
fail = { version = "0.5.1", features = ["failpoints"] }
2424
futures = "0.3.25"
@@ -78,7 +78,7 @@ serde_plain = "1.0.1"
7878
sha2 = "0.10.6"
7979
smallvec = { version = "1.10.0", features = ["union"] }
8080
static_assertions = "1.1"
81-
timely = "0.17.0"
81+
timely = "0.17.1"
8282
tokio = { version = "1.38.0", features = ["rt", "time"] }
8383
tokio-postgres = { version = "0.7.8" }
8484
tracing = "0.1.37"

src/catalog/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ bytesize = "1.1.0"
1818
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
1919
clap = { version = "4.5.23", features = ["derive"] }
2020
derivative = "2.2.0"
21-
differential-dataflow = "0.13.4"
21+
differential-dataflow = "0.13.5"
2222
futures = "0.3.25"
2323
ipnet = "2.5.0"
2424
itertools = "0.12.1"
@@ -60,7 +60,7 @@ serde_plain = "1.0.1"
6060
static_assertions = "1.1"
6161
sha2 = "0.10.6"
6262
thiserror = "1.0.37"
63-
timely = "0.17.0"
63+
timely = "0.17.1"
6464
tokio = { version = "1.38.0" }
6565
tracing = "0.1.37"
6666
uuid = "1.2.2"

src/cluster/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ workspace = true
1313
anyhow = "1.0.66"
1414
async-trait = "0.1.83"
1515
crossbeam-channel = "0.5.8"
16-
differential-dataflow = "0.13.4"
16+
differential-dataflow = "0.13.5"
1717
futures = "0.3.25"
1818
mz-cluster-client = { path = "../cluster-client" }
1919
mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] }
2020
mz-persist-client = { path = "../persist-client" }
2121
mz-service = { path = "../service" }
2222
mz-txn-wal = { path = "../txn-wal" }
2323
regex = "1.7.0"
24-
timely = "0.17.0"
24+
timely = "0.17.1"
2525
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
2626
tracing = "0.1.37"
2727
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

src/compute-client/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ async-trait = "0.1.83"
1515
bytesize = "1.1.0"
1616
crossbeam-channel = "0.5.8"
1717
derivative = "2.2.0"
18-
differential-dataflow = "0.13.4"
18+
differential-dataflow = "0.13.5"
1919
futures = "0.3.25"
2020
http = "1.1.0"
2121
mz-build-info = { path = "../build-info" }
@@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] }
4343
serde = { version = "1.0.152", features = ["derive"] }
4444
serde_json = "1.0.125"
4545
thiserror = "1.0.37"
46-
timely = "0.17.0"
46+
timely = "0.17.1"
4747
tokio = "1.38.0"
4848
tokio-stream = "0.1.11"
4949
tonic = "0.12.1"

src/compute-types/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ workspace = true
1212
[dependencies]
1313
columnar = "0.2.2"
1414
columnation = "0.1.0"
15-
differential-dataflow = "0.13.4"
15+
differential-dataflow = "0.13.5"
1616
itertools = "0.12.1"
1717
mz-dyncfg = { path = "../dyncfg" }
1818
mz-expr = { path = "../expr" }
@@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
2424
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
2525
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
2626
serde = { version = "1.0.152", features = ["derive"] }
27-
timely = "0.17.0"
27+
timely = "0.17.1"
2828
tracing = "0.1.37"
2929
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
3030

src/compute-types/src/dyncfgs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
5050
/// Enable lgalloc for columnation.
5151
pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
5252
"enable_columnation_lgalloc",
53-
false,
53+
true,
5454
"Enable allocating regions from lgalloc.",
5555
);
5656

src/compute/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ bytesize = "1.1.0"
1616
columnar = "0.2.2"
1717
crossbeam-channel = "0.5.8"
1818
dec = { version = "0.4.8", features = ["serde"] }
19-
differential-dataflow = "0.13.4"
20-
differential-dogs3 = "0.1.4"
19+
differential-dataflow = "0.13.5"
20+
differential-dogs3 = "0.1.5"
2121
futures = "0.3.25"
2222
itertools = "0.12.1"
2323
lgalloc = "0.4"
@@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false }
3939
scopeguard = "1.1.0"
4040
serde = { version = "1.0.152", features = ["derive"] }
4141
smallvec = { version = "1.10.0", features = ["serde", "union"] }
42-
timely = "0.17.0"
42+
timely = "0.17.1"
4343
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
4444
tracing = "0.1.37"
4545
uuid = { version = "1.7.0", features = ["serde", "v4"] }

src/compute/src/render.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ use std::rc::{Rc, Weak};
110110
use std::sync::Arc;
111111
use std::task::Poll;
112112

113+
use columnar::Columnar;
113114
use differential_dataflow::dynamic::pointstamp::PointStamp;
114115
use differential_dataflow::lattice::Lattice;
115116
use differential_dataflow::operators::arrange::Arranged;
@@ -498,6 +499,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
498499
where
499500
G: Scope<Timestamp = mz_repr::Timestamp>,
500501
T: Refines<G::Timestamp> + RenderTimestamp,
502+
<T as Columnar>::Container: Clone + Send,
501503
{
502504
pub(crate) fn import_index(
503505
&mut self,
@@ -647,6 +649,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
647649
where
648650
G: Scope<Timestamp = mz_repr::Timestamp>,
649651
T: RenderTimestamp,
652+
<T as Columnar>::Container: Clone + Send,
650653
{
651654
pub(crate) fn export_index_iterative(
652655
&self,
@@ -889,6 +892,8 @@ impl<G> Context<G>
889892
where
890893
G: Scope,
891894
G::Timestamp: RenderTimestamp,
895+
<G::Timestamp as Columnar>::Container: Clone + Send,
896+
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
892897
{
893898
/// Renders a non-recursive plan to a differential dataflow, producing the collection of
894899
/// results.
@@ -1293,7 +1298,11 @@ where
12931298

12941299
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
12951300
/// A timestamp type that can be used for operations within MZ's dataflow layer.
1296-
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
1301+
pub trait RenderTimestamp:
1302+
Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
1303+
where
1304+
<Self as Columnar>::Container: Clone + Send,
1305+
{
12971306
/// The system timestamp component of the timestamp.
12981307
///
12991308
/// This is useful for manipulating the system time, as when delaying
@@ -1429,6 +1438,7 @@ impl<S> WithStartSignal for MzArrangementImport<S>
14291438
where
14301439
S: Scope,
14311440
S::Timestamp: RenderTimestamp,
1441+
<S::Timestamp as Columnar>::Container: Clone + Send,
14321442
{
14331443
fn with_start_signal(self, signal: StartSignal) -> Self {
14341444
match self {
@@ -1443,6 +1453,7 @@ impl<S, Tr> WithStartSignal for Arranged<S, Tr>
14431453
where
14441454
S: Scope,
14451455
S::Timestamp: RenderTimestamp,
1456+
<S::Timestamp as Columnar>::Container: Clone + Send,
14461457
Tr: TraceReader + Clone,
14471458
{
14481459
fn with_start_signal(self, signal: StartSignal) -> Self {

0 commit comments

Comments
 (0)