diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 32ddf38b2..7c9dd0e21 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -19,7 +19,7 @@ default = ["getopts"] getopts = ["getopts-dep", "timely_communication/getopts"] [dependencies] -columnar = "0.2" +columnar = "0.3" getopts-dep = { package = "getopts", version = "0.2.21", optional = true } bincode = { version = "1.0" } byteorder = "1.5" diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 1d0cfd2c8..12d362f3c 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -151,8 +151,8 @@ mod container { } } - use columnar::{Clear, Len, Index, AsBytes, FromBytes}; - use columnar::bytes::serialization::decode; + use columnar::{Clear, Len, Index, FromBytes}; + use columnar::bytes::{EncodeDecode, Indexed}; use columnar::common::IterOwn; use timely::Container; @@ -160,8 +160,8 @@ mod container { fn len(&self) -> usize { match self { Column::Typed(t) => t.len(), - Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(), - Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(), + Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(), + Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(), } } // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. @@ -178,8 +178,8 @@ mod container { fn iter<'a>(&'a self) -> Self::Iter<'a> { match self { Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), } } @@ -188,8 +188,8 @@ mod container { fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { match self { Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), } } } @@ -199,7 +199,7 @@ mod container { fn at_capacity(&self) -> bool { match self { Self::Typed(t) => { - let length_in_bytes = t.borrow().length_in_words() * 8; + let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow()); length_in_bytes >= (1 << 20) }, Self::Bytes(_) => true, @@ -246,7 +246,7 @@ mod container { fn length_in_bytes(&self) -> usize { match self { // We'll need one u64 for the length, then the length rounded up to a multiple of 8. - Column::Typed(t) => 8 * t.borrow().length_in_words(), + Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()), Column::Bytes(b) => b.len(), Column::Align(a) => 8 * a.len(), } @@ -254,20 +254,7 @@ mod container { fn into_bytes(&self, writer: &mut W) { match self { - Column::Typed(t) => { - use columnar::Container; - // Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice - // serialize as first its length in bytes, and then as many `u64` values as needed. - // Padding should be added, but only for alignment; no specific values are required. - for (align, bytes) in t.borrow().as_bytes() { - assert!(align <= 8); - let length: u64 = bytes.len().try_into().unwrap(); - writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap(); - writer.write_all(bytes).unwrap(); - let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap(); - writer.write_all(&[0; 8][..padding]).unwrap(); - } - }, + Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() }, Column::Bytes(b) => writer.write_all(b).unwrap(), Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), } @@ -280,7 +267,8 @@ use builder::ColumnBuilder; mod builder { use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, AsBytes, Push}; + use columnar::{Columnar, Clear, Len, Push}; + use columnar::bytes::{EncodeDecode, Indexed}; use super::Column; /// A container builder for `Column`. @@ -300,11 +288,11 @@ mod builder { self.current.push(item); // If there is less than 10% slop with 2MB backing allocations, mint a container. use columnar::Container; - let words = self.current.borrow().length_in_words(); + let words = Indexed::length_in_words(&self.current.borrow()); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { let mut alloc = Vec::with_capacity(round); - columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); + Indexed::encode(&mut alloc, &self.current.borrow()); self.pending.push_back(Column::Align(alloc.into_boxed_slice())); self.current.clear(); }