Skip to content

Commit 6826f06

Browse files
Update columnar to 0.3, and columnar example (#635)
1 parent 904d458 commit 6826f06

File tree

2 files changed

+16
-28
lines changed

2 files changed

+16
-28
lines changed

timely/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ default = ["getopts"]
1919
getopts = ["getopts-dep", "timely_communication/getopts"]
2020

2121
[dependencies]
22-
columnar = "0.2"
22+
columnar = "0.3"
2323
getopts-dep = { package = "getopts", version = "0.2.21", optional = true }
2424
bincode = { version = "1.0" }
2525
byteorder = "1.5"

timely/examples/columnar.rs

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,17 @@ mod container {
151151
}
152152
}
153153

154-
use columnar::{Clear, Len, Index, AsBytes, FromBytes};
155-
use columnar::bytes::serialization::decode;
154+
use columnar::{Clear, Len, Index, FromBytes};
155+
use columnar::bytes::{EncodeDecode, Indexed};
156156
use columnar::common::IterOwn;
157157

158158
use timely::Container;
159159
impl<C: Columnar> Container for Column<C> {
160160
fn len(&self) -> usize {
161161
match self {
162162
Column::Typed(t) => t.len(),
163-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(),
164-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(),
163+
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
164+
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
165165
}
166166
}
167167
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
@@ -178,8 +178,8 @@ mod container {
178178
fn iter<'a>(&'a self) -> Self::Iter<'a> {
179179
match self {
180180
Column::Typed(t) => t.borrow().into_iter(),
181-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
182-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
181+
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(),
182+
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(),
183183
}
184184
}
185185

@@ -188,8 +188,8 @@ mod container {
188188
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
189189
match self {
190190
Column::Typed(t) => t.borrow().into_iter(),
191-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
192-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
191+
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(),
192+
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(),
193193
}
194194
}
195195
}
@@ -199,7 +199,7 @@ mod container {
199199
fn at_capacity(&self) -> bool {
200200
match self {
201201
Self::Typed(t) => {
202-
let length_in_bytes = t.borrow().length_in_words() * 8;
202+
let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
203203
length_in_bytes >= (1 << 20)
204204
},
205205
Self::Bytes(_) => true,
@@ -246,28 +246,15 @@ mod container {
246246
fn length_in_bytes(&self) -> usize {
247247
match self {
248248
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
249-
Column::Typed(t) => 8 * t.borrow().length_in_words(),
249+
Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()),
250250
Column::Bytes(b) => b.len(),
251251
Column::Align(a) => 8 * a.len(),
252252
}
253253
}
254254

255255
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
256256
match self {
257-
Column::Typed(t) => {
258-
use columnar::Container;
259-
// Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice
260-
// serialize as first its length in bytes, and then as many `u64` values as needed.
261-
// Padding should be added, but only for alignment; no specific values are required.
262-
for (align, bytes) in t.borrow().as_bytes() {
263-
assert!(align <= 8);
264-
let length: u64 = bytes.len().try_into().unwrap();
265-
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap();
266-
writer.write_all(bytes).unwrap();
267-
let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap();
268-
writer.write_all(&[0; 8][..padding]).unwrap();
269-
}
270-
},
257+
Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() },
271258
Column::Bytes(b) => writer.write_all(b).unwrap(),
272259
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
273260
}
@@ -280,7 +267,8 @@ use builder::ColumnBuilder;
280267
mod builder {
281268

282269
use std::collections::VecDeque;
283-
use columnar::{Columnar, Clear, Len, AsBytes, Push};
270+
use columnar::{Columnar, Clear, Len, Push};
271+
use columnar::bytes::{EncodeDecode, Indexed};
284272
use super::Column;
285273

286274
/// A container builder for `Column<C>`.
@@ -300,11 +288,11 @@ mod builder {
300288
self.current.push(item);
301289
// If there is less than 10% slop with 2MB backing allocations, mint a container.
302290
use columnar::Container;
303-
let words = self.current.borrow().length_in_words();
291+
let words = Indexed::length_in_words(&self.current.borrow());
304292
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
305293
if round - words < round / 10 {
306294
let mut alloc = Vec::with_capacity(round);
307-
columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes());
295+
Indexed::encode(&mut alloc, &self.current.borrow());
308296
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
309297
self.current.clear();
310298
}

0 commit comments

Comments
 (0)