diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a392796ed..f5fba7cc3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - windows toolchain: - stable - - 1.78 + - 1.79 name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: diff --git a/Cargo.toml b/Cargo.toml index 712b98afa..68b1bf3f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ resolver = "2" edition = "2021" [workspace.dependencies] -columnar = "0.6" +columnar = "0.7" [workspace.lints.clippy] type_complexity = "allow" diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 7271ccc76..ab6475c82 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -1,4 +1,4 @@ -//! Wordcount based on flatcontainer. +//! Wordcount based on the `columnar` crate. use { std::collections::HashMap, @@ -19,7 +19,8 @@ struct WordCount { fn main() { - type Container = Column; + type InnerContainer = ::Container; + type Container = Column; use columnar::Len; @@ -55,7 +56,7 @@ fn main() { ) .container::() .unary_frontier( - ExchangeCore::,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64), + ExchangeCore::,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64), "WordCount", |_capability, _info| { let mut queues = HashMap::new(); @@ -114,17 +115,12 @@ fn main() { pub use container::Column; mod container { - use columnar::Columnar; - use columnar::Container as FooBozzle; - - use timely_bytes::arc::Bytes; - /// A container based on a columnar store, encoded in aligned bytes. - pub enum Column { + pub enum Column { /// The typed variant of the container. - Typed(C::Container), + Typed(C), /// The binary variant of the container. - Bytes(Bytes), + Bytes(timely_bytes::arc::Bytes), /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. /// /// Reasons could include misalignment, cloning of data, or wanting @@ -132,11 +128,14 @@ mod container { Align(Box<[u64]>), } - impl Default for Column { + impl Default for Column { fn default() -> Self { Self::Typed(Default::default()) } } - impl Clone for Column where C::Container: Clone { + // The clone implementation moves out of the `Bytes` variant into `Align`. + // This is optional and non-optimal, as the bytes clone is relatively free. + // But, we don't want to leak the uses of `Bytes`, is why we do this I think. + impl Clone for Column where C: Clone { fn clone(&self) -> Self { match self { Column::Typed(t) => Column::Typed(t.clone()), @@ -151,20 +150,24 @@ mod container { } } - use columnar::{Clear, Len, Index, FromBytes}; + use columnar::{Len, Index, FromBytes}; use columnar::bytes::{EncodeDecode, Indexed}; use columnar::common::IterOwn; - use timely::Container; - impl Container for Column { - fn len(&self) -> usize { + impl Column { + /// Borrows the contents no matter their representation. + #[inline(always)] fn borrow(&self) -> C::Borrowed<'_> { match self { - Column::Typed(t) => t.len(), - Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(), - Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(), + Column::Typed(t) => t.borrow(), + Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))), + Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)), } } - // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. + } + + impl timely::Container for Column { + fn len(&self) -> usize { self.borrow().len() } + // This sets `self` to be an empty `Typed` variant, appropriate for pushing into. fn clear(&mut self) { match self { Column::Typed(t) => t.clear(), @@ -174,28 +177,15 @@ mod container { } type ItemRef<'a> = C::Ref<'a>; - type Iter<'a> = IterOwn<>::Borrowed<'a>>; - fn iter<'a>(&'a self) -> Self::Iter<'a> { - match self { - Column::Typed(t) => t.borrow().into_index_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), - } - } + type Iter<'a> = IterOwn>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() } type Item<'a> = C::Ref<'a>; - type DrainIter<'a> = IterOwn<>::Borrowed<'a>>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { - match self { - Column::Typed(t) => t.borrow().into_index_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), - } - } + type DrainIter<'a> = IterOwn>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() } } - use timely::container::SizableContainer; - impl SizableContainer for Column { + impl timely::container::SizableContainer for Column { fn at_capacity(&self) -> bool { match self { Self::Typed(t) => { @@ -209,11 +199,9 @@ mod container { fn ensure_capacity(&mut self, _stash: &mut Option) { } } - use timely::container::PushInto; - impl PushInto for Column where C::Container: columnar::Push { + impl timely::container::PushInto for Column where C: columnar::Push { #[inline] fn push_into(&mut self, item: T) { - use columnar::Push; match self { Column::Typed(t) => t.push(item), Column::Align(_) | Column::Bytes(_) => { @@ -225,8 +213,7 @@ mod container { } } - use timely::dataflow::channels::ContainerBytes; - impl ContainerBytes for Column { + impl timely::dataflow::channels::ContainerBytes for Column { fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { // Our expectation / hope is that `bytes` is `u64` aligned and sized. // If the alignment is borked, we can relocate. IF the size is borked, @@ -267,27 +254,25 @@ use builder::ColumnBuilder; mod builder { use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, Push}; use columnar::bytes::{EncodeDecode, Indexed}; use super::Column; /// A container builder for `Column`. - pub struct ColumnBuilder { + #[derive(Default)] + pub struct ColumnBuilder { /// Container that we're writing to. - current: C::Container, + current: C, /// Empty allocation. empty: Option>, /// Completed containers pending to be sent. pending: VecDeque>, } - use timely::container::PushInto; - impl PushInto for ColumnBuilder where C::Container: columnar::Push { + impl timely::container::PushInto for ColumnBuilder where C: columnar::Push { #[inline] fn push_into(&mut self, item: T) { self.current.push(item); // If there is less than 10% slop with 2MB backing allocations, mint a container. - use columnar::Container; let words = Indexed::length_in_words(&self.current.borrow()); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { @@ -299,19 +284,8 @@ mod builder { } } - impl Default for ColumnBuilder { - #[inline] - fn default() -> Self { - ColumnBuilder { - current: Default::default(), - empty: None, - pending: Default::default(), - } - } - } - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl ContainerBuilder for ColumnBuilder where C::Container: Clone { + impl ContainerBuilder for ColumnBuilder { type Container = Column; #[inline] @@ -343,5 +317,5 @@ mod builder { } } - impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone { } + impl LengthPreservingContainerBuilder for ColumnBuilder { } }