Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::time::{Instant, Duration};
use std::fmt::{self, Debug};
use std::marker::PhantomData;

use timely_container::{ContainerBuilder, PushInto};
use timely_container::{Container, ContainerBuilder, PushInto};

/// A registry binding names to typed loggers.
pub struct Registry {
/// A map from names to typed loggers.
map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
Expand All @@ -28,7 +29,9 @@ impl Registry {
/// seen (likely greater or equal to the timestamp of the last event). The end of a
/// logging stream is indicated only by dropping the associated action, which can be
/// accomplished with `remove` (or a call to insert, though this is not recommended).
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut CB::Container)+'static>(
///
/// Passing a `&mut None` container to an action indicates a flush.
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
&mut self,
name: &str,
action: F) -> Option<Box<dyn Any>>
Expand Down Expand Up @@ -84,7 +87,7 @@ impl Flush for Registry {

/// A buffering logger.
pub struct Logger<CB: ContainerBuilder> {
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut CB::Container)>>>,
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
}

impl<CB: ContainerBuilder> Clone for Logger<CB> {
Expand All @@ -103,32 +106,28 @@ impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
}
}

struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> {
struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
/// common instant used for all loggers.
time: Instant,
/// offset to allow re-calibration.
offset: Duration,
/// container builder to produce buffers of accumulated log events
builder: CB,
/// True if we logged an event since the last flush.
/// Used to avoid sending empty buffers on drop.
dirty: bool,
/// action to take on full log buffers.
/// action to take on full log buffers, or on flush.
action: A,
}

impl<CB: ContainerBuilder> Logger<CB> {
/// Allocates a new shareable logger bound to a write destination.
pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
where
F: FnMut(&Duration, &mut CB::Container)+'static
F: FnMut(&Duration, &mut Option<CB::Container>)+'static
{
let inner = LoggerInner {
time,
offset,
action,
builder: CB::default(),
dirty: false,
};
let inner = Rc::new(RefCell::new(inner));
Logger { inner }
Expand Down Expand Up @@ -234,57 +233,56 @@ impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
}
}

impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> LoggerInner<CB, A> {
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
fn log_many<I>(&mut self, events: I)
where I: IntoIterator, CB: PushInto<(Duration, I::Item)>,
{
let elapsed = self.time.elapsed() + self.offset;
for event in events {
self.dirty = true;
self.builder.push_into((elapsed, event.into()));
while let Some(container) = self.builder.extract() {
(self.action)(&elapsed, container);
let mut c = Some(std::mem::take(container));
(self.action)(&elapsed, &mut c);
if let Some(mut c) = c {
c.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion, but is this clearing new behavior? Does it e.g. prevent passing back owned data, into which the logger can write? Again, really no strong opinion, but just checking whether the force-clear is new, and whether it is intentional.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was purely defensive. Thinking about it, it should be the container builder's responsibility to enforce what needs to be true about a container after extracting or finishing it, so it doesn't make sense to have the clear call here.

*container = c;
}
}
}
}

fn flush(&mut self) {
let elapsed = self.time.elapsed() + self.offset;

let mut action_ran = false;
while let Some(buffer) = self.builder.finish() {
(self.action)(&elapsed, buffer);
action_ran = true;
}

if !action_ran {
// Send an empty container to indicate progress.
(self.action)(&elapsed, &mut CB::Container::default());
while let Some(container) = self.builder.finish() {
let mut c = Some(std::mem::take(container));
(self.action)(&elapsed, &mut c);
if let Some(mut c) = c {
c.clear();
*container = c;
}
}

self.dirty = false;
// Send no container to indicate flush.
(self.action)(&elapsed, &mut None);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nit, but one of the reasons to take a &mut Option<_> rather than a Option<&mut _> is to allow the None call to pass back resources, and at least with Push the intent is that you keep calling this as long as you get a non-None back. We don't have to do that here, and probably massively over-thinking this, but wanted to call out the gap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point, but I'm wondering if this is actually true! Or put differently, is it something we'd like to be true, or is it something that we had at some point and forgot about?

The reason I'm asking is because most (all?) places where we call Push::done, we don't have a loop to drain resources. If I recall correctly, the only place where we loop is in Differential merge batchers to drain the stash of allocations once we're done merging chains.

We could change Push's done function to look like this instead:

    fn done(&mut self) { 
        let mut container = None;
        loop {
            self.push(&mut container);
            if container.is_none() { break; }
        }
    }

}
}

/// Flush on the *last* drop of a logger.
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Drop for LoggerInner<CB, A> {
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
fn drop(&mut self) {
// Avoid sending out empty buffers just because of drops.
if self.dirty {
self.flush();
}
self.flush();
}
}

impl<CB, A: ?Sized + FnMut(&Duration, &mut CB::Container)> Debug for LoggerInner<CB, A>
impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
where
CB: ContainerBuilder + Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LoggerInner")
.field("time", &self.time)
.field("offset", &self.offset)
.field("dirty", &self.dirty)
.field("action", &"FnMut")
.field("builder", &self.builder)
.finish()
Expand Down
64 changes: 42 additions & 22 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,38 @@ fn main() {
let mut probe = ProbeHandle::new();

// Register timely worker logging.
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG1: {:?}", x))
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| println!("LOG1: {:?}", x))
}
else {
println!("LOG1: Flush {time:?}");
}
);

// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEventBuilder,_>("timely/progress", |_time, data|
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
})
worker.log_register().insert::<TimelyProgressEventBuilder,_>("timely/progress", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
})
}
else {
println!("PROGRESS: Flush {time:?}");
}
);

// create a new input, exchange data, and inspect its output
Expand All @@ -48,8 +58,13 @@ fn main() {
});

// Register timely worker logging.
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG2: {:?}", x))
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| println!("LOG2: {:?}", x))
}
else {
println!("LOG2: Flush {time:?}");
}
);

// create a new input, exchange data, and inspect its output
Expand All @@ -62,9 +77,14 @@ fn main() {

// Register user-level logging.
type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
worker.log_register().insert::<MyBuilder,_>("input", |_time, data|
for element in data.iter() {
println!("Round tick at: {:?}", element.0);
worker.log_register().insert::<MyBuilder,_>("input", |time, data|
if let Some(data) = data {
for element in data.iter() {
println!("Round tick at: {:?}", element.0);
}
}
else {
println!("Round flush at: {time:?}");
}
);

Expand Down
15 changes: 8 additions & 7 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ pub type TimelyProgressLogger = crate::logging_core::Logger<TimelyProgressEventB
use std::time::Duration;
use columnar::Columnar;
use serde::{Deserialize, Serialize};

use crate::Container;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::capture::{Event, EventPusher};

/// Logs events as a timely stream, with progress statements.
pub struct BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
// None when the logging stream is closed
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
time: Duration,
event_pusher: P,
_phantom: ::std::marker::PhantomData<T>,
_phantom: ::std::marker::PhantomData<C>,
}

impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
impl<P, C> BatchLogger<P, C> where P: EventPusher<Duration, C>, C: Container {
/// Creates a new batch logger.
pub fn new(event_pusher: P) -> Self {
BatchLogger {
Expand All @@ -35,8 +36,8 @@ impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>>
}
}
/// Publishes a batch of logged events and advances the capability.
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, T)>) {
if !data.is_empty() {
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Option<C>) {
if let Some(data) = data {
self.event_pusher.push(Event::Messages(self.time, std::mem::take(data)));
}
if self.time < time {
Expand All @@ -47,7 +48,7 @@ impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>>
self.time = time;
}
}
impl<T, P> Drop for BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
impl<P, C> Drop for BatchLogger<P, C> where P: EventPusher<Duration, C> {
fn drop(&mut self) {
self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
}
Expand Down
Loading