Skip to content

Commit 545cc31

Browse files
committed
Log action can distinguish data from flush
Previously, the log action would receive an empty container when we'd like to report progress but no new data. We'd only send the empty container when we didn't have any other data to send. This makes it hard for users to distinguish between flushing and just data updates. It might be important to distinguish the two because only on flush we might not be called again for a while, but otherwise it's very likely that the logger might receive more data, or sees a flush. This change alters the signature of the action to accept a `&mut Option<C>` (where `C` is `CB::Container`), and we pass `Some(container)` on data, and `None` on flush. Clients using the logging API need to change their implementation, as both vectors and `Option` offer a `iter` function, but with different results. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 3b32df5 commit 545cc31

File tree

2 files changed

+67
-38
lines changed

2 files changed

+67
-38
lines changed

logging/src/lib.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::time::{Instant, Duration};
77
use std::fmt::{self, Debug};
88
use std::marker::PhantomData;
99

10-
use timely_container::{ContainerBuilder, PushInto};
10+
use timely_container::{Container, ContainerBuilder, PushInto};
1111

12+
/// A registry binding names to typed loggers.
1213
pub struct Registry {
1314
/// A map from names to typed loggers.
1415
map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
@@ -28,7 +29,9 @@ impl Registry {
2829
/// seen (likely greater or equal to the timestamp of the last event). The end of a
2930
/// logging stream is indicated only by dropping the associated action, which can be
3031
/// accomplished with `remove` (or a call to insert, though this is not recommended).
31-
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut CB::Container)+'static>(
32+
///
33+
/// Passing a `&mut None` container to an action indicates a flush.
34+
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
3235
&mut self,
3336
name: &str,
3437
action: F) -> Option<Box<dyn Any>>
@@ -84,7 +87,7 @@ impl Flush for Registry {
8487

8588
/// A buffering logger.
8689
pub struct Logger<CB: ContainerBuilder> {
87-
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut CB::Container)>>>,
90+
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
8891
}
8992

9093
impl<CB: ContainerBuilder> Clone for Logger<CB> {
@@ -103,25 +106,25 @@ impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
103106
}
104107
}
105108

106-
struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> {
109+
struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
107110
/// common instant used for all loggers.
108111
time: Instant,
109112
/// offset to allow re-calibration.
110113
offset: Duration,
111114
/// container builder to produce buffers of accumulated log events
112115
builder: CB,
113116
/// True if we logged an event since the last flush.
114-
/// Used to avoid sending empty buffers on drop.
117+
/// Used to avoid flushing buffers on drop.
115118
dirty: bool,
116-
/// action to take on full log buffers.
119+
/// action to take on full log buffers, or on flush.
117120
action: A,
118121
}
119122

120123
impl<CB: ContainerBuilder> Logger<CB> {
121124
/// Allocates a new shareable logger bound to a write destination.
122125
pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
123126
where
124-
F: FnMut(&Duration, &mut CB::Container)+'static
127+
F: FnMut(&Duration, &mut Option<CB::Container>)+'static
125128
{
126129
let inner = LoggerInner {
127130
time,
@@ -234,7 +237,7 @@ impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
234237
}
235238
}
236239

237-
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> LoggerInner<CB, A> {
240+
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
238241
fn log_many<I>(&mut self, events: I)
239242
where I: IntoIterator, CB: PushInto<(Duration, I::Item)>,
240243
{
@@ -243,31 +246,37 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Log
243246
self.dirty = true;
244247
self.builder.push_into((elapsed, event.into()));
245248
while let Some(container) = self.builder.extract() {
246-
(self.action)(&elapsed, container);
249+
let mut c = Some(std::mem::take(container));
250+
(self.action)(&elapsed, &mut c);
251+
if let Some(mut c) = c {
252+
c.clear();
253+
*container = c;
254+
}
247255
}
248256
}
249257
}
250258

251259
fn flush(&mut self) {
252260
let elapsed = self.time.elapsed() + self.offset;
253261

254-
let mut action_ran = false;
255-
while let Some(buffer) = self.builder.finish() {
256-
(self.action)(&elapsed, buffer);
257-
action_ran = true;
262+
while let Some(container) = self.builder.finish() {
263+
let mut c = Some(std::mem::take(container));
264+
(self.action)(&elapsed, &mut c);
265+
if let Some(mut c) = c {
266+
c.clear();
267+
*container = c;
268+
}
258269
}
259270

260-
if !action_ran {
261-
// Send an empty container to indicate progress.
262-
(self.action)(&elapsed, &mut CB::Container::default());
263-
}
271+
// Send no container to indicate flush.
272+
(self.action)(&elapsed, &mut None);
264273

265274
self.dirty = false;
266275
}
267276
}
268277

269278
/// Flush on the *last* drop of a logger.
270-
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Drop for LoggerInner<CB, A> {
279+
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
271280
fn drop(&mut self) {
272281
// Avoid sending out empty buffers just because of drops.
273282
if self.dirty {
@@ -276,7 +285,7 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Dro
276285
}
277286
}
278287

279-
impl<CB, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Debug for LoggerInner<CB, A>
288+
impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
280289
where
281290
CB: ContainerBuilder + Debug,
282291
{

timely/examples/logging-send.rs

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,37 @@ fn main() {
1616

1717
// Register timely worker logging.
1818
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |_time, data|
19-
data.iter().for_each(|x| println!("LOG1: {:?}", x))
19+
if let Some(data) = data {
20+
data.iter().for_each(|x| println!("LOG1: {:?}", x))
21+
}
22+
else {
23+
println!("LOG1: Flush");
24+
}
2025
);
2126

2227
// Register timely progress logging.
2328
// Less generally useful: intended for debugging advanced custom operators or timely
2429
// internals.
2530
worker.log_register().insert::<TimelyProgressEventBuilder,_>("timely/progress", |_time, data|
26-
data.iter().for_each(|x| {
27-
println!("PROGRESS: {:?}", x);
28-
let (_, ev) = x;
29-
print!("PROGRESS: TYPED MESSAGES: ");
30-
for (n, p, t, d) in ev.messages.iter() {
31-
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
32-
}
33-
println!();
34-
print!("PROGRESS: TYPED INTERNAL: ");
35-
for (n, p, t, d) in ev.internal.iter() {
36-
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
37-
}
38-
println!();
39-
})
31+
if let Some(data) = data {
32+
data.iter().for_each(|x| {
33+
println!("PROGRESS: {:?}", x);
34+
let (_, ev) = x;
35+
print!("PROGRESS: TYPED MESSAGES: ");
36+
for (n, p, t, d) in ev.messages.iter() {
37+
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
38+
}
39+
println!();
40+
print!("PROGRESS: TYPED INTERNAL: ");
41+
for (n, p, t, d) in ev.internal.iter() {
42+
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
43+
}
44+
println!();
45+
})
46+
}
47+
else {
48+
println!("PROGRESS: Flush");
49+
}
4050
);
4151

4252
// create a new input, exchange data, and inspect its output
@@ -49,7 +59,12 @@ fn main() {
4959

5060
// Register timely worker logging.
5161
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |_time, data|
52-
data.iter().for_each(|x| println!("LOG2: {:?}", x))
62+
if let Some(data) = data {
63+
data.iter().for_each(|x| println!("LOG2: {:?}", x))
64+
}
65+
else {
66+
println!("LOG2: Flush");
67+
}
5368
);
5469

5570
// create a new input, exchange data, and inspect its output
@@ -62,9 +77,14 @@ fn main() {
6277

6378
// Register user-level logging.
6479
type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
65-
worker.log_register().insert::<MyBuilder,_>("input", |_time, data|
66-
for element in data.iter() {
67-
println!("Round tick at: {:?}", element.0);
80+
worker.log_register().insert::<MyBuilder,_>("input", |time, data|
81+
if let Some(data) = data {
82+
for element in data.iter() {
83+
println!("Round tick at: {:?}", element.0);
84+
}
85+
}
86+
else {
87+
println!("Round flush at: {time:?}");
6888
}
6989
);
7090

0 commit comments

Comments
 (0)