Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ resolver = "2"
[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.15.3" }
timely = { version = "0.21", default-features = false }
columnar = { version = "0.8", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ graph_map = "0.1"
bytemuck = "1.18.0"

[dependencies]
columnar = "0.6"
columnar = { workspace = true }
columnation = "0.1.0"
fnv="1.0.2"
paste = "1.0"
Expand Down
53 changes: 30 additions & 23 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,17 @@ mod container {
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::common::IterOwn;

type BorrowedOf<'a, C> = <<C as Columnar>::Container as columnar::Container>::Borrowed<'a>;

impl<C: Columnar> Column<C> {
pub fn borrow(&self) -> <C::Container as columnar::Container<C>>::Borrowed<'_> {
pub fn borrow(&self) -> BorrowedOf<C> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}
pub fn get(&self, index: usize) -> C::Ref<'_> {
pub fn get(&self, index: usize) -> columnar::Ref<C> {
self.borrow().get(index)
}
}
Expand All @@ -174,8 +176,8 @@ mod container {
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
}
}
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
Expand All @@ -187,23 +189,23 @@ mod container {
}
}

type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
type ItemRef<'a> = columnar::Ref<'a, C>;
type Iter<'a> = IterOwn<BorrowedOf<'a, C>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
}
}

type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
type Item<'a> = columnar::Ref<'a, C>;
type DrainIter<'a> = IterOwn<BorrowedOf<'a, C>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
}
}
}
Expand Down Expand Up @@ -406,10 +408,13 @@ pub mod batcher {

impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
where
D: for<'b> Columnar<Ref<'b>: Ord>,
T: for<'b> Columnar<Ref<'b>: Ord>,
R: for<'b> Columnar<Ref<'b>: Ord> + for<'b> Semigroup<R::Ref<'b>>,
C2: Container + for<'b, 'c> PushInto<(D::Ref<'b>, T::Ref<'b>, &'c R)>,
D: for<'b> Columnar,
for<'b> columnar::Ref<'b, D>: Ord,
T: for<'b> Columnar,
for<'b> columnar::Ref<'b, T>: Ord,
R: for<'b> Columnar + for<'b> Semigroup<columnar::Ref<'b, R>>,
for<'b> columnar::Ref<'b, R>: Ord,
C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
{
fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {

Expand Down Expand Up @@ -482,11 +487,13 @@ pub mod batcher {

impl<D, T, R> ContainerQueue<Column<(D, T, R)>> for ColumnQueue<(D, T, R)>
where
D: for<'a> Columnar<Ref<'a>: Ord>,
T: for<'a> Columnar<Ref<'a>: Ord>,
D: for<'a> Columnar,
for<'b> columnar::Ref<'b, D>: Ord,
T: for<'a> Columnar,
for<'b> columnar::Ref<'b, T>: Ord,
R: Columnar,
{
fn next_or_alloc(&mut self) -> Result<<(D, T, R) as Columnar>::Ref<'_>, Column<(D, T, R)>> {
fn next_or_alloc(&mut self) -> Result<columnar::Ref<(D, T, R)>, Column<(D, T, R)>> {
if self.is_empty() {
Err(std::mem::take(&mut self.list))
}
Expand All @@ -510,12 +517,12 @@ pub mod batcher {
}

impl<T: Columnar> ColumnQueue<T> {
fn pop(&mut self) -> T::Ref<'_> {
fn pop(&mut self) -> columnar::Ref<T> {
self.head += 1;
self.list.get(self.head - 1)
}

fn peek(&self) -> T::Ref<'_> {
fn peek(&self) -> columnar::Ref<T> {
self.list.get(self.head)
}
}
Expand Down