Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- windows
toolchain:
- stable
- 1.78
- 1.79
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ resolver = "2"
edition = "2021"

[workspace.dependencies]
columnar = "0.6"
columnar = "0.7"

[workspace.lints.clippy]
type_complexity = "allow"
Expand Down
100 changes: 37 additions & 63 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Wordcount based on flatcontainer.
//! Wordcount based on the `columnar` crate.

use {
std::collections::HashMap,
Expand All @@ -19,7 +19,8 @@ struct WordCount {

fn main() {

type Container = Column<WordCount>;
type InnerContainer = <WordCount as columnar::Columnar>::Container;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a typedef in columnar:

pub type ContainerOf<T> = <T as Columnar>::Container;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooo. Interesting!

type Container = Column<InnerContainer>;

use columnar::Len;

Expand Down Expand Up @@ -55,7 +56,7 @@ fn main() {
)
.container::<Container>()
.unary_frontier(
ExchangeCore::<ColumnBuilder<WordCount>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
Expand Down Expand Up @@ -114,29 +115,27 @@ fn main() {
pub use container::Column;
mod container {

use columnar::Columnar;
use columnar::Container as FooBozzle;

use timely_bytes::arc::Bytes;

/// A container based on a columnar store, encoded in aligned bytes.
pub enum Column<C: Columnar> {
pub enum Column<C> {
/// The typed variant of the container.
Typed(C::Container),
Typed(C),
/// The binary variant of the container.
Bytes(Bytes),
Bytes(timely_bytes::arc::Bytes),
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
///
/// Reasons could include misalignment, cloning of data, or wanting
/// to release the `Bytes` as a scarce resource.
Align(Box<[u64]>),
}

impl<C: Columnar> Default for Column<C> {
impl<C: Default> Default for Column<C> {
fn default() -> Self { Self::Typed(Default::default()) }
}

impl<C: Columnar> Clone for Column<C> where C::Container: Clone {
// The clone implementation moves out of the `Bytes` variant into `Align`.
// This is optional and non-optimal, as the bytes clone is relatively free.
// But, we don't want to leak the uses of `Bytes`, is why we do this I think.
impl<C: Clone> Clone for Column<C> where C: Clone {
fn clone(&self) -> Self {
match self {
Column::Typed(t) => Column::Typed(t.clone()),
Expand All @@ -151,20 +150,24 @@ mod container {
}
}

use columnar::{Clear, Len, Index, FromBytes};
use columnar::{Len, Index, FromBytes};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::common::IterOwn;

use timely::Container;
impl<C: Columnar> Container for Column<C> {
fn len(&self) -> usize {
impl<C: columnar::Container> Column<C> {
/// Borrows the contents no matter their representation.
#[inline(always)] fn borrow(&self) -> C::Borrowed<'_> {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
}

impl<C: columnar::Container> timely::Container for Column<C> {
fn len(&self) -> usize { self.borrow().len() }
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Expand All @@ -174,28 +177,15 @@ mod container {
}

type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
}
}
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }

type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
}
}
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
}

use timely::container::SizableContainer;
impl<C: Columnar> SizableContainer for Column<C> {
impl<C: columnar::Container> timely::container::SizableContainer for Column<C> {
fn at_capacity(&self) -> bool {
match self {
Self::Typed(t) => {
Expand All @@ -209,11 +199,9 @@ mod container {
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

use timely::container::PushInto;
impl<C: Columnar, T> PushInto<T> for Column<C> where C::Container: columnar::Push<T> {
impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
use columnar::Push;
match self {
Column::Typed(t) => t.push(item),
Column::Align(_) | Column::Bytes(_) => {
Expand All @@ -225,8 +213,7 @@ mod container {
}
}

use timely::dataflow::channels::ContainerBytes;
impl<C: Columnar> ContainerBytes for Column<C> {
impl<C: columnar::Container> timely::dataflow::channels::ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
// If the alignment is borked, we can relocate. IF the size is borked,
Expand Down Expand Up @@ -267,27 +254,25 @@ use builder::ColumnBuilder;
mod builder {

use std::collections::VecDeque;
use columnar::{Columnar, Clear, Len, Push};
use columnar::bytes::{EncodeDecode, Indexed};
use super::Column;

/// A container builder for `Column<C>`.
pub struct ColumnBuilder<C: Columnar> {
#[derive(Default)]
pub struct ColumnBuilder<C> {
/// Container that we're writing to.
current: C::Container,
current: C,
/// Empty allocation.
empty: Option<Column<C>>,
/// Completed containers pending to be sent.
pending: VecDeque<Column<C>>,
}

use timely::container::PushInto;
impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C> where C::Container: columnar::Push<T> {
impl<C: columnar::Container, T> timely::container::PushInto<T> for ColumnBuilder<C> where C: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
self.current.push(item);
// If there is less than 10% slop with 2MB backing allocations, mint a container.
use columnar::Container;
let words = Indexed::length_in_words(&self.current.borrow());
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
Expand All @@ -299,19 +284,8 @@ mod builder {
}
}

impl<C: Columnar> Default for ColumnBuilder<C> {
#[inline]
fn default() -> Self {
ColumnBuilder {
current: Default::default(),
empty: None,
pending: Default::default(),
}
}
}

use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
impl<C: Columnar> ContainerBuilder for ColumnBuilder<C> where C::Container: Clone {
impl<C: columnar::Container> ContainerBuilder for ColumnBuilder<C> {
type Container = Column<C>;

#[inline]
Expand Down Expand Up @@ -343,5 +317,5 @@ mod builder {
}
}

impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone { }
impl<C: columnar::Container> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
}
Loading