Skip to content

Commit 9759007

Browse files
committed
Switch to indexed serialization
Switch column from sequence to indexed serialization. The difference is that sequence stores data as a sequence of (length, data) pairs, whereas indexed puts all lengths first, followed by all data. We expect this to be more efficient if we do not read all data, or if we repeatedly borrow its serialized representation. For details, see frankmcsherry/columnar#30. Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 2ad6312 commit 9759007

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

src/timely-util/src/containers.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ mod alloc {
6363
mod container {
6464
use columnar::Columnar;
6565
use columnar::Container as _;
66-
use columnar::bytes::{EncodeDecode, Sequence};
66+
use columnar::bytes::{EncodeDecode, Indexed};
6767
use columnar::common::IterOwn;
6868
use columnar::{Clear, FromBytes, Index, Len};
6969
use mz_ore::region::Region;
@@ -97,12 +97,12 @@ mod container {
9797
Column::Typed(t) => t.borrow(),
9898
Column::Bytes(b) => {
9999
<<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
100-
&mut Sequence::decode(bytemuck::cast_slice(b)),
100+
&mut Indexed::decode(bytemuck::cast_slice(b)),
101101
)
102102
}
103103
Column::Align(a) => {
104104
<<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
105-
&mut Sequence::decode(a),
105+
&mut Indexed::decode(a),
106106
)
107107
}
108108
}
@@ -214,7 +214,7 @@ mod container {
214214
#[inline]
215215
fn length_in_bytes(&self) -> usize {
216216
match self {
217-
Column::Typed(t) => Sequence::length_in_bytes(&t.borrow()),
217+
Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
218218
Column::Bytes(b) => b.len(),
219219
Column::Align(a) => 8 * a.len(),
220220
}
@@ -223,7 +223,7 @@ mod container {
223223
#[inline]
224224
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
225225
match self {
226-
Column::Typed(t) => Sequence::write(writer, &t.borrow()).unwrap(),
226+
Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
227227
Column::Bytes(b) => writer.write_all(b).unwrap(),
228228
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
229229
}
@@ -234,7 +234,7 @@ mod container {
234234
mod builder {
235235
use std::collections::VecDeque;
236236

237-
use columnar::bytes::{EncodeDecode, Sequence};
237+
use columnar::bytes::{EncodeDecode, Indexed};
238238
use columnar::{Clear, Columnar, Len, Push};
239239
use timely::container::PushInto;
240240
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
@@ -263,7 +263,7 @@ mod builder {
263263
self.current.push(item);
264264
// If there is less than 10% slop with 2MB backing allocations, mint a container.
265265
use columnar::Container;
266-
let words = Sequence::length_in_words(&self.current.borrow());
266+
let words = Indexed::length_in_words(&self.current.borrow());
267267
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
268268
if round - words < round / 10 {
269269
/// Move the contents from `current` to an aligned allocation, and push it to `pending`.
@@ -278,7 +278,7 @@ mod builder {
278278
{
279279
let mut alloc = super::alloc_aligned_zeroed(round);
280280
let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
281-
Sequence::write(writer, &current.borrow()).unwrap();
281+
Indexed::write(writer, &current.borrow()).unwrap();
282282
pending.push_back(Column::Align(alloc));
283283
current.clear();
284284
}
@@ -491,14 +491,16 @@ mod tests {
491491
use mz_ore::region::Region;
492492
use timely::Container;
493493
use timely::bytes::arc::BytesMut;
494+
use timely::container::PushInto;
494495
use timely::dataflow::channels::ContainerBytes;
495496

496497
use super::*;
497498

498499
/// Produce some bytes that are in columnar format.
499500
fn raw_columnar_bytes() -> Vec<u8> {
500501
let mut raw = Vec::new();
501-
raw.extend(12_u64.to_le_bytes()); // length
502+
raw.extend(16_u64.to_le_bytes()); // offsets
503+
raw.extend(28_u64.to_le_bytes()); // length
502504
raw.extend(1_i32.to_le_bytes());
503505
raw.extend(2_i32.to_le_bytes());
504506
raw.extend(3_i32.to_le_bytes());
@@ -532,6 +534,16 @@ mod tests {
532534

533535
#[mz_ore::test]
534536
fn test_column_from_bytes() {
537+
{
538+
let mut column: Column<i32> = Default::default();
539+
column.push_into(1);
540+
column.push_into(2);
541+
column.push_into(3);
542+
let mut data = Vec::new();
543+
column.into_bytes(&mut std::io::Cursor::new(&mut data));
544+
println!("data: {:?}", data);
545+
}
546+
535547
let raw = raw_columnar_bytes();
536548

537549
let buf = vec![0; raw.len() + 8];

0 commit comments

Comments
 (0)