@@ -5,8 +5,8 @@ use std::io::{LineWriter, Write};
55use std:: time:: { Duration , Instant } ;
66
77use timely:: dataflow:: channels:: pact:: ParallelizationContract ;
8- use timely:: dataflow:: operators:: generic:: builder_rc :: OperatorBuilder ;
9- use timely:: dataflow:: operators:: generic:: FrontieredInputHandle ;
8+ use timely:: dataflow:: operators:: generic:: Operator ;
9+ use timely:: dataflow:: operators:: generic:: OutputHandle ;
1010use timely:: dataflow:: { Scope , Stream } ;
1111use timely:: order:: TotalOrder ;
1212use timely:: progress:: Timestamp ;
4242#[ derive( Hash , PartialEq , Eq , PartialOrd , Ord , Clone , Debug , Serialize , Deserialize ) ]
4343pub enum Sink {
4444 /// /dev/null, used for benchmarking
45- TheVoid ( String ) ,
45+ TheVoid ( Option < String > ) ,
4646 /// CSV files
4747 #[ cfg( feature = "csv-source" ) ]
4848 CsvFile ( CsvFile ) ,
@@ -77,27 +77,37 @@ impl Sinkable<Duration> for Sink {
7777 P : ParallelizationContract < S :: Timestamp , ResultDiff < Duration > > ,
7878 {
7979 match * self {
80- Sink :: TheVoid ( ref name) => {
81- let file = File :: create ( name) . unwrap ( ) ;
82- let mut writer = LineWriter :: new ( file) ;
83-
84- let mut builder = OperatorBuilder :: new ( name. to_owned ( ) , stream. scope ( ) ) ;
85- let mut input = builder. new_input ( stream, pact) ;
86- let ( _output, sunk) = builder. new_output ( ) ;
87-
88- builder. build ( |_capabilities| {
89- let mut t0 = Instant :: now ( ) ;
90- let mut last = Duration :: from_millis ( 0 ) ;
91-
92- move |frontiers| {
93- let input_handle = FrontieredInputHandle :: new ( & mut input, & frontiers[ 0 ] ) ;
94-
95- if input_handle. frontier . is_empty ( ) {
80+ Sink :: TheVoid ( ref filename) => {
81+ let mut writer = match * filename {
82+ None => None ,
83+ Some ( ref filename) => {
84+ let file = File :: create ( filename. to_owned ( ) ) . unwrap ( ) ;
85+ Some ( LineWriter :: new ( file) )
86+ }
87+ } ;
88+
89+ let mut t0 = Instant :: now ( ) ;
90+ let mut last = Duration :: from_millis ( 0 ) ;
91+ let mut buffer = Vec :: new ( ) ;
92+
93+ let sunk = stream. unary_frontier ( pact, "TheVoid" , move |_cap, _info| {
94+ move |input, _output : & mut OutputHandle < _ , ResultDiff < Duration > , _ > | {
95+ let mut received_input = false ;
96+ input. for_each ( |time, data| {
97+ data. swap ( & mut buffer) ;
98+ received_input = !buffer. is_empty ( ) ;
99+ buffer. clear ( ) ;
100+ } ) ;
101+
102+ if input. frontier . is_empty ( ) {
96103 println ! ( "[{:?}] inputs to void sink ceased" , t0. elapsed( ) ) ;
97- } else if !input_handle. frontier . frontier ( ) . less_equal ( & last) {
98- write ! ( writer, "{},{:?}\n " , t0. elapsed( ) . as_millis( ) , last) . unwrap ( ) ;
104+ } else if received_input && !input. frontier . frontier ( ) . less_equal ( & last) {
105+ if let Some ( ref mut writer) = & mut writer {
106+ write ! ( writer, "{},{:?}\n " , t0. elapsed( ) . as_millis( ) , last)
107+ . unwrap ( ) ;
108+ }
99109
100- last = input_handle . frontier . frontier ( ) [ 0 ] . clone ( ) ;
110+ last = input . frontier . frontier ( ) [ 0 ] . clone ( ) ;
101111 t0 = Instant :: now ( ) ;
102112 }
103113 }
0 commit comments