Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions src/timely-util/src/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod alloc {
mod container {
use columnar::Columnar;
use columnar::Container as _;
use columnar::bytes::{EncodeDecode, Sequence};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::common::IterOwn;
use columnar::{Clear, FromBytes, Index, Len};
use mz_ore::region::Region;
Expand Down Expand Up @@ -97,12 +97,12 @@ mod container {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => {
<<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
&mut Sequence::decode(bytemuck::cast_slice(b)),
&mut Indexed::decode(bytemuck::cast_slice(b)),
)
}
Column::Align(a) => {
<<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
&mut Sequence::decode(a),
&mut Indexed::decode(a),
)
}
}
Expand Down Expand Up @@ -214,7 +214,7 @@ mod container {
#[inline]
fn length_in_bytes(&self) -> usize {
match self {
Column::Typed(t) => Sequence::length_in_bytes(&t.borrow()),
Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
Column::Bytes(b) => b.len(),
Column::Align(a) => 8 * a.len(),
}
Expand All @@ -223,7 +223,7 @@ mod container {
#[inline]
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match self {
Column::Typed(t) => Sequence::write(writer, &t.borrow()).unwrap(),
Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
Column::Bytes(b) => writer.write_all(b).unwrap(),
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
}
Expand All @@ -234,7 +234,7 @@ mod container {
mod builder {
use std::collections::VecDeque;

use columnar::bytes::{EncodeDecode, Sequence};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::{Clear, Columnar, Len, Push};
use timely::container::PushInto;
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
Expand Down Expand Up @@ -263,7 +263,7 @@ mod builder {
self.current.push(item);
// If there is less than 10% slop with 2MB backing allocations, mint a container.
use columnar::Container;
let words = Sequence::length_in_words(&self.current.borrow());
let words = Indexed::length_in_words(&self.current.borrow());
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
/// Move the contents from `current` to an aligned allocation, and push it to `pending`.
Expand All @@ -278,7 +278,7 @@ mod builder {
{
let mut alloc = super::alloc_aligned_zeroed(round);
let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
Sequence::write(writer, &current.borrow()).unwrap();
Indexed::write(writer, &current.borrow()).unwrap();
pending.push_back(Column::Align(alloc));
current.clear();
}
Expand Down Expand Up @@ -491,14 +491,16 @@ mod tests {
use mz_ore::region::Region;
use timely::Container;
use timely::bytes::arc::BytesMut;
use timely::container::PushInto;
use timely::dataflow::channels::ContainerBytes;

use super::*;

/// Produce some bytes that are in columnar format.
fn raw_columnar_bytes() -> Vec<u8> {
let mut raw = Vec::new();
raw.extend(12_u64.to_le_bytes()); // length
raw.extend(16_u64.to_le_bytes()); // offsets
raw.extend(28_u64.to_le_bytes()); // length
raw.extend(1_i32.to_le_bytes());
raw.extend(2_i32.to_le_bytes());
raw.extend(3_i32.to_le_bytes());
Expand Down Expand Up @@ -530,6 +532,19 @@ mod tests {
assert_eq!(column_align2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
}

/// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is
/// easier.
#[mz_ore::test]
fn test_column_known_bytes() {
let mut column: Column<i32> = Default::default();
column.push_into(1);
column.push_into(2);
column.push_into(3);
let mut data = Vec::new();
column.into_bytes(&mut std::io::Cursor::new(&mut data));
assert_eq!(data, raw_columnar_bytes());
}

#[mz_ore::test]
fn test_column_from_bytes() {
let raw = raw_columnar_bytes();
Expand Down