diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index f8bf69f50fd2f..7a5abd96e5dfc 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -36,7 +36,7 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely use mz_expr::{MirScalarExpr, permutation_for_arrangement}; use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp}; use mz_timely_util::activator::RcActivator; -use mz_timely_util::containers::ColumnBuilder; +use mz_timely_util::columnar::builder::ColumnBuilder; use mz_timely_util::operator::consolidate_pact; use crate::logging::compute::Logger as ComputeLogger; diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 7dc93f6de8377..9abcae1ddf79b 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -22,7 +22,9 @@ use differential_dataflow::trace::{BatchReader, Cursor}; use mz_compute_types::plan::LirId; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, GlobalId, Timestamp}; -use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder}; +use mz_timely_util::columnar::Column; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::containers::ProvidedBuilder; use mz_timely_util::replay::MzReplay; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 6801b993bc71e..1b32d59f86790 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -20,9 +20,9 @@ use differential_dataflow::logging::{ }; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{ - Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange, -}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; +use mz_timely_util::containers::ProvidedBuilder; use mz_timely_util::replay::MzReplay; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index d401dcda5bd6d..3299d29a44a2e 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -17,7 +17,8 @@ use mz_compute_client::logging::{LogVariant, LoggingConfig}; use mz_repr::{Diff, Timestamp}; use mz_storage_operators::persist_source::Subtime; use mz_storage_types::errors::DataflowError; -use mz_timely_util::containers::{Column, ColumnBuilder}; +use mz_timely_util::columnar::Column; +use mz_timely_util::columnar::builder::ColumnBuilder; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::{ContainerBuilder, PushInto}; diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index efc355520a744..ee5d3402a5c4f 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -17,7 +17,8 @@ use std::time::Duration; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Row, Timestamp}; -use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder, columnar_exchange}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange}; use mz_timely_util::replay::MzReplay; use timely::Container; use timely::dataflow::Scope; diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index c7af08ea9034c..093e9e585bafe 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -19,9 +19,9 @@ use differential_dataflow::containers::{Columnation, CopyRegion}; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{ - Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange, -}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; +use mz_timely_util::containers::ProvidedBuilder; use mz_timely_util::replay::MzReplay; use timely::Container; use timely::dataflow::Scope; diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 9b0d292d64dbc..811aa5af9c43e 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -31,7 +31,8 @@ use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton}; -use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::Container; use timely::container::CapacityContainerBuilder; diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 748cd32d8e4c4..0402039fa7d93 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -27,7 +27,8 @@ use mz_dyncfg::ConfigSet; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow}; use mz_storage_types::errors::DataflowError; -use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::OkErr; diff --git a/src/timely-util/src/columnar.rs b/src/timely-util/src/columnar.rs new file mode 100644 index 0000000000000..ee05e3ed5e379 --- /dev/null +++ b/src/timely-util/src/columnar.rs @@ -0,0 +1,302 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Container for columnar data. + +#![deny(missing_docs)] + +pub mod batcher; +pub mod builder; + +use std::hash::Hash; + +use columnar::Container as _; +use columnar::bytes::{EncodeDecode, Indexed}; +use columnar::common::IterOwn; +use columnar::{Clear, FromBytes, Index, Len}; +use columnar::{Columnar, Ref}; +use differential_dataflow::Hashable; +use differential_dataflow::containers::TimelyStack; +use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; +use mz_ore::region::Region; +use timely::Container; +use timely::bytes::arc::Bytes; +use timely::container::PushInto; +use timely::dataflow::channels::ContainerBytes; + +/// A batcher for columnar storage. +pub type Col2ValBatcher = MergeBatcher< + Column<((K, V), T, R)>, + batcher::Chunker>, + ColMerger<(K, V), T, R>, +>; +/// A batcher for columnar storage with unit values. +pub type Col2KeyBatcher = Col2ValBatcher; + +/// A container based on a columnar store, encoded in aligned bytes. +/// +/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name +/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`] +/// variant is used to construct the container, and it owns potentially multiple columns of data. +pub enum Column { + /// The typed variant of the container. + Typed(C::Container), + /// The binary variant of the container. + Bytes(Bytes), + /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. + /// + /// Reasons could include misalignment, cloning of data, or wanting + /// to release the `Bytes` as a scarce resource. + Align(Region), +} + +impl Column { + /// Borrows the container as a reference. + #[inline] + fn borrow(&self) -> ::Borrowed<'_> { + match self { + Column::Typed(t) => t.borrow(), + Column::Bytes(b) => <::Borrowed<'_>>::from_bytes( + &mut Indexed::decode(bytemuck::cast_slice(b)), + ), + Column::Align(a) => <::Borrowed<'_>>::from_bytes( + &mut Indexed::decode(a), + ), + } + } +} + +impl Default for Column { + fn default() -> Self { + Self::Typed(Default::default()) + } +} + +impl Clone for Column +where + C::Container: Clone, +{ + fn clone(&self) -> Self { + match self { + // Typed stays typed, although we would have the option to move to aligned data. + // If we did it might be confusing why we couldn't push into a cloned column. + Column::Typed(t) => Column::Typed(t.clone()), + Column::Bytes(b) => { + assert_eq!(b.len() % 8, 0); + let mut alloc: Region = crate::containers::alloc_aligned_zeroed(b.len() / 8); + let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); + alloc_bytes[..b.len()].copy_from_slice(b); + Self::Align(alloc) + } + Column::Align(a) => { + let mut alloc = crate::containers::alloc_aligned_zeroed(a.len()); + alloc[..a.len()].copy_from_slice(a); + Column::Align(alloc) + } + } + } +} + +impl Container for Column { + type ItemRef<'a> = columnar::Ref<'a, C>; + type Item<'a> = columnar::Ref<'a, C>; + + #[inline] + fn len(&self) -> usize { + self.borrow().len() + } + + // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. + #[inline] + fn clear(&mut self) { + match self { + Column::Typed(t) => t.clear(), + Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()), + } + } + + type Iter<'a> = IterOwn<::Borrowed<'a>>; + + #[inline] + fn iter(&self) -> Self::Iter<'_> { + self.borrow().into_index_iter() + } + + type DrainIter<'a> = IterOwn<::Borrowed<'a>>; + + #[inline] + fn drain(&mut self) -> Self::DrainIter<'_> { + self.borrow().into_index_iter() + } +} + +impl PushInto for Column +where + C::Container: columnar::Push, +{ + #[inline] + fn push_into(&mut self, item: T) { + use columnar::Push; + match self { + Column::Typed(t) => t.push(item), + Column::Align(_) | Column::Bytes(_) => { + // We really oughtn't be calling this in this case. + // We could convert to owned, but need more constraints on `C`. + unimplemented!("Pushing into Column::Bytes without first clearing"); + } + } + } +} + +impl ContainerBytes for Column { + #[inline] + fn from_bytes(bytes: Bytes) -> Self { + // Our expectation / hope is that `bytes` is `u64` aligned and sized. + // If the alignment is borked, we can relocate. If the size is borked, + // not sure what we do in that case. An incorrect size indicates a problem + // of `into_bytes`, or a failure of the communication layer, both of which + // are unrecoverable. + assert_eq!(bytes.len() % 8, 0); + if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + Self::Bytes(bytes) + } else { + // We failed to cast the slice, so we'll reallocate. + let mut alloc: Region = crate::containers::alloc_aligned_zeroed(bytes.len() / 8); + let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); + alloc_bytes[..bytes.len()].copy_from_slice(&bytes); + Self::Align(alloc) + } + } + + #[inline] + fn length_in_bytes(&self) -> usize { + match self { + Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()), + Column::Bytes(b) => b.len(), + Column::Align(a) => 8 * a.len(), + } + } + + #[inline] + fn into_bytes(&self, writer: &mut W) { + match self { + Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(), + Column::Bytes(b) => writer.write_all(b).unwrap(), + Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), + } + } +} + +/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard +/// time to figure out the lifetimes of the elements when specified as a closure, so we rather +/// specify it as a function. +#[inline(always)] +pub fn columnar_exchange(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64 +where + K: Columnar, + for<'a> Ref<'a, K>: Hash, + V: Columnar, + D: Columnar, + T: Columnar, +{ + k.hashed() +} + +#[cfg(test)] +mod tests { + use mz_ore::region::Region; + use timely::Container; + use timely::bytes::arc::BytesMut; + use timely::container::PushInto; + use timely::dataflow::channels::ContainerBytes; + + use super::*; + + /// Produce some bytes that are in columnar format. + fn raw_columnar_bytes() -> Vec { + let mut raw = Vec::new(); + raw.extend(16_u64.to_le_bytes()); // offsets + raw.extend(28_u64.to_le_bytes()); // length + raw.extend(1_i32.to_le_bytes()); + raw.extend(2_i32.to_le_bytes()); + raw.extend(3_i32.to_le_bytes()); + raw.extend([0, 0, 0, 0]); // padding + raw + } + + #[mz_ore::test] + fn test_column_clone() { + let columns = Columnar::as_columns([1, 2, 3].iter()); + let column_typed: Column = Column::Typed(columns); + let column_typed2 = column_typed.clone(); + + assert_eq!(column_typed2.iter().collect::>(), vec![&1, &2, &3]); + + let bytes = BytesMut::from(raw_columnar_bytes()).freeze(); + let column_bytes: Column = Column::Bytes(bytes); + let column_bytes2 = column_bytes.clone(); + + assert_eq!(column_bytes2.iter().collect::>(), vec![&1, &2, &3]); + + let raw = raw_columnar_bytes(); + let mut region: Region = crate::containers::alloc_aligned_zeroed(raw.len() / 8); + let region_bytes = bytemuck::cast_slice_mut(&mut region); + region_bytes[..raw.len()].copy_from_slice(&raw); + let column_align: Column = Column::Align(region); + let column_align2 = column_align.clone(); + + assert_eq!(column_align2.iter().collect::>(), vec![&1, &2, &3]); + } + + /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is + /// easier. + #[mz_ore::test] + fn test_column_known_bytes() { + let mut column: Column = Default::default(); + column.push_into(1); + column.push_into(2); + column.push_into(3); + let mut data = Vec::new(); + column.into_bytes(&mut std::io::Cursor::new(&mut data)); + assert_eq!(data, raw_columnar_bytes()); + } + + #[mz_ore::test] + fn test_column_from_bytes() { + let raw = raw_columnar_bytes(); + + let buf = vec![0; raw.len() + 8]; + let align = buf.as_ptr().align_offset(std::mem::size_of::()); + let mut bytes_mut = BytesMut::from(buf); + let _ = bytes_mut.extract_to(align); + bytes_mut[..raw.len()].copy_from_slice(&raw); + let aligned_bytes = bytes_mut.extract_to(raw.len()); + + let column: Column = Column::from_bytes(aligned_bytes); + assert!(matches!(column, Column::Bytes(_))); + assert_eq!(column.iter().collect::>(), vec![&1, &2, &3]); + + let buf = vec![0; raw.len() + 8]; + let align = buf.as_ptr().align_offset(std::mem::size_of::()); + let mut bytes_mut = BytesMut::from(buf); + let _ = bytes_mut.extract_to(align + 1); + bytes_mut[..raw.len()].copy_from_slice(&raw); + let unaligned_bytes = bytes_mut.extract_to(raw.len()); + + let column: Column = Column::from_bytes(unaligned_bytes); + assert!(matches!(column, Column::Align(_))); + assert_eq!(column.iter().collect::>(), vec![&1, &2, &3]); + } +} diff --git a/src/timely-util/src/columnar/batcher.rs b/src/timely-util/src/columnar/batcher.rs new file mode 100644 index 0000000000000..3f59264230a8b --- /dev/null +++ b/src/timely-util/src/columnar/batcher.rs @@ -0,0 +1,113 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Types for consolidating, merging, and extracting columnar update collections. + +use std::collections::VecDeque; + +use columnar::Columnar; +use differential_dataflow::difference::Semigroup; +use timely::Container; +use timely::container::{ContainerBuilder, PushInto}; + +use crate::columnar::Column; + +/// A chunker to transform input data into sorted columns. +#[derive(Default)] +pub struct Chunker { + /// Buffer into which we'll consolidate. + /// + /// Also the buffer where we'll stage responses to `extract` and `finish`. + /// When these calls return, the buffer is available for reuse. + target: C, + /// Consolidated buffers ready to go. + ready: VecDeque, +} + +impl ContainerBuilder for Chunker { + type Container = C; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.target = ready; + Some(&mut self.target) + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + self.extract() + } +} + +impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker +where + D: Columnar, + for<'b> columnar::Ref<'b, D>: Ord + Copy, + T: Columnar, + for<'b> columnar::Ref<'b, T>: Ord + Copy, + R: Columnar + Semigroup + for<'b> Semigroup>, + for<'b> columnar::Ref<'b, R>: Ord, + C2: Container + for<'b> PushInto<&'b (D, T, R)>, +{ + fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { + // Sort input data + // TODO: consider `Vec` that we retain, containing indexes. + let mut permutation = Vec::with_capacity(container.len()); + permutation.extend(container.drain()); + permutation.sort(); + + self.target.clear(); + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some((data, time, diff)) = iter.next() { + let mut owned_data = D::into_owned(data); + let mut owned_time = T::into_owned(time); + + let mut prev_data = data; + let mut prev_time = time; + let mut prev_diff = ::into_owned(diff); + + for (data, time, diff) in iter { + if (&prev_data, &prev_time) == (&data, &time) { + prev_diff.plus_equals(&diff); + } else { + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.target.push_into(&tuple); + (owned_data, owned_time, prev_diff) = tuple; + } + prev_data = data; + prev_time = time; + R::copy_from(&mut prev_diff, diff); + } + } + + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.target.push_into(&tuple); + } + } + + if !self.target.is_empty() { + self.ready.push_back(std::mem::take(&mut self.target)); + } + } +} diff --git a/src/timely-util/src/columnar/builder.rs b/src/timely-util/src/columnar/builder.rs new file mode 100644 index 0000000000000..a0558cfda27c6 --- /dev/null +++ b/src/timely-util/src/columnar/builder.rs @@ -0,0 +1,112 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A container builder for columns. + +use std::collections::VecDeque; + +use columnar::bytes::{EncodeDecode, Indexed}; +use columnar::{Clear, Columnar, Len, Push}; +use timely::container::PushInto; +use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + +use crate::columnar::Column; + +/// A container builder for `Column`. +pub struct ColumnBuilder { + /// Container that we're writing to. + current: C::Container, + /// Finished container that we presented to callers of extract/finish. + /// + /// We don't recycle the column because for extract, it's not typed, and after calls + /// to finish it'll be `None`. + finished: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, +} + +impl PushInto for ColumnBuilder +where + C::Container: Push, +{ + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + // If there is less than 10% slop with 2MB backing allocations, mint a container. + use columnar::Container; + let words = Indexed::length_in_words(&self.current.borrow()); + let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); + if round - words < round / 10 { + /// Move the contents from `current` to an aligned allocation, and push it to `pending`. + /// The contents must fit in `round` words (u64). + #[cold] + fn outlined_align( + current: &mut C::Container, + round: usize, + pending: &mut VecDeque>, + ) where + C: Columnar, + { + let mut alloc = crate::containers::alloc_aligned_zeroed(round); + let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..])); + Indexed::write(writer, ¤t.borrow()).unwrap(); + pending.push_back(Column::Align(alloc)); + current.clear(); + } + + outlined_align(&mut self.current, round, &mut self.pending); + } + } +} + +impl Default for ColumnBuilder { + #[inline] + fn default() -> Self { + ColumnBuilder { + current: Default::default(), + finished: None, + pending: Default::default(), + } + } +} + +impl ContainerBuilder for ColumnBuilder +where + C::Container: Clone, +{ + type Container = Column; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.finished = Some(container); + self.finished.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + self.pending + .push_back(Column::Typed(std::mem::take(&mut self.current))); + } + self.finished = self.pending.pop_front(); + self.finished.as_mut() + } +} + +impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone {} diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs index 7bcf8b04a96fd..06ba731fc117d 100644 --- a/src/timely-util/src/containers.rs +++ b/src/timely-util/src/containers.rs @@ -15,19 +15,10 @@ //! Reusable containers. -use std::hash::Hash; - -use columnar::{Columnar, Ref}; -use differential_dataflow::Hashable; -use differential_dataflow::containers::TimelyStack; -use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; - pub mod stack; pub(crate) use alloc::alloc_aligned_zeroed; pub use alloc::{enable_columnar_lgalloc, set_enable_columnar_lgalloc}; -pub use builder::ColumnBuilder; -pub use container::Column; pub use provided_builder::ProvidedBuilder; mod alloc { @@ -60,397 +51,6 @@ mod alloc { } } -mod container { - use columnar::Columnar; - use columnar::Container as _; - use columnar::bytes::{EncodeDecode, Indexed}; - use columnar::common::IterOwn; - use columnar::{Clear, FromBytes, Index, Len}; - use mz_ore::region::Region; - use timely::Container; - use timely::bytes::arc::Bytes; - use timely::container::PushInto; - use timely::dataflow::channels::ContainerBytes; - - /// A container based on a columnar store, encoded in aligned bytes. - /// - /// The type can represent typed data, bytes from Timely, or an aligned allocation. The name - /// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`] - /// variant is used to construct the container, and it owns potentially multiple columns of data. - pub enum Column { - /// The typed variant of the container. - Typed(C::Container), - /// The binary variant of the container. - Bytes(Bytes), - /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. - /// - /// Reasons could include misalignment, cloning of data, or wanting - /// to release the `Bytes` as a scarce resource. - Align(Region), - } - - impl Column { - /// Borrows the container as a reference. - #[inline] - fn borrow(&self) -> ::Borrowed<'_> { - match self { - Column::Typed(t) => t.borrow(), - Column::Bytes(b) => { - <::Borrowed<'_>>::from_bytes( - &mut Indexed::decode(bytemuck::cast_slice(b)), - ) - } - Column::Align(a) => { - <::Borrowed<'_>>::from_bytes( - &mut Indexed::decode(a), - ) - } - } - } - } - - impl Default for Column { - fn default() -> Self { - Self::Typed(Default::default()) - } - } - - impl Clone for Column - where - C::Container: Clone, - { - fn clone(&self) -> Self { - match self { - // Typed stays typed, although we would have the option to move to aligned data. - // If we did it might be confusing why we couldn't push into a cloned column. - Column::Typed(t) => Column::Typed(t.clone()), - Column::Bytes(b) => { - assert_eq!(b.len() % 8, 0); - let mut alloc: Region = super::alloc_aligned_zeroed(b.len() / 8); - let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); - alloc_bytes[..b.len()].copy_from_slice(b); - Self::Align(alloc) - } - Column::Align(a) => { - let mut alloc = super::alloc_aligned_zeroed(a.len()); - alloc[..a.len()].copy_from_slice(a); - Column::Align(alloc) - } - } - } - } - - impl Container for Column { - type ItemRef<'a> = columnar::Ref<'a, C>; - type Item<'a> = columnar::Ref<'a, C>; - - #[inline] - fn len(&self) -> usize { - self.borrow().len() - } - - // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. - #[inline] - fn clear(&mut self) { - match self { - Column::Typed(t) => t.clear(), - Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()), - } - } - - type Iter<'a> = IterOwn<::Borrowed<'a>>; - - #[inline] - fn iter(&self) -> Self::Iter<'_> { - self.borrow().into_index_iter() - } - - type DrainIter<'a> = IterOwn<::Borrowed<'a>>; - - #[inline] - fn drain(&mut self) -> Self::DrainIter<'_> { - self.borrow().into_index_iter() - } - } - - impl PushInto for Column - where - C::Container: columnar::Push, - { - #[inline] - fn push_into(&mut self, item: T) { - use columnar::Push; - match self { - Column::Typed(t) => t.push(item), - Column::Align(_) | Column::Bytes(_) => { - // We really oughtn't be calling this in this case. - // We could convert to owned, but need more constraints on `C`. - unimplemented!("Pushing into Column::Bytes without first clearing"); - } - } - } - } - - impl ContainerBytes for Column { - #[inline] - fn from_bytes(bytes: Bytes) -> Self { - // Our expectation / hope is that `bytes` is `u64` aligned and sized. - // If the alignment is borked, we can relocate. If the size is borked, - // not sure what we do in that case. An incorrect size indicates a problem - // of `into_bytes`, or a failure of the communication layer, both of which - // are unrecoverable. - assert_eq!(bytes.len() % 8, 0); - if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { - Self::Bytes(bytes) - } else { - // We failed to cast the slice, so we'll reallocate. - let mut alloc: Region = super::alloc_aligned_zeroed(bytes.len() / 8); - let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); - alloc_bytes[..bytes.len()].copy_from_slice(&bytes); - Self::Align(alloc) - } - } - - #[inline] - fn length_in_bytes(&self) -> usize { - match self { - Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()), - Column::Bytes(b) => b.len(), - Column::Align(a) => 8 * a.len(), - } - } - - #[inline] - fn into_bytes(&self, writer: &mut W) { - match self { - Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(), - Column::Bytes(b) => writer.write_all(b).unwrap(), - Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), - } - } - } -} - -mod builder { - use std::collections::VecDeque; - - use columnar::bytes::{EncodeDecode, Indexed}; - use columnar::{Clear, Columnar, Len, Push}; - use timely::container::PushInto; - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - - use crate::containers::Column; - - /// A container builder for `Column`. - pub struct ColumnBuilder { - /// Container that we're writing to. - current: C::Container, - /// Finished container that we presented to callers of extract/finish. - /// - /// We don't recycle the column because for extract, it's not typed, and after calls - /// to finish it'll be `None`. - finished: Option>, - /// Completed containers pending to be sent. - pending: VecDeque>, - } - - impl PushInto for ColumnBuilder - where - C::Container: Push, - { - #[inline] - fn push_into(&mut self, item: T) { - self.current.push(item); - // If there is less than 10% slop with 2MB backing allocations, mint a container. - use columnar::Container; - let words = Indexed::length_in_words(&self.current.borrow()); - let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); - if round - words < round / 10 { - /// Move the contents from `current` to an aligned allocation, and push it to `pending`. - /// The contents must fit in `round` words (u64). - #[cold] - fn outlined_align( - current: &mut C::Container, - round: usize, - pending: &mut VecDeque>, - ) where - C: Columnar, - { - let mut alloc = super::alloc_aligned_zeroed(round); - let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..])); - Indexed::write(writer, ¤t.borrow()).unwrap(); - pending.push_back(Column::Align(alloc)); - current.clear(); - } - - outlined_align(&mut self.current, round, &mut self.pending); - } - } - } - - impl Default for ColumnBuilder { - #[inline] - fn default() -> Self { - ColumnBuilder { - current: Default::default(), - finished: None, - pending: Default::default(), - } - } - } - - impl ContainerBuilder for ColumnBuilder - where - C::Container: Clone, - { - type Container = Column; - - #[inline] - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.finished = Some(container); - self.finished.as_mut() - } else { - None - } - } - - #[inline] - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - self.pending - .push_back(Column::Typed(std::mem::take(&mut self.current))); - } - self.finished = self.pending.pop_front(); - self.finished.as_mut() - } - } - - impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone {} -} - -/// A batcher for columnar storage. -pub type Col2ValBatcher = MergeBatcher< - Column<((K, V), T, R)>, - batcher::Chunker>, - ColMerger<(K, V), T, R>, ->; -pub type Col2KeyBatcher = Col2ValBatcher; - -/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard -/// time to figure out the lifetimes of the elements when specified as a closure, so we rather -/// specify it as a function. -#[inline(always)] -pub fn columnar_exchange(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64 -where - K: Columnar, - for<'a> Ref<'a, K>: Hash, - V: Columnar, - D: Columnar, - T: Columnar, -{ - k.hashed() -} - -/// Types for consolidating, merging, and extracting columnar update collections. -pub mod batcher { - use std::collections::VecDeque; - - use columnar::Columnar; - use differential_dataflow::difference::Semigroup; - use timely::Container; - use timely::container::{ContainerBuilder, PushInto}; - - use crate::containers::Column; - - #[derive(Default)] - pub struct Chunker { - /// Buffer into which we'll consolidate. - /// - /// Also the buffer where we'll stage responses to `extract` and `finish`. - /// When these calls return, the buffer is available for reuse. - target: C, - /// Consolidated buffers ready to go. - ready: VecDeque, - } - - impl ContainerBuilder for Chunker { - type Container = C; - - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.target = ready; - Some(&mut self.target) - } else { - None - } - } - - fn finish(&mut self) -> Option<&mut Self::Container> { - self.extract() - } - } - - impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker - where - D: Columnar, - for<'b> columnar::Ref<'b, D>: Ord + Copy, - T: Columnar, - for<'b> columnar::Ref<'b, T>: Ord + Copy, - R: Columnar + Semigroup + for<'b> Semigroup>, - for<'b> columnar::Ref<'b, R>: Ord, - C2: Container + for<'b> PushInto<&'b (D, T, R)>, - { - fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { - // Sort input data - // TODO: consider `Vec` that we retain, containing indexes. - let mut permutation = Vec::with_capacity(container.len()); - permutation.extend(container.drain()); - permutation.sort(); - - self.target.clear(); - // Iterate over the data, accumulating diffs for like keys. - let mut iter = permutation.drain(..); - if let Some((data, time, diff)) = iter.next() { - let mut owned_data = D::into_owned(data); - let mut owned_time = T::into_owned(time); - - let mut prev_data = data; - let mut prev_time = time; - let mut prev_diff = ::into_owned(diff); - - for (data, time, diff) in iter { - if (&prev_data, &prev_time) == (&data, &time) { - prev_diff.plus_equals(&diff); - } else { - if !prev_diff.is_zero() { - D::copy_from(&mut owned_data, prev_data); - T::copy_from(&mut owned_time, prev_time); - let tuple = (owned_data, owned_time, prev_diff); - self.target.push_into(&tuple); - (owned_data, owned_time, prev_diff) = tuple; - } - prev_data = data; - prev_time = time; - R::copy_from(&mut prev_diff, diff); - } - } - - if !prev_diff.is_zero() { - D::copy_from(&mut owned_data, prev_data); - T::copy_from(&mut owned_time, prev_time); - let tuple = (owned_data, owned_time, prev_diff); - self.target.push_into(&tuple); - } - } - - if !self.target.is_empty() { - self.ready.push_back(std::mem::take(&mut self.target)); - } - } - } -} - mod provided_builder { use timely::Container; use timely::container::ContainerBuilder; @@ -485,90 +85,3 @@ mod provided_builder { } } } - -#[cfg(test)] -mod tests { - use mz_ore::region::Region; - use timely::Container; - use timely::bytes::arc::BytesMut; - use timely::container::PushInto; - use timely::dataflow::channels::ContainerBytes; - - use super::*; - - /// Produce some bytes that are in columnar format. - fn raw_columnar_bytes() -> Vec { - let mut raw = Vec::new(); - raw.extend(16_u64.to_le_bytes()); // offsets - raw.extend(28_u64.to_le_bytes()); // length - raw.extend(1_i32.to_le_bytes()); - raw.extend(2_i32.to_le_bytes()); - raw.extend(3_i32.to_le_bytes()); - raw.extend([0, 0, 0, 0]); // padding - raw - } - - #[mz_ore::test] - fn test_column_clone() { - let columns = Columnar::as_columns([1, 2, 3].iter()); - let column_typed: Column = Column::Typed(columns); - let column_typed2 = column_typed.clone(); - - assert_eq!(column_typed2.iter().collect::>(), vec![&1, &2, &3]); - - let bytes = BytesMut::from(raw_columnar_bytes()).freeze(); - let column_bytes: Column = Column::Bytes(bytes); - let column_bytes2 = column_bytes.clone(); - - assert_eq!(column_bytes2.iter().collect::>(), vec![&1, &2, &3]); - - let raw = raw_columnar_bytes(); - let mut region: Region = alloc_aligned_zeroed(raw.len() / 8); - let region_bytes = bytemuck::cast_slice_mut(&mut region); - region_bytes[..raw.len()].copy_from_slice(&raw); - let column_align: Column = Column::Align(region); - let column_align2 = column_align.clone(); - - assert_eq!(column_align2.iter().collect::>(), vec![&1, &2, &3]); - } - - /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is - /// easier. - #[mz_ore::test] - fn test_column_known_bytes() { - let mut column: Column = Default::default(); - column.push_into(1); - column.push_into(2); - column.push_into(3); - let mut data = Vec::new(); - column.into_bytes(&mut std::io::Cursor::new(&mut data)); - assert_eq!(data, raw_columnar_bytes()); - } - - #[mz_ore::test] - fn test_column_from_bytes() { - let raw = raw_columnar_bytes(); - - let buf = vec![0; raw.len() + 8]; - let align = buf.as_ptr().align_offset(std::mem::size_of::()); - let mut bytes_mut = BytesMut::from(buf); - let _ = bytes_mut.extract_to(align); - bytes_mut[..raw.len()].copy_from_slice(&raw); - let aligned_bytes = bytes_mut.extract_to(raw.len()); - - let column: Column = Column::from_bytes(aligned_bytes); - assert!(matches!(column, Column::Bytes(_))); - assert_eq!(column.iter().collect::>(), vec![&1, &2, &3]); - - let buf = vec![0; raw.len() + 8]; - let align = buf.as_ptr().align_offset(std::mem::size_of::()); - let mut bytes_mut = BytesMut::from(buf); - let _ = bytes_mut.extract_to(align + 1); - bytes_mut[..raw.len()].copy_from_slice(&raw); - let unaligned_bytes = bytes_mut.extract_to(raw.len()); - - let column: Column = Column::from_bytes(unaligned_bytes); - assert!(matches!(column, Column::Align(_))); - assert_eq!(column.iter().collect::>(), vec![&1, &2, &3]); - } -} diff --git a/src/timely-util/src/lib.rs b/src/timely-util/src/lib.rs index a141c750bd4d8..900075f57e565 100644 --- a/src/timely-util/src/lib.rs +++ b/src/timely-util/src/lib.rs @@ -19,6 +19,7 @@ pub mod activator; pub mod antichain; pub mod builder_async; pub mod capture; +pub mod columnar; pub mod containers; pub mod operator; pub mod order; diff --git a/test/sqllogictest/introspection/relations.slt b/test/sqllogictest/introspection/relations.slt index 0d50e081bbccd..bc150e7e80f5d 100644 --- a/test/sqllogictest/introspection/relations.slt +++ b/test/sqllogictest/introspection/relations.slt @@ -58,7 +58,7 @@ Feedback persist_source_backpressure(backpressure(u1)) alloc::vec::Vec)> FlatMap Exchange alloc::vec::Vec<(u64,␠mz_txn_wal::txn_read::DataRemapEntry)> FlatMap txns_progress_frontiers(u1) alloc::vec::Vec> -FormArrangementKey ArrangeBy[[Column(0,␠"a"),␠Column(1,␠"b")]] mz_timely_util::containers::container::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> +FormArrangementKey ArrangeBy[[Column(0,␠"a"),␠Column(1,␠"b")]] mz_timely_util::columnar::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> FormArrangementKey Concatenate alloc::vec::Vec<(mz_storage_types::errors::DataflowError,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> InputRegion:␠materialize.public.test_primary_idx BuildRegion:␠materialize.public.test_primary_idx alloc::vec::Vec<(mz_storage_types::errors::DataflowError,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> InputRegion:␠materialize.public.test_primary_idx Temporal␠delay alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> @@ -91,7 +91,7 @@ GROUP BY type; ---- 1 alloc::vec::Vec<((mz_storage_types::errors::DataflowError,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> 1 alloc::vec::Vec> -1 mz_timely_util::containers::container::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> +1 mz_timely_util::columnar::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> 10 alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing)> 2 alloc::vec::Vec<(u64,␠mz_txn_wal::txn_read::DataRemapEntry)> 2 alloc::vec::Vec<(usize,␠mz_persist_client::fetch::ExchangeableBatchPart)>