Skip to content

Commit 54f4150

Browse files
Merge pull request #693 from frankmcsherry/remove_guarded_logging
Remove guarded logging
2 parents 7341835 + c4852d6 commit 54f4150

File tree

5 files changed

+9
-56
lines changed

5 files changed

+9
-56
lines changed

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_ha
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
2323
use crate::progress::operate::PortConnectivity;
24-
use crate::logging::TimelyLogger as Logger;
2524

2625
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
2726

@@ -35,22 +34,19 @@ pub struct OperatorBuilder<G: Scope> {
3534
/// For each input, a shared list of summaries to each output.
3635
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
3736
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
38-
logging: Option<Logger>,
3937
}
4038

4139
impl<G: Scope> OperatorBuilder<G> {
4240

4341
/// Allocates a new generic operator builder from its containing scope.
4442
pub fn new(name: String, scope: G) -> Self {
45-
let logging = scope.logging();
4643
OperatorBuilder {
4744
builder: OperatorBuilderRaw::new(name, scope),
4845
frontier: Vec::new(),
4946
consumed: Vec::new(),
5047
internal: Rc::new(RefCell::new(Vec::new())),
5148
summaries: Vec::new(),
5249
produced: Vec::new(),
53-
logging,
5450
}
5551
}
5652

@@ -90,7 +86,7 @@ impl<G: Scope> OperatorBuilder<G> {
9086
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
9187
self.summaries.push(Rc::clone(&shared_summary));
9288

93-
new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
89+
new_input_handle(input, Rc::clone(&self.internal), shared_summary)
9490
}
9591

9692
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use crate::dataflow::channels::Message;
1717
use crate::communication::{Push, Pull};
1818
use crate::{Container, Data};
1919
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
20-
use crate::logging::TimelyLogger as Logger;
2120

2221
use crate::dataflow::operators::InputCapability;
2322
use crate::dataflow::operators::capability::CapabilityTrait;
@@ -31,7 +30,6 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
3130
/// Each timestamp received through this input may only produce output timestamps
3231
/// greater or equal to the input timestamp subjected to at least one of these summaries.
3332
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
34-
logging: Option<Logger>,
3533
}
3634

3735
/// Handle to an operator's input stream, specialized to vectors.
@@ -82,13 +80,9 @@ impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P
8280
/// ```
8381
#[inline]
8482
pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
85-
let mut logging = self.logging.take();
8683
while let Some((cap, data)) = self.next() {
87-
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
8884
logic(cap, data);
89-
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
9085
}
91-
self.logging = logging;
9286
}
9387

9488
}
@@ -150,13 +144,11 @@ pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150144
pull_counter: PullCounter<T, C, P>,
151145
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152146
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153-
logging: Option<Logger>
154147
) -> InputHandleCore<T, C, P> {
155148
InputHandleCore {
156149
pull_counter,
157150
internal,
158151
summaries,
159-
logging,
160152
}
161153
}
162154

timely/src/dataflow/operators/generic/notificator.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::progress::frontier::{AntichainRef, MutableAntichain};
22
use crate::progress::Timestamp;
33
use crate::dataflow::operators::Capability;
4-
use crate::logging::TimelyLogger as Logger;
54

65
/// Tracks requests for notification and delivers available notifications.
76
///
@@ -18,7 +17,6 @@ use crate::logging::TimelyLogger as Logger;
1817
pub struct Notificator<'a, T: Timestamp> {
1918
frontiers: &'a [&'a MutableAntichain<T>],
2019
inner: &'a mut FrontierNotificator<T>,
21-
logging: &'a Option<Logger>,
2220
}
2321

2422
impl<'a, T: Timestamp> Notificator<'a, T> {
@@ -28,14 +26,13 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
2826
pub fn new(
2927
frontiers: &'a [&'a MutableAntichain<T>],
3028
inner: &'a mut FrontierNotificator<T>,
31-
logging: &'a Option<Logger>) -> Self {
29+
) -> Self {
3230

3331
inner.make_available(frontiers);
3432

3533
Notificator {
3634
frontiers,
3735
inner,
38-
logging,
3936
}
4037
}
4138

@@ -82,9 +79,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
8279
#[inline]
8380
pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
8481
while let Some((cap, count)) = self.next() {
85-
self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true }));
8682
logic(cap, count, self);
87-
self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false }));
8883
}
8984
}
9085
}
@@ -117,8 +112,6 @@ fn notificator_delivers_notifications_in_topo_order() {
117112

118113
let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));
119114

120-
let logging = None;//::logging::new_inactive_logger();
121-
122115
// notificator.update_frontier_from_cm(&mut vec![ChangeBatch::new_from(ts_from_tuple((0, 0)), 1)]);
123116
let times = [
124117
Product::new(3, 5),
@@ -136,14 +129,14 @@ fn notificator_delivers_notifications_in_topo_order() {
136129
let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));
137130

138131
// the frontier is initially (0,0), and so we should deliver no notifications.
139-
assert!(frontier_notificator.monotonic(&[&frontier], &logging).next().is_none());
132+
assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none());
140133

141134
// advance the frontier to [(5,7), (6,0)], opening up some notifications.
142135
frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]);
143136

144137
{
145138
let frontiers = [&frontier];
146-
let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);
139+
let mut notificator = frontier_notificator.monotonic(&frontiers);
147140

148141
// we should deliver the following available notifications, in this order.
149142
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,1));
@@ -159,7 +152,7 @@ fn notificator_delivers_notifications_in_topo_order() {
159152

160153
{
161154
let frontiers = [&frontier];
162-
let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);
155+
let mut notificator = frontier_notificator.monotonic(&frontiers);
163156

164157
// the first available notification should be (5,8). Note: before (6,0) in the total order, but not
165158
// in the partial order. We don't make the promise that we respect the total order.
@@ -380,8 +373,8 @@ impl<T: Timestamp> FrontierNotificator<T> {
380373
/// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
381374
/// in the event that `Notificator` provides too restrictive an interface.
382375
#[inline]
383-
pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger>) -> Notificator<'a, T> {
384-
Notificator::new(frontiers, self, logging)
376+
pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Notificator<'a, T> {
377+
Notificator::new(frontiers, self)
385378
}
386379

387380
/// Iterates over pending capabilities and their count. The count represents how often a

timely/src/dataflow/operators/generic/operator.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,10 +350,9 @@ impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
350350
notificator.notify_at(capability.delayed(&time));
351351
}
352352

353-
let logging = self.scope().logging();
354353
move |input, output| {
355354
let frontier = &[input.frontier()];
356-
let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
355+
let notificator = &mut Notificator::new(frontier, &mut notificator);
357356
logic(input.handle, output, notificator);
358357
}
359358
})
@@ -436,10 +435,9 @@ impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
436435
notificator.notify_at(capability.delayed(&time));
437436
}
438437

439-
let logging = self.scope().logging();
440438
move |input1, input2, output| {
441439
let frontiers = &[input1.frontier(), input2.frontier()];
442-
let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
440+
let notificator = &mut Notificator::new(frontiers, &mut notificator);
443441
logic(input1.handle, input2.handle, output, notificator);
444442
}
445443
})

timely/src/logging.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -181,20 +181,6 @@ pub struct ApplicationEvent {
181181
pub is_start: bool,
182182
}
183183

184-
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
185-
/// Application-defined code start or stop
186-
pub struct GuardedMessageEvent {
187-
/// `true` when activity begins, `false` when it stops
188-
pub is_start: bool,
189-
}
190-
191-
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
192-
/// Application-defined code start or stop
193-
pub struct GuardedProgressEvent {
194-
/// `true` when activity begins, `false` when it stops
195-
pub is_start: bool,
196-
}
197-
198184
#[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)]
199185
/// Identifier of the worker that generated a log line
200186
pub struct TimelySetup {
@@ -260,10 +246,6 @@ pub enum TimelyEvent {
260246
Shutdown(ShutdownEvent),
261247
/// No clue.
262248
Application(ApplicationEvent),
263-
/// Per-message computation.
264-
GuardedMessage(GuardedMessageEvent),
265-
/// Per-notification computation.
266-
GuardedProgress(GuardedProgressEvent),
267249
/// Communication channel event.
268250
CommChannels(CommChannelsEvent),
269251
/// Input event.
@@ -302,14 +284,6 @@ impl From<ApplicationEvent> for TimelyEvent {
302284
fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
303285
}
304286

305-
impl From<GuardedMessageEvent> for TimelyEvent {
306-
fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
307-
}
308-
309-
impl From<GuardedProgressEvent> for TimelyEvent {
310-
fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
311-
}
312-
313287
impl From<CommChannelsEvent> for TimelyEvent {
314288
fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
315289
}

0 commit comments

Comments
 (0)