Skip to content

Commit b4dc3aa

Browse files
committed
Extend TheVoid to write completed times
1 parent 7afe6a4 commit b4dc3aa

File tree

1 file changed

+19
-7
lines changed

1 file changed

+19
-7
lines changed

src/sinks/mod.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Types and operators to feed outputs into external systems.
22
3-
use std::time::Duration;
3+
use std::fs::File;
4+
use std::io::{LineWriter, Write};
5+
use std::time::{Duration, Instant};
46

57
use timely::dataflow::channels::pact::ParallelizationContract;
68
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
@@ -40,7 +42,7 @@ where
4042
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
4143
pub enum Sink {
4244
/// /dev/null, used for benchmarking
43-
TheVoid,
45+
TheVoid(String),
4446
/// CSV files
4547
#[cfg(feature = "csv-source")]
4648
CsvFile(CsvFile),
@@ -75,18 +77,28 @@ impl Sinkable<Duration> for Sink {
7577
P: ParallelizationContract<S::Timestamp, ResultDiff<Duration>>,
7678
{
7779
match *self {
78-
Sink::TheVoid => {
79-
let mut builder = OperatorBuilder::new("TheVoid".to_owned(), stream.scope());
80+
Sink::TheVoid(ref name) => {
81+
let file = File::create(name).unwrap();
82+
let mut writer = LineWriter::new(file);
83+
84+
let mut builder = OperatorBuilder::new(name.to_owned(), stream.scope());
8085
let mut input = builder.new_input(stream, pact);
8186
let (_output, sunk) = builder.new_output();
8287

8388
builder.build(|_capabilities| {
89+
let mut t0 = Instant::now();
90+
let mut last = Duration::from_millis(0);
91+
8492
move |frontiers| {
85-
let mut input_handle =
86-
FrontieredInputHandle::new(&mut input, &frontiers[0]);
93+
let input_handle = FrontieredInputHandle::new(&mut input, &frontiers[0]);
8794

8895
if input_handle.frontier.is_empty() {
89-
println!("Inputs to void sink have ceased.");
96+
println!("[{:?}] inputs to void sink ceased", t0.elapsed());
97+
} else if !input_handle.frontier.frontier().less_equal(&last) {
98+
write!(writer, "{},{:?}\n", t0.elapsed().as_millis(), last).unwrap();
99+
100+
last = input_handle.frontier.frontier()[0].clone();
101+
t0 = Instant::now();
90102
}
91103
}
92104
});

0 commit comments

Comments
 (0)