Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["getopts"]
getopts = ["getopts-dep", "timely_communication/getopts"]

[dependencies]
columnar = "0.2"
columnar = "0.3"
getopts-dep = { package = "getopts", version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
Expand Down
42 changes: 15 additions & 27 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,17 @@ mod container {
}
}

use columnar::{Clear, Len, Index, AsBytes, FromBytes};
use columnar::bytes::serialization::decode;
use columnar::{Clear, Len, Index, FromBytes};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::common::IterOwn;

use timely::Container;
impl<C: Columnar> Container for Column<C> {
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
}
}
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
Expand All @@ -178,8 +178,8 @@ mod container {
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => t.borrow().into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(),
}
}

Expand All @@ -188,8 +188,8 @@ mod container {
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => t.borrow().into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(),
}
}
}
Expand All @@ -199,7 +199,7 @@ mod container {
fn at_capacity(&self) -> bool {
match self {
Self::Typed(t) => {
let length_in_bytes = t.borrow().length_in_words() * 8;
let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
length_in_bytes >= (1 << 20)
},
Self::Bytes(_) => true,
Expand Down Expand Up @@ -246,28 +246,15 @@ mod container {
fn length_in_bytes(&self) -> usize {
match self {
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
Column::Typed(t) => 8 * t.borrow().length_in_words(),
Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()),
Column::Bytes(b) => b.len(),
Column::Align(a) => 8 * a.len(),
}
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match self {
Column::Typed(t) => {
use columnar::Container;
// Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice
// serialized as first its length in bytes, and then as many `u64` values as needed.
// Padding should be added, but only for alignment; no specific values are required.
for (align, bytes) in t.borrow().as_bytes() {
assert!(align <= 8);
let length: u64 = bytes.len().try_into().unwrap();
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap();
writer.write_all(bytes).unwrap();
let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap();
writer.write_all(&[0; 8][..padding]).unwrap();
}
},
Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() },
Column::Bytes(b) => writer.write_all(b).unwrap(),
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
}
Expand All @@ -280,7 +267,8 @@ use builder::ColumnBuilder;
mod builder {

use std::collections::VecDeque;
use columnar::{Columnar, Clear, Len, AsBytes, Push};
use columnar::{Columnar, Clear, Len, Push};
use columnar::bytes::{EncodeDecode, Indexed};
use super::Column;

/// A container builder for `Column<C>`.
Expand All @@ -300,11 +288,11 @@ mod builder {
self.current.push(item);
// If there is less than 10% slop with 2MB backing allocations, mint a container.
use columnar::Container;
let words = self.current.borrow().length_in_words();
let words = Indexed::length_in_words(&self.current.borrow());
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
let mut alloc = Vec::with_capacity(round);
columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes());
Indexed::encode(&mut alloc, &self.current.borrow());
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
self.current.clear();
}
Expand Down