@@ -10,10 +10,19 @@ use {
1010 timely:: dataflow:: ProbeHandle ,
1111} ;
1212
13+ // Creates `WordCountContainer` and `WordCountReference` structs,
14+ // as well as various implementations relating them to `WordCount`.
15+ #[ derive( Columnar ) ]
16+ struct WordCount {
17+ text : String ,
18+ diff : i64 ,
19+ }
20+
1321fn main ( ) {
1422
15- use timely_container:: columnar:: Strings ;
16- type Container = Columnar < ( Strings , Vec < i64 > ) > ;
23+ type Container = Columnar < <WordCount as columnar:: Columnar >:: Container > ;
24+
25+ use columnar:: Len ;
1726
1827 // initializes and runs a timely dataflow.
1928 timely:: execute_from_args ( std:: env:: args ( ) , |worker| {
@@ -31,18 +40,18 @@ fn main() {
3140 move |input, output| {
3241 while let Some ( ( time, data) ) = input. next ( ) {
3342 let mut session = output. session ( & time) ;
34- for ( text , diff ) in data. iter ( ) . flat_map ( |( text , diff ) | {
35- text. split_whitespace ( ) . map ( move |s| ( s , diff) )
43+ for wordcount in data. iter ( ) . flat_map ( |wordcount | {
44+ wordcount . text . split_whitespace ( ) . map ( move |text| WordCountReference { text , diff : wordcount . diff } )
3645 } ) {
37- session. give ( ( text , diff ) ) ;
46+ session. give ( wordcount ) ;
3847 }
3948 }
4049 }
4150 } ,
4251 )
4352 . container :: < Container > ( )
4453 . unary_frontier (
45- ExchangeCore :: new ( |( s , _ ) : & ( & str , _ ) | s . len ( ) as u64 ) ,
54+ ExchangeCore :: new ( |x : & WordCountReference < & str , & i64 > | x . text . len ( ) as u64 ) ,
4655 "WordCount" ,
4756 |_capability, _info| {
4857 let mut queues = HashMap :: new ( ) ;
@@ -60,17 +69,17 @@ fn main() {
6069 if !input. frontier ( ) . less_equal ( key. time ( ) ) {
6170 let mut session = output. session ( key) ;
6271 for batch in val. drain ( ..) {
63- for ( word , diff ) in batch. iter ( ) {
72+ for wordcount in batch. iter ( ) {
6473 let total =
65- if let Some ( count) = counts. get_mut ( word ) {
66- * count += diff;
74+ if let Some ( count) = counts. get_mut ( wordcount . text ) {
75+ * count += wordcount . diff ;
6776 * count
6877 }
6978 else {
70- counts. insert ( word . to_string ( ) , * diff) ;
71- * diff
79+ counts. insert ( wordcount . text . to_string ( ) , * wordcount . diff ) ;
80+ * wordcount . diff
7281 } ;
73- session. give ( ( word , total) ) ;
82+ session. give ( WordCountReference { text : wordcount . text , diff : total } ) ;
7483 }
7584 }
7685 }
@@ -87,7 +96,7 @@ fn main() {
8796
8897 // introduce data and watch!
8998 for round in 0 ..10 {
90- input. send ( ( "flat container" , 1 ) ) ;
99+ input. send ( WordCountReference { text : "flat container" , diff : 1 } ) ;
91100 input. advance_to ( round + 1 ) ;
92101 while probe. less_than ( input. time ( ) ) {
93102 worker. step ( ) ;
0 commit comments