Skip to content

Commit aa929a2

Browse files
committed
Split the columnar modules into bespoke files
No changes in functionality expected. Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 1247fee commit aa929a2

File tree

14 files changed

+558
-511
lines changed

14 files changed

+558
-511
lines changed

src/compute/src/logging.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
3636
use mz_expr::{MirScalarExpr, permutation_for_arrangement};
3737
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
3838
use mz_timely_util::activator::RcActivator;
39-
use mz_timely_util::containers::ColumnBuilder;
39+
use mz_timely_util::columnar::builder::ColumnBuilder;
4040
use mz_timely_util::operator::consolidate_pact;
4141

4242
use crate::logging::compute::Logger as ComputeLogger;

src/compute/src/logging/compute.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use differential_dataflow::trace::{BatchReader, Cursor};
2222
use mz_compute_types::plan::LirId;
2323
use mz_ore::cast::CastFrom;
2424
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
25-
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
25+
use mz_timely_util::columnar::Column;
26+
use mz_timely_util::columnar::builder::ColumnBuilder;
27+
use mz_timely_util::containers::ProvidedBuilder;
2628
use mz_timely_util::replay::MzReplay;
2729
use timely::dataflow::channels::pact::Pipeline;
2830
use timely::dataflow::operators::Operator;

src/compute/src/logging/differential.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use differential_dataflow::logging::{
2020
};
2121
use mz_ore::cast::CastFrom;
2222
use mz_repr::{Datum, Diff, Timestamp};
23-
use mz_timely_util::containers::{
24-
Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
25-
};
23+
use mz_timely_util::columnar::builder::ColumnBuilder;
24+
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25+
use mz_timely_util::containers::ProvidedBuilder;
2626
use mz_timely_util::replay::MzReplay;
2727
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
2828
use timely::dataflow::channels::pushers::buffer::Session;

src/compute/src/logging/initialize.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use mz_compute_client::logging::{LogVariant, LoggingConfig};
1717
use mz_repr::{Diff, Timestamp};
1818
use mz_storage_operators::persist_source::Subtime;
1919
use mz_storage_types::errors::DataflowError;
20-
use mz_timely_util::containers::{Column, ColumnBuilder};
20+
use mz_timely_util::columnar::Column;
21+
use mz_timely_util::columnar::builder::ColumnBuilder;
2122
use mz_timely_util::operator::CollectionExt;
2223
use timely::communication::Allocate;
2324
use timely::container::{ContainerBuilder, PushInto};

src/compute/src/logging/reachability.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use std::time::Duration;
1717
use mz_compute_client::logging::LoggingConfig;
1818
use mz_ore::cast::CastFrom;
1919
use mz_repr::{Datum, Diff, Row, Timestamp};
20-
use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder, columnar_exchange};
20+
use mz_timely_util::columnar::builder::ColumnBuilder;
21+
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
2122
use mz_timely_util::replay::MzReplay;
2223
use timely::Container;
2324
use timely::dataflow::Scope;

src/compute/src/logging/timely.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use differential_dataflow::containers::{Columnation, CopyRegion};
1919
use mz_compute_client::logging::LoggingConfig;
2020
use mz_ore::cast::CastFrom;
2121
use mz_repr::{Datum, Diff, Timestamp};
22-
use mz_timely_util::containers::{
23-
Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
24-
};
22+
use mz_timely_util::columnar::builder::ColumnBuilder;
23+
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24+
use mz_timely_util::containers::ProvidedBuilder;
2525
use mz_timely_util::replay::MzReplay;
2626
use timely::Container;
2727
use timely::dataflow::Scope;

src/compute/src/render/context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow
3131
use mz_storage_types::controller::CollectionMetadata;
3232
use mz_storage_types::errors::DataflowError;
3333
use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton};
34-
use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
34+
use mz_timely_util::columnar::builder::ColumnBuilder;
35+
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
3536
use mz_timely_util::operator::{CollectionExt, StreamExt};
3637
use timely::Container;
3738
use timely::container::CapacityContainerBuilder;

src/compute/src/render/join/linear_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use mz_dyncfg::ConfigSet;
2727
use mz_repr::fixed_length::ToDatumIter;
2828
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
2929
use mz_storage_types::errors::DataflowError;
30-
use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
30+
use mz_timely_util::columnar::builder::ColumnBuilder;
31+
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
3132
use mz_timely_util::operator::{CollectionExt, StreamExt};
3233
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
3334
use timely::dataflow::operators::OkErr;

src/timely-util/src/columnar.rs

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License in the LICENSE file at the
6+
// root of this repository, or online at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
//! Container for columnar data.
17+
18+
#![deny(missing_docs)]
19+
20+
pub mod batcher;
21+
pub mod builder;
22+
23+
use std::hash::Hash;
24+
25+
use columnar::Container as _;
26+
use columnar::bytes::{EncodeDecode, Indexed};
27+
use columnar::common::IterOwn;
28+
use columnar::{Clear, FromBytes, Index, Len};
29+
use columnar::{Columnar, Ref};
30+
use differential_dataflow::Hashable;
31+
use differential_dataflow::containers::TimelyStack;
32+
use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
33+
use mz_ore::region::Region;
34+
use timely::Container;
35+
use timely::bytes::arc::Bytes;
36+
use timely::container::PushInto;
37+
use timely::dataflow::channels::ContainerBytes;
38+
39+
/// A batcher for columnar storage.
40+
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
41+
Column<((K, V), T, R)>,
42+
batcher::Chunker<TimelyStack<((K, V), T, R)>>,
43+
ColMerger<(K, V), T, R>,
44+
>;
45+
/// A batcher for columnar storage with unit values.
46+
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
47+
48+
/// A container based on a columnar store, encoded in aligned bytes.
49+
///
50+
/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
51+
/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
52+
/// variant is used to construct the container, and it owns potentially multiple columns of data.
53+
pub enum Column<C: Columnar> {
54+
/// The typed variant of the container.
55+
Typed(C::Container),
56+
/// The binary variant of the container.
57+
Bytes(Bytes),
58+
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
59+
///
60+
/// Reasons could include misalignment, cloning of data, or wanting
61+
/// to release the `Bytes` as a scarce resource.
62+
Align(Region<u64>),
63+
}
64+
65+
impl<C: Columnar> Column<C> {
66+
/// Borrows the container as a reference.
67+
#[inline]
68+
fn borrow(&self) -> <C::Container as columnar::Container>::Borrowed<'_> {
69+
match self {
70+
Column::Typed(t) => t.borrow(),
71+
Column::Bytes(b) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
72+
&mut Indexed::decode(bytemuck::cast_slice(b)),
73+
),
74+
Column::Align(a) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
75+
&mut Indexed::decode(a),
76+
),
77+
}
78+
}
79+
}
80+
81+
impl<C: Columnar> Default for Column<C> {
82+
fn default() -> Self {
83+
Self::Typed(Default::default())
84+
}
85+
}
86+
87+
impl<C: Columnar> Clone for Column<C>
88+
where
89+
C::Container: Clone,
90+
{
91+
fn clone(&self) -> Self {
92+
match self {
93+
// Typed stays typed, although we would have the option to move to aligned data.
94+
// If we did it might be confusing why we couldn't push into a cloned column.
95+
Column::Typed(t) => Column::Typed(t.clone()),
96+
Column::Bytes(b) => {
97+
assert_eq!(b.len() % 8, 0);
98+
let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
99+
let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
100+
alloc_bytes[..b.len()].copy_from_slice(b);
101+
Self::Align(alloc)
102+
}
103+
Column::Align(a) => {
104+
let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
105+
alloc[..a.len()].copy_from_slice(a);
106+
Column::Align(alloc)
107+
}
108+
}
109+
}
110+
}
111+
112+
impl<C: Columnar> Container for Column<C> {
113+
type ItemRef<'a> = columnar::Ref<'a, C>;
114+
type Item<'a> = columnar::Ref<'a, C>;
115+
116+
#[inline]
117+
fn len(&self) -> usize {
118+
self.borrow().len()
119+
}
120+
121+
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
122+
#[inline]
123+
fn clear(&mut self) {
124+
match self {
125+
Column::Typed(t) => t.clear(),
126+
Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
127+
}
128+
}
129+
130+
type Iter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
131+
132+
#[inline]
133+
fn iter(&self) -> Self::Iter<'_> {
134+
self.borrow().into_index_iter()
135+
}
136+
137+
type DrainIter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
138+
139+
#[inline]
140+
fn drain(&mut self) -> Self::DrainIter<'_> {
141+
self.borrow().into_index_iter()
142+
}
143+
}
144+
145+
impl<C: Columnar, T> PushInto<T> for Column<C>
146+
where
147+
C::Container: columnar::Push<T>,
148+
{
149+
#[inline]
150+
fn push_into(&mut self, item: T) {
151+
use columnar::Push;
152+
match self {
153+
Column::Typed(t) => t.push(item),
154+
Column::Align(_) | Column::Bytes(_) => {
155+
// We really oughtn't be calling this in this case.
156+
// We could convert to owned, but need more constraints on `C`.
157+
unimplemented!("Pushing into Column::Bytes without first clearing");
158+
}
159+
}
160+
}
161+
}
162+
163+
impl<C: Columnar> ContainerBytes for Column<C> {
164+
#[inline]
165+
fn from_bytes(bytes: Bytes) -> Self {
166+
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
167+
// If the alignment is borked, we can relocate. If the size is borked,
168+
// not sure what we do in that case. An incorrect size indicates a problem
169+
// of `into_bytes`, or a failure of the communication layer, both of which
170+
// are unrecoverable.
171+
assert_eq!(bytes.len() % 8, 0);
172+
if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
173+
Self::Bytes(bytes)
174+
} else {
175+
// We failed to cast the slice, so we'll reallocate.
176+
let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
177+
let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
178+
alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
179+
Self::Align(alloc)
180+
}
181+
}
182+
183+
#[inline]
184+
fn length_in_bytes(&self) -> usize {
185+
match self {
186+
Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
187+
Column::Bytes(b) => b.len(),
188+
Column::Align(a) => 8 * a.len(),
189+
}
190+
}
191+
192+
#[inline]
193+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
194+
match self {
195+
Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
196+
Column::Bytes(b) => writer.write_all(b).unwrap(),
197+
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
198+
}
199+
}
200+
}
201+
202+
/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
203+
/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
204+
/// specify it as a function.
205+
#[inline(always)]
206+
pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
207+
where
208+
K: Columnar,
209+
for<'a> Ref<'a, K>: Hash,
210+
V: Columnar,
211+
D: Columnar,
212+
T: Columnar,
213+
{
214+
k.hashed()
215+
}
216+
217+
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::Container;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Produce some bytes that are in columnar format.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        raw.extend(16_u64.to_le_bytes()); // offsets
        raw.extend(28_u64.to_le_bytes()); // length
        raw.extend(1_i32.to_le_bytes());
        raw.extend(2_i32.to_le_bytes());
        raw.extend(3_i32.to_le_bytes());
        raw.extend([0, 0, 0, 0]); // padding
        raw
    }

    /// Copies `raw` into a fresh buffer so that the returned bytes start `skew` bytes past a
    /// `u64`-aligned address. The buffer has eight spare bytes, so `skew` must be at most one.
    fn bytes_with_skew(raw: &[u8], skew: usize) -> Bytes {
        let buf = vec![0_u8; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        // Discard the prefix so the remainder starts at the desired offset.
        let _ = bytes_mut.extract_to(align + skew);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        bytes_mut.extract_to(raw.len())
    }

    /// Cloning must preserve the contents for each of the three variants.
    #[mz_ore::test]
    fn test_column_clone() {
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        let column_typed2 = column_typed.clone();

        assert_eq!(column_typed2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);

        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        let column_bytes2 = column_bytes.clone();

        assert_eq!(column_bytes2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);

        let raw = raw_columnar_bytes();
        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        let region_bytes = bytemuck::cast_slice_mut(&mut region);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        let column_align2 = column_align.clone();

        assert_eq!(column_align2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
    }

    /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is
    /// easier.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.push_into(3);
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and relocates misaligned input to `Align`.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        let aligned_bytes = bytes_with_skew(&raw, 0);
        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);

        let unaligned_bytes = bytes_with_skew(&raw, 1);
        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
    }
}

0 commit comments

Comments
 (0)