|
| 1 | +// Copyright Materialize, Inc. and contributors. All rights reserved. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License in the LICENSE file at the |
| 6 | +// root of this repository, or online at |
| 7 | +// |
| 8 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +// |
| 10 | +// Unless required by applicable law or agreed to in writing, software |
| 11 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +// See the License for the specific language governing permissions and |
| 14 | +// limitations under the License. |
| 15 | + |
| 16 | +//! Container for columnar data. |
| 17 | +
|
| 18 | +#![deny(missing_docs)] |
| 19 | + |
| 20 | +pub mod batcher; |
| 21 | +pub mod builder; |
| 22 | + |
| 23 | +use std::hash::Hash; |
| 24 | + |
| 25 | +use columnar::Container as _; |
| 26 | +use columnar::bytes::{EncodeDecode, Indexed}; |
| 27 | +use columnar::common::IterOwn; |
| 28 | +use columnar::{Clear, FromBytes, Index, Len}; |
| 29 | +use columnar::{Columnar, Ref}; |
| 30 | +use differential_dataflow::Hashable; |
| 31 | +use differential_dataflow::containers::TimelyStack; |
| 32 | +use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; |
| 33 | +use mz_ore::region::Region; |
| 34 | +use timely::Container; |
| 35 | +use timely::bytes::arc::Bytes; |
| 36 | +use timely::container::PushInto; |
| 37 | +use timely::dataflow::channels::ContainerBytes; |
| 38 | + |
| 39 | +/// A batcher for columnar storage. |
| 40 | +pub type Col2ValBatcher<K, V, T, R> = MergeBatcher< |
| 41 | + Column<((K, V), T, R)>, |
| 42 | + batcher::Chunker<TimelyStack<((K, V), T, R)>>, |
| 43 | + ColMerger<(K, V), T, R>, |
| 44 | +>; |
| 45 | +/// A batcher for columnar storage with unit values. |
| 46 | +pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>; |
| 47 | + |
| 48 | +/// A container based on a columnar store, encoded in aligned bytes. |
| 49 | +/// |
| 50 | +/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name |
| 51 | +/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`] |
| 52 | +/// variant is used to construct the container, and it owns potentially multiple columns of data. |
| 53 | +pub enum Column<C: Columnar> { |
| 54 | + /// The typed variant of the container. |
| 55 | + Typed(C::Container), |
| 56 | + /// The binary variant of the container. |
| 57 | + Bytes(Bytes), |
| 58 | + /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. |
| 59 | + /// |
| 60 | + /// Reasons could include misalignment, cloning of data, or wanting |
| 61 | + /// to release the `Bytes` as a scarce resource. |
| 62 | + Align(Region<u64>), |
| 63 | +} |
| 64 | + |
| 65 | +impl<C: Columnar> Column<C> { |
| 66 | + /// Borrows the container as a reference. |
| 67 | + #[inline] |
| 68 | + fn borrow(&self) -> <C::Container as columnar::Container>::Borrowed<'_> { |
| 69 | + match self { |
| 70 | + Column::Typed(t) => t.borrow(), |
| 71 | + Column::Bytes(b) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes( |
| 72 | + &mut Indexed::decode(bytemuck::cast_slice(b)), |
| 73 | + ), |
| 74 | + Column::Align(a) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes( |
| 75 | + &mut Indexed::decode(a), |
| 76 | + ), |
| 77 | + } |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +impl<C: Columnar> Default for Column<C> { |
| 82 | + fn default() -> Self { |
| 83 | + Self::Typed(Default::default()) |
| 84 | + } |
| 85 | +} |
| 86 | + |
| 87 | +impl<C: Columnar> Clone for Column<C> |
| 88 | +where |
| 89 | + C::Container: Clone, |
| 90 | +{ |
| 91 | + fn clone(&self) -> Self { |
| 92 | + match self { |
| 93 | + // Typed stays typed, although we would have the option to move to aligned data. |
| 94 | + // If we did it might be confusing why we couldn't push into a cloned column. |
| 95 | + Column::Typed(t) => Column::Typed(t.clone()), |
| 96 | + Column::Bytes(b) => { |
| 97 | + assert_eq!(b.len() % 8, 0); |
| 98 | + let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8); |
| 99 | + let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); |
| 100 | + alloc_bytes[..b.len()].copy_from_slice(b); |
| 101 | + Self::Align(alloc) |
| 102 | + } |
| 103 | + Column::Align(a) => { |
| 104 | + let mut alloc = crate::containers::alloc_aligned_zeroed(a.len()); |
| 105 | + alloc[..a.len()].copy_from_slice(a); |
| 106 | + Column::Align(alloc) |
| 107 | + } |
| 108 | + } |
| 109 | + } |
| 110 | +} |
| 111 | + |
| 112 | +impl<C: Columnar> Container for Column<C> { |
| 113 | + type ItemRef<'a> = columnar::Ref<'a, C>; |
| 114 | + type Item<'a> = columnar::Ref<'a, C>; |
| 115 | + |
| 116 | + #[inline] |
| 117 | + fn len(&self) -> usize { |
| 118 | + self.borrow().len() |
| 119 | + } |
| 120 | + |
| 121 | + // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. |
| 122 | + #[inline] |
| 123 | + fn clear(&mut self) { |
| 124 | + match self { |
| 125 | + Column::Typed(t) => t.clear(), |
| 126 | + Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()), |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + type Iter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>; |
| 131 | + |
| 132 | + #[inline] |
| 133 | + fn iter(&self) -> Self::Iter<'_> { |
| 134 | + self.borrow().into_index_iter() |
| 135 | + } |
| 136 | + |
| 137 | + type DrainIter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>; |
| 138 | + |
| 139 | + #[inline] |
| 140 | + fn drain(&mut self) -> Self::DrainIter<'_> { |
| 141 | + self.borrow().into_index_iter() |
| 142 | + } |
| 143 | +} |
| 144 | + |
| 145 | +impl<C: Columnar, T> PushInto<T> for Column<C> |
| 146 | +where |
| 147 | + C::Container: columnar::Push<T>, |
| 148 | +{ |
| 149 | + #[inline] |
| 150 | + fn push_into(&mut self, item: T) { |
| 151 | + use columnar::Push; |
| 152 | + match self { |
| 153 | + Column::Typed(t) => t.push(item), |
| 154 | + Column::Align(_) | Column::Bytes(_) => { |
| 155 | + // We really oughtn't be calling this in this case. |
| 156 | + // We could convert to owned, but need more constraints on `C`. |
| 157 | + unimplemented!("Pushing into Column::Bytes without first clearing"); |
| 158 | + } |
| 159 | + } |
| 160 | + } |
| 161 | +} |
| 162 | + |
| 163 | +impl<C: Columnar> ContainerBytes for Column<C> { |
| 164 | + #[inline] |
| 165 | + fn from_bytes(bytes: Bytes) -> Self { |
| 166 | + // Our expectation / hope is that `bytes` is `u64` aligned and sized. |
| 167 | + // If the alignment is borked, we can relocate. If the size is borked, |
| 168 | + // not sure what we do in that case. An incorrect size indicates a problem |
| 169 | + // of `into_bytes`, or a failure of the communication layer, both of which |
| 170 | + // are unrecoverable. |
| 171 | + assert_eq!(bytes.len() % 8, 0); |
| 172 | + if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { |
| 173 | + Self::Bytes(bytes) |
| 174 | + } else { |
| 175 | + // We failed to cast the slice, so we'll reallocate. |
| 176 | + let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8); |
| 177 | + let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc); |
| 178 | + alloc_bytes[..bytes.len()].copy_from_slice(&bytes); |
| 179 | + Self::Align(alloc) |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + #[inline] |
| 184 | + fn length_in_bytes(&self) -> usize { |
| 185 | + match self { |
| 186 | + Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()), |
| 187 | + Column::Bytes(b) => b.len(), |
| 188 | + Column::Align(a) => 8 * a.len(), |
| 189 | + } |
| 190 | + } |
| 191 | + |
| 192 | + #[inline] |
| 193 | + fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { |
| 194 | + match self { |
| 195 | + Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(), |
| 196 | + Column::Bytes(b) => writer.write_all(b).unwrap(), |
| 197 | + Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), |
| 198 | + } |
| 199 | + } |
| 200 | +} |
| 201 | + |
| 202 | +/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard |
| 203 | +/// time to figure out the lifetimes of the elements when specified as a closure, so we rather |
| 204 | +/// specify it as a function. |
| 205 | +#[inline(always)] |
| 206 | +pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64 |
| 207 | +where |
| 208 | + K: Columnar, |
| 209 | + for<'a> Ref<'a, K>: Hash, |
| 210 | + V: Columnar, |
| 211 | + D: Columnar, |
| 212 | + T: Columnar, |
| 213 | +{ |
| 214 | + k.hashed() |
| 215 | +} |
| 216 | + |
| 217 | +#[cfg(test)] |
| 218 | +mod tests { |
| 219 | + use mz_ore::region::Region; |
| 220 | + use timely::Container; |
| 221 | + use timely::bytes::arc::BytesMut; |
| 222 | + use timely::container::PushInto; |
| 223 | + use timely::dataflow::channels::ContainerBytes; |
| 224 | + |
| 225 | + use super::*; |
| 226 | + |
| 227 | + /// Produce some bytes that are in columnar format. |
| 228 | + fn raw_columnar_bytes() -> Vec<u8> { |
| 229 | + let mut raw = Vec::new(); |
| 230 | + raw.extend(16_u64.to_le_bytes()); // offsets |
| 231 | + raw.extend(28_u64.to_le_bytes()); // length |
| 232 | + raw.extend(1_i32.to_le_bytes()); |
| 233 | + raw.extend(2_i32.to_le_bytes()); |
| 234 | + raw.extend(3_i32.to_le_bytes()); |
| 235 | + raw.extend([0, 0, 0, 0]); // padding |
| 236 | + raw |
| 237 | + } |
| 238 | + |
| 239 | + #[mz_ore::test] |
| 240 | + fn test_column_clone() { |
| 241 | + let columns = Columnar::as_columns([1, 2, 3].iter()); |
| 242 | + let column_typed: Column<i32> = Column::Typed(columns); |
| 243 | + let column_typed2 = column_typed.clone(); |
| 244 | + |
| 245 | + assert_eq!(column_typed2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]); |
| 246 | + |
| 247 | + let bytes = BytesMut::from(raw_columnar_bytes()).freeze(); |
| 248 | + let column_bytes: Column<i32> = Column::Bytes(bytes); |
| 249 | + let column_bytes2 = column_bytes.clone(); |
| 250 | + |
| 251 | + assert_eq!(column_bytes2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]); |
| 252 | + |
| 253 | + let raw = raw_columnar_bytes(); |
| 254 | + let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8); |
| 255 | + let region_bytes = bytemuck::cast_slice_mut(&mut region); |
| 256 | + region_bytes[..raw.len()].copy_from_slice(&raw); |
| 257 | + let column_align: Column<i32> = Column::Align(region); |
| 258 | + let column_align2 = column_align.clone(); |
| 259 | + |
| 260 | + assert_eq!(column_align2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]); |
| 261 | + } |
| 262 | + |
| 263 | + #[mz_ore::test] |
| 264 | + fn test_column_from_bytes() { |
| 265 | + { |
| 266 | + let mut column: Column<i32> = Default::default(); |
| 267 | + column.push_into(1); |
| 268 | + column.push_into(2); |
| 269 | + column.push_into(3); |
| 270 | + let mut data = Vec::new(); |
| 271 | + column.into_bytes(&mut std::io::Cursor::new(&mut data)); |
| 272 | + println!("data: {:?}", data); |
| 273 | + } |
| 274 | + |
| 275 | + let raw = raw_columnar_bytes(); |
| 276 | + |
| 277 | + let buf = vec![0; raw.len() + 8]; |
| 278 | + let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>()); |
| 279 | + let mut bytes_mut = BytesMut::from(buf); |
| 280 | + let _ = bytes_mut.extract_to(align); |
| 281 | + bytes_mut[..raw.len()].copy_from_slice(&raw); |
| 282 | + let aligned_bytes = bytes_mut.extract_to(raw.len()); |
| 283 | + |
| 284 | + let column: Column<i32> = Column::from_bytes(aligned_bytes); |
| 285 | + assert!(matches!(column, Column::Bytes(_))); |
| 286 | + assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]); |
| 287 | + |
| 288 | + let buf = vec![0; raw.len() + 8]; |
| 289 | + let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>()); |
| 290 | + let mut bytes_mut = BytesMut::from(buf); |
| 291 | + let _ = bytes_mut.extract_to(align + 1); |
| 292 | + bytes_mut[..raw.len()].copy_from_slice(&raw); |
| 293 | + let unaligned_bytes = bytes_mut.extract_to(raw.len()); |
| 294 | + |
| 295 | + let column: Column<i32> = Column::from_bytes(unaligned_bytes); |
| 296 | + assert!(matches!(column, Column::Align(_))); |
| 297 | + assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]); |
| 298 | + } |
| 299 | +} |
0 commit comments