Skip to content

Commit cc7db71

Browse files
committed
Demonstrate columnar stuff
1 parent 2a5cb8b commit cc7db71

File tree

4 files changed

+189
-0
lines changed

4 files changed

+189
-0
lines changed

container/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ edition.workspace = true
99
columnation = { git = "https://github.com/frankmcsherry/columnation" }
1010
flatcontainer = "0.5"
1111
serde = { version = "1.0", features = ["derive"] }
12+
# columnar = { path = "../../columnar" }
13+
columnar = { git = "https://github.com/frankmcsherry/columnar" }

container/src/columnar.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//! Present a columnar container as a timely container.
2+
3+
use serde::{Serialize, Deserialize};
4+
5+
pub use columnar::*;
6+
use columnar::common::IterOwn;
7+
8+
use crate::{Container, SizableContainer, PushInto};
9+
10+
/// A container based on a `columnar` store.
11+
#[derive(Clone, Default, Serialize, Deserialize)]
12+
pub struct Columnar<C> {
13+
store: C,
14+
}
15+
16+
impl<C: Len + Clear + Clone + Default + 'static> Container for Columnar<C>
17+
where
18+
for<'a> &'a C: columnar::Index,
19+
{
20+
fn len(&self) -> usize { self.store.len() }
21+
fn clear(&mut self) { self.store.clear() }
22+
23+
type ItemRef<'a> = <&'a C as Index>::Ref where Self: 'a;
24+
type Iter<'a> = IterOwn<&'a C>;
25+
fn iter<'a>(&'a self) -> Self::Iter<'a> { (&self.store).into_iter() }
26+
27+
type Item<'a> = <&'a C as Index>::Ref where Self: 'a;
28+
type DrainIter<'a> = IterOwn<&'a C>;
29+
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { (&self.store).into_iter() }
30+
}
31+
32+
impl<C: Len + Clear + Clone + Default + 'static> SizableContainer for Columnar<C>
33+
where
34+
for<'a> &'a C: columnar::Index,
35+
{
36+
fn capacity(&self) -> usize { 1024 }
37+
fn preferred_capacity() -> usize { 1024 }
38+
fn reserve(&mut self, _additional: usize) { }
39+
}
40+
41+
impl<C: columnar::Push<T>, T> PushInto<T> for Columnar<C> {
42+
#[inline]
43+
fn push_into(&mut self, item: T) {
44+
self.store.push(item);
45+
}
46+
}
47+
48+
49+
use columnar::bytes::{AsBytes, FromBytes, serialization::decode};
50+
51+
/// A container based on a columnar store, encoded in aligned bytes.
52+
#[derive(Clone, Default)]
53+
pub struct ColumnarBytes<B, C> {
54+
bytes: B,
55+
phantom: std::marker::PhantomData<C>,
56+
}
57+
58+
impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> Container for ColumnarBytes<B, C>
59+
where
60+
for<'a> C::Borrowed<'a> : Len + Clear + Index,
61+
{
62+
fn len(&self) -> usize {
63+
<C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(&self.bytes)).len()
64+
}
65+
// Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do.
66+
fn clear(&mut self) { unimplemented!() }
67+
68+
type ItemRef<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
69+
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
70+
fn iter<'a>(&'a self) -> Self::Iter<'a> {
71+
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter()
72+
}
73+
74+
type Item<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
75+
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
76+
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
77+
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter()
78+
}
79+
}
80+
81+
impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> SizableContainer for ColumnarBytes<B, C>
82+
where
83+
for<'a> C::Borrowed<'a> : Len + Clear + Index,
84+
{
85+
fn capacity(&self) -> usize { 1024 }
86+
fn preferred_capacity() -> usize { 1024 }
87+
fn reserve(&mut self, _additional: usize) { }
88+
}

container/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::collections::VecDeque;
66

77
pub mod columnation;
88
pub mod flatcontainer;
9+
pub mod columnar;
910

1011
/// A container transferring data through dataflow edges
1112
///

timely/examples/columnar.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
//! Wordcount based on flatcontainer.
2+
3+
use {
4+
std::collections::HashMap,
5+
timely::{Container, container::CapacityContainerBuilder},
6+
timely::container::columnar::Columnar,
7+
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
8+
timely::dataflow::InputHandleCore,
9+
timely::dataflow::operators::{Inspect, Operator, Probe},
10+
timely::dataflow::ProbeHandle,
11+
};
12+
13+
fn main() {
14+
15+
use timely_container::columnar::Strings;
16+
type Container = Columnar<(Strings, Vec<i64>)>;
17+
18+
// initializes and runs a timely dataflow.
19+
timely::execute_from_args(std::env::args(), |worker| {
20+
let mut input = <InputHandleCore<_, CapacityContainerBuilder<Container>>>::new();
21+
let mut probe = ProbeHandle::new();
22+
23+
// create a new input, exchange data, and inspect its output
24+
worker.dataflow::<usize, _, _>(|scope| {
25+
input
26+
.to_stream(scope)
27+
.unary(
28+
Pipeline,
29+
"Split",
30+
|_cap, _info| {
31+
move |input, output| {
32+
while let Some((time, data)) = input.next() {
33+
let mut session = output.session(&time);
34+
for (text, diff) in data.iter().flat_map(|(text, diff)| {
35+
text.split_whitespace().map(move |s| (s, diff))
36+
}) {
37+
session.give((text, diff));
38+
}
39+
}
40+
}
41+
},
42+
)
43+
.container::<Container>()
44+
.unary_frontier(
45+
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
46+
"WordCount",
47+
|_capability, _info| {
48+
let mut queues = HashMap::new();
49+
let mut counts = HashMap::new();
50+
51+
move |input, output| {
52+
while let Some((time, data)) = input.next() {
53+
queues
54+
.entry(time.retain())
55+
.or_insert(Vec::new())
56+
.push(data.take());
57+
}
58+
59+
for (key, val) in queues.iter_mut() {
60+
if !input.frontier().less_equal(key.time()) {
61+
let mut session = output.session(key);
62+
for batch in val.drain(..) {
63+
for (word, diff) in batch.iter() {
64+
let total =
65+
if let Some(count) = counts.get_mut(word) {
66+
*count += diff;
67+
*count
68+
}
69+
else {
70+
counts.insert(word.to_string(), *diff);
71+
*diff
72+
};
73+
session.give((word, total));
74+
}
75+
}
76+
}
77+
}
78+
79+
queues.retain(|_key, val| !val.is_empty());
80+
}
81+
},
82+
)
83+
.container::<Container>()
84+
.inspect(|x| println!("seen: {:?}", x))
85+
.probe_with(&mut probe);
86+
});
87+
88+
// introduce data and watch!
89+
for round in 0..10 {
90+
input.send(("flat container", 1));
91+
input.advance_to(round + 1);
92+
while probe.less_than(input.time()) {
93+
worker.step();
94+
}
95+
}
96+
})
97+
.unwrap();
98+
}

0 commit comments

Comments
 (0)