Skip to content

Commit 9ec7202

Browse files
Merge pull request #680 from frankmcsherry/columnar_07
Update for Columnar 0.7
2 parents 3dc85f1 + 812e458 commit 9ec7202

File tree

3 files changed

+39
-65
lines changed

3 files changed

+39
-65
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
- windows
1616
toolchain:
1717
- stable
18-
- 1.78
18+
- 1.79
1919
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
2020
runs-on: ${{ matrix.os }}-latest
2121
steps:

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ resolver = "2"
1414
edition = "2021"
1515

1616
[workspace.dependencies]
17-
columnar = "0.6"
17+
columnar = "0.7"
1818

1919
[workspace.lints.clippy]
2020
type_complexity = "allow"

timely/examples/columnar.rs

Lines changed: 37 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Wordcount based on flatcontainer.
1+
//! Wordcount based on the `columnar` crate.
22
33
use {
44
std::collections::HashMap,
@@ -19,7 +19,8 @@ struct WordCount {
1919

2020
fn main() {
2121

22-
type Container = Column<WordCount>;
22+
type InnerContainer = <WordCount as columnar::Columnar>::Container;
23+
type Container = Column<InnerContainer>;
2324

2425
use columnar::Len;
2526

@@ -55,7 +56,7 @@ fn main() {
5556
)
5657
.container::<Container>()
5758
.unary_frontier(
58-
ExchangeCore::<ColumnBuilder<WordCount>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
59+
ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
5960
"WordCount",
6061
|_capability, _info| {
6162
let mut queues = HashMap::new();
@@ -114,29 +115,27 @@ fn main() {
114115
pub use container::Column;
115116
mod container {
116117

117-
use columnar::Columnar;
118-
use columnar::Container as FooBozzle;
119-
120-
use timely_bytes::arc::Bytes;
121-
122118
/// A container based on a columnar store, encoded in aligned bytes.
123-
pub enum Column<C: Columnar> {
119+
pub enum Column<C> {
124120
/// The typed variant of the container.
125-
Typed(C::Container),
121+
Typed(C),
126122
/// The binary variant of the container.
127-
Bytes(Bytes),
123+
Bytes(timely_bytes::arc::Bytes),
128124
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
129125
///
130126
/// Reasons could include misalignment, cloning of data, or wanting
131127
/// to release the `Bytes` as a scarce resource.
132128
Align(Box<[u64]>),
133129
}
134130

135-
impl<C: Columnar> Default for Column<C> {
131+
impl<C: Default> Default for Column<C> {
136132
fn default() -> Self { Self::Typed(Default::default()) }
137133
}
138134

139-
impl<C: Columnar> Clone for Column<C> where C::Container: Clone {
135+
// The clone implementation moves out of the `Bytes` variant into `Align`.
136+
// This is optional and non-optimal, as the bytes clone is relatively free.
137+
// But, we don't want to leak the uses of `Bytes`, is why we do this I think.
138+
impl<C: Clone> Clone for Column<C> where C: Clone {
140139
fn clone(&self) -> Self {
141140
match self {
142141
Column::Typed(t) => Column::Typed(t.clone()),
@@ -151,20 +150,24 @@ mod container {
151150
}
152151
}
153152

154-
use columnar::{Clear, Len, Index, FromBytes};
153+
use columnar::{Len, Index, FromBytes};
155154
use columnar::bytes::{EncodeDecode, Indexed};
156155
use columnar::common::IterOwn;
157156

158-
use timely::Container;
159-
impl<C: Columnar> Container for Column<C> {
160-
fn len(&self) -> usize {
157+
impl<C: columnar::Container> Column<C> {
158+
/// Borrows the contents no matter their representation.
159+
#[inline(always)] fn borrow(&self) -> C::Borrowed<'_> {
161160
match self {
162-
Column::Typed(t) => t.len(),
163-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
164-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
161+
Column::Typed(t) => t.borrow(),
162+
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
163+
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
165164
}
166165
}
167-
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
166+
}
167+
168+
impl<C: columnar::Container> timely::Container for Column<C> {
169+
fn len(&self) -> usize { self.borrow().len() }
170+
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
168171
fn clear(&mut self) {
169172
match self {
170173
Column::Typed(t) => t.clear(),
@@ -174,28 +177,15 @@ mod container {
174177
}
175178

176179
type ItemRef<'a> = C::Ref<'a>;
177-
type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
178-
fn iter<'a>(&'a self) -> Self::Iter<'a> {
179-
match self {
180-
Column::Typed(t) => t.borrow().into_index_iter(),
181-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
182-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
183-
}
184-
}
180+
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
181+
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
185182

186183
type Item<'a> = C::Ref<'a>;
187-
type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
188-
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
189-
match self {
190-
Column::Typed(t) => t.borrow().into_index_iter(),
191-
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(),
192-
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
193-
}
194-
}
184+
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
185+
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
195186
}
196187

197-
use timely::container::SizableContainer;
198-
impl<C: Columnar> SizableContainer for Column<C> {
188+
impl<C: columnar::Container> timely::container::SizableContainer for Column<C> {
199189
fn at_capacity(&self) -> bool {
200190
match self {
201191
Self::Typed(t) => {
@@ -209,11 +199,9 @@ mod container {
209199
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
210200
}
211201

212-
use timely::container::PushInto;
213-
impl<C: Columnar, T> PushInto<T> for Column<C> where C::Container: columnar::Push<T> {
202+
impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
214203
#[inline]
215204
fn push_into(&mut self, item: T) {
216-
use columnar::Push;
217205
match self {
218206
Column::Typed(t) => t.push(item),
219207
Column::Align(_) | Column::Bytes(_) => {
@@ -225,8 +213,7 @@ mod container {
225213
}
226214
}
227215

228-
use timely::dataflow::channels::ContainerBytes;
229-
impl<C: Columnar> ContainerBytes for Column<C> {
216+
impl<C: columnar::Container> timely::dataflow::channels::ContainerBytes for Column<C> {
230217
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
231218
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
232219
// If the alignment is borked, we can relocate. IF the size is borked,
@@ -267,27 +254,25 @@ use builder::ColumnBuilder;
267254
mod builder {
268255

269256
use std::collections::VecDeque;
270-
use columnar::{Columnar, Clear, Len, Push};
271257
use columnar::bytes::{EncodeDecode, Indexed};
272258
use super::Column;
273259

274260
/// A container builder for `Column<C>`.
275-
pub struct ColumnBuilder<C: Columnar> {
261+
#[derive(Default)]
262+
pub struct ColumnBuilder<C> {
276263
/// Container that we're writing to.
277-
current: C::Container,
264+
current: C,
278265
/// Empty allocation.
279266
empty: Option<Column<C>>,
280267
/// Completed containers pending to be sent.
281268
pending: VecDeque<Column<C>>,
282269
}
283270

284-
use timely::container::PushInto;
285-
impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C> where C::Container: columnar::Push<T> {
271+
impl<C: columnar::Container, T> timely::container::PushInto<T> for ColumnBuilder<C> where C: columnar::Push<T> {
286272
#[inline]
287273
fn push_into(&mut self, item: T) {
288274
self.current.push(item);
289275
// If there is less than 10% slop with 2MB backing allocations, mint a container.
290-
use columnar::Container;
291276
let words = Indexed::length_in_words(&self.current.borrow());
292277
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
293278
if round - words < round / 10 {
@@ -299,19 +284,8 @@ mod builder {
299284
}
300285
}
301286

302-
impl<C: Columnar> Default for ColumnBuilder<C> {
303-
#[inline]
304-
fn default() -> Self {
305-
ColumnBuilder {
306-
current: Default::default(),
307-
empty: None,
308-
pending: Default::default(),
309-
}
310-
}
311-
}
312-
313287
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
314-
impl<C: Columnar> ContainerBuilder for ColumnBuilder<C> where C::Container: Clone {
288+
impl<C: columnar::Container> ContainerBuilder for ColumnBuilder<C> {
315289
type Container = Column<C>;
316290

317291
#[inline]
@@ -343,5 +317,5 @@ mod builder {
343317
}
344318
}
345319

346-
impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone { }
320+
impl<C: columnar::Container> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
347321
}

0 commit comments

Comments
 (0)