Skip to content

Commit 4cedcb1

Browse files
committed
Remove the identifier from logging
This changes the data received from logging from (Duration, Id, Data) to (Duration, Data), saving the space for Id. In most cases, the Id can be derived from the context where the log data is processed. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent b990fab commit 4cedcb1

File tree

9 files changed

+45
-54
lines changed

9 files changed

+45
-54
lines changed

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn initialize_networking(
3939
my_index: usize,
4040
threads: usize,
4141
noisy: bool,
42-
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
42+
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>)
4343
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
4444
{
4545
let sockets = create_sockets(addresses, my_index, noisy)?;
@@ -57,7 +57,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
5757
mut sockets: Vec<Option<S>>,
5858
my_index: usize,
5959
threads: usize,
60-
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
60+
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>)
6161
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
6262
{
6363
// Sockets are expected to be blocking,

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use super::stream::Stream;
1111

1212
use logging_core::Logger;
1313

14-
use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
14+
use crate::logging::{CommunicationEvent, MessageEvent, StateEvent};
1515

1616
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
1717
// NOTE: some downstream crates sniff out "timely communication error:" from
@@ -35,7 +35,7 @@ pub fn recv_loop<S>(
3535
worker_offset: usize,
3636
process: usize,
3737
remote: usize,
38-
mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
38+
mut logger: Option<Logger<CommunicationEvent>>)
3939
where
4040
S: Stream,
4141
{
@@ -134,7 +134,7 @@ pub fn send_loop<S: Stream>(
134134
sources: Vec<Sender<MergeQueue>>,
135135
process: usize,
136136
remote: usize,
137-
mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
137+
mut logger: Option<Logger<CommunicationEvent>>)
138138
{
139139

140140
// Log the send thread's start.

communication/src/initialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub enum Config {
3838
/// Verbosely report connection process
3939
report: bool,
4040
/// Closure to create a new logger for a communication thread
41-
log_fn: Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
41+
log_fn: Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent>> + Send + Sync>,
4242
}
4343
}
4444

logging/src/lib.rs

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@ use std::collections::HashMap;
66
use std::time::{Instant, Duration};
77
use std::fmt::{self, Debug};
88

9-
pub struct Registry<Id> {
10-
/// A worker-specific identifier.
11-
id: Id,
9+
pub struct Registry {
1210
/// A map from names to typed loggers.
1311
map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
1412
/// An instant common to all logging statements.
1513
time: Instant,
1614
}
1715

18-
impl<Id: Clone+'static> Registry<Id> {
16+
impl Registry {
1917
/// Binds a log name to an action on log event batches.
2018
///
2119
/// This method also returns any pre-installed action, rather than overwriting it
@@ -27,20 +25,20 @@ impl<Id: Clone+'static> Registry<Id> {
2725
/// seen (likely greater or equal to the timestamp of the last event). The end of a
2826
/// logging stream is indicated only by dropping the associated action, which can be
2927
/// accomplished with `remove` (or a call to insert, though this is not recommended).
30-
pub fn insert<T: 'static, F: FnMut(&Duration, &mut Vec<(Duration, Id, T)>)+'static>(
28+
pub fn insert<T: 'static, F: FnMut(&Duration, &mut Vec<(Duration, T)>)+'static>(
3129
&mut self,
3230
name: &str,
3331
action: F) -> Option<Box<dyn Any>>
3432
{
35-
let logger = Logger::<T, Id>::new(self.time, Duration::default(), self.id.clone(), action);
33+
let logger = Logger::<T>::new(self.time, Duration::default(), action);
3634
self.insert_logger(name, logger)
3735
}
3836

3937
/// Binds a log name to a logger.
4038
pub fn insert_logger<T: 'static>(
4139
&mut self,
4240
name: &str,
43-
logger: Logger<T, Id>) -> Option<Box<dyn Any>>
41+
logger: Logger<T>) -> Option<Box<dyn Any>>
4442
{
4543
self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
4644
}
@@ -56,17 +54,16 @@ impl<Id: Clone+'static> Registry<Id> {
5654
}
5755

5856
/// Retrieves a shared logger, if one has been inserted.
59-
pub fn get<T: 'static>(&self, name: &str) -> Option<Logger<T, Id>> {
57+
pub fn get<T: 'static>(&self, name: &str) -> Option<Logger<T>> {
6058
self.map
6159
.get(name)
62-
.and_then(|entry| entry.0.downcast_ref::<Logger<T, Id>>())
60+
.and_then(|entry| entry.0.downcast_ref::<Logger<T>>())
6361
.map(|x| (*x).clone())
6462
}
6563

6664
/// Creates a new logger registry.
67-
pub fn new(time: Instant, id: Id) -> Self {
65+
pub fn new(time: Instant) -> Self {
6866
Registry {
69-
id,
7067
time,
7168
map: HashMap::new(),
7269
}
@@ -78,7 +75,7 @@ impl<Id: Clone+'static> Registry<Id> {
7875
}
7976
}
8077

81-
impl<Id> Flush for Registry<Id> {
78+
impl Flush for Registry {
8279
fn flush(&mut self) {
8380
for value in self.map.values_mut() {
8481
value.1.flush();
@@ -88,42 +85,40 @@ impl<Id> Flush for Registry<Id> {
8885

8986
/// A buffering logger.
9087
#[derive(Debug)]
91-
pub struct Logger<T, E> {
92-
inner: Rc<RefCell<LoggerInner<T, E, dyn FnMut(&Duration, &mut Vec<(Duration, E, T)>)>>>,
88+
pub struct Logger<T> {
89+
inner: Rc<RefCell<LoggerInner<T, dyn FnMut(&Duration, &mut Vec<(Duration, T)>)>>>,
9390
}
9491

95-
impl<T, E: Clone> Clone for Logger<T, E> {
92+
impl<T> Clone for Logger<T> {
9693
fn clone(&self) -> Self {
9794
Self {
9895
inner: self.inner.clone()
9996
}
10097
}
10198
}
10299

103-
struct LoggerInner<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> {
104-
id: E,
100+
struct LoggerInner<T, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, T)>)> {
105101
/// common instant used for all loggers.
106102
time: Instant,
107103
/// offset to allow re-calibration.
108104
offset: Duration,
109105
/// shared buffer of accumulated log events
110-
buffer: Vec<(Duration, E, T)>,
106+
buffer: Vec<(Duration, T)>,
111107
/// action to take on full log buffers.
112108
action: A,
113109
}
114110

115-
impl<T, E: Clone> Logger<T, E> {
111+
impl<T> Logger<T> {
116112
/// Allocates a new shareable logger bound to a write destination.
117-
pub fn new<F>(time: Instant, offset: Duration, id: E, action: F) -> Self
113+
pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
118114
where
119-
F: FnMut(&Duration, &mut Vec<(Duration, E, T)>)+'static
115+
F: FnMut(&Duration, &mut Vec<(Duration, T)>)+'static
120116
{
121117
let inner = LoggerInner {
122-
id,
123118
time,
124119
offset,
125120
action,
126-
buffer: Vec::with_capacity(LoggerInner::<T, E, F>::buffer_capacity()),
121+
buffer: Vec::with_capacity(LoggerInner::<T, F>::buffer_capacity()),
127122
};
128123
let inner = Rc::new(RefCell::new(inner));
129124
Logger { inner }
@@ -167,7 +162,7 @@ impl<T, E: Clone> Logger<T, E> {
167162
}
168163
}
169164

170-
impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> LoggerInner<T, E, A> {
165+
impl<T, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, T)>)> LoggerInner<T, A> {
171166

172167
/// The upper limit for buffers to allocate, size in bytes. [Self::buffer_capacity] converts
173168
/// this to size in elements.
@@ -177,7 +172,7 @@ impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Logg
177172
/// and 1, inclusively.
178173
// TODO: This fn is not const because it cannot depend on non-Sized generic parameters
179174
fn buffer_capacity() -> usize {
180-
let size = ::std::mem::size_of::<(Duration, E, T)>();
175+
let size = ::std::mem::size_of::<(Duration, T)>();
181176
if size == 0 {
182177
Self::BUFFER_SIZE_BYTES
183178
} else if size <= Self::BUFFER_SIZE_BYTES {
@@ -192,7 +187,7 @@ impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Logg
192187
{
193188
let elapsed = self.time.elapsed() + self.offset;
194189
for event in events {
195-
self.buffer.push((elapsed, self.id.clone(), event.into()));
190+
self.buffer.push((elapsed, event.into()));
196191
if self.buffer.len() == self.buffer.capacity() {
197192
// Would call `self.flush()`, but for `RefCell` panic.
198193
(self.action)(&elapsed, &mut self.buffer);
@@ -209,7 +204,7 @@ impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Logg
209204
}
210205

211206
/// Flush on the *last* drop of a logger.
212-
impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Drop for LoggerInner<T, E, A> {
207+
impl<T, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, T)>)> Drop for LoggerInner<T, A> {
213208
fn drop(&mut self) {
214209
// Avoid sending out empty buffers just because of drops.
215210
if !self.buffer.is_empty() {
@@ -218,14 +213,12 @@ impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Drop for Lo
218213
}
219214
}
220215

221-
impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Debug for LoggerInner<T, E, A>
216+
impl<T, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, T)>)> Debug for LoggerInner<T, A>
222217
where
223-
E: Debug,
224218
T: Debug,
225219
{
226220
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227221
f.debug_struct("LoggerInner")
228-
.field("id", &self.id)
229222
.field("time", &self.time)
230223
.field("offset", &self.offset)
231224
.field("action", &"FnMut")
@@ -240,13 +233,13 @@ trait Flush {
240233
fn flush(&mut self);
241234
}
242235

243-
impl<T, E> Flush for Logger<T, E> {
236+
impl<T> Flush for Logger<T> {
244237
fn flush(&mut self) {
245238
self.inner.borrow_mut().flush()
246239
}
247240
}
248241

249-
impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Flush for LoggerInner<T, E, A> {
242+
impl<T, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, T)>)> Flush for LoggerInner<T, A> {
250243
fn flush(&mut self) {
251244
let elapsed = self.time.elapsed() + self.offset;
252245
if !self.buffer.is_empty() {

timely/src/dataflow/scopes/child.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ where
6767
fn new_identifier(&mut self) -> usize {
6868
self.parent.new_identifier()
6969
}
70-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
70+
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
7171
self.parent.log_register()
7272
}
7373
}

timely/src/execute.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ where
245245
result = Some(crate::logging_core::Logger::new(
246246
::std::time::Instant::now(),
247247
::std::time::Duration::default(),
248-
events_setup,
249248
move |time, data| logger.publish_batch(time, data)
250249
));
251250
}

timely/src/logging.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/// Type alias for logging timely events.
44
pub type WorkerIdentifier = usize;
55
/// Logger type for worker-local logging.
6-
pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
6+
pub type Logger<Event> = crate::logging_core::Logger<Event>;
77
/// Logger for timely dataflow system events.
88
pub type TimelyLogger = Logger<TimelyEvent>;
99
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
@@ -13,14 +13,14 @@ use std::time::Duration;
1313
use crate::dataflow::operators::capture::{Event, EventPusher};
1414

1515
/// Logs events as a timely stream, with progress statements.
16-
pub struct BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
16+
pub struct BatchLogger<T, P> where P: EventPusher<Duration, (Duration, T)> {
1717
// None when the logging stream is closed
1818
time: Duration,
1919
event_pusher: P,
20-
_phantom: ::std::marker::PhantomData<(E, T)>,
20+
_phantom: ::std::marker::PhantomData<T>,
2121
}
2222

23-
impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
23+
impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, (Duration, T)> {
2424
/// Creates a new batch logger.
2525
pub fn new(event_pusher: P) -> Self {
2626
BatchLogger {
@@ -30,7 +30,7 @@ impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E,
3030
}
3131
}
3232
/// Publishes a batch of logged events and advances the capability.
33-
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, E, T)>) {
33+
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, T)>) {
3434
if !data.is_empty() {
3535
self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect()));
3636
}
@@ -42,7 +42,7 @@ impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E,
4242
self.time = time;
4343
}
4444
}
45-
impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
45+
impl<T, P> Drop for BatchLogger<T, P> where P: EventPusher<Duration, (Duration, T)> {
4646
fn drop(&mut self) {
4747
self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
4848
}

timely/src/synchronization/sequence.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl<T: ExchangeData> Sequencer<T> {
157157
}
158158

159159
let mut activator_borrow = activator_source.borrow_mut();
160-
let mut activator = activator_borrow.as_mut().unwrap();
160+
let activator = activator_borrow.as_mut().unwrap();
161161

162162
if let Some(t) = activator.catchup_until {
163163
if capability.time().less_than(&t) {
@@ -190,7 +190,7 @@ impl<T: ExchangeData> Sequencer<T> {
190190

191191
if let Some(last) = recvd.last() {
192192
let mut activator_borrow = activator_sink.borrow_mut();
193-
let mut activator = activator_borrow.as_mut().unwrap();
193+
let activator = activator_borrow.as_mut().unwrap();
194194

195195
activator.catchup_until = Some((last.0).0);
196196
activator.activate();

timely/src/worker.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ pub trait AsWorker : Scheduler {
201201
/// Allocates a new worker-unique identifier.
202202
fn new_identifier(&mut self) -> usize;
203203
/// Provides access to named logging streams.
204-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>;
204+
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry>;
205205
/// Provides access to the timely logging stream.
206206
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
207207
}
@@ -217,7 +217,7 @@ pub struct Worker<A: Allocate> {
217217
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
218218
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
219219
dataflow_counter: Rc<RefCell<usize>>,
220-
logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,
220+
logging: Rc<RefCell<crate::logging_core::Registry>>,
221221

222222
activations: Rc<RefCell<Activations>>,
223223
active_dataflows: Vec<usize>,
@@ -247,7 +247,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
247247
}
248248

249249
fn new_identifier(&mut self) -> usize { self.new_identifier() }
250-
fn log_register(&self) -> RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
250+
fn log_register(&self) -> RefMut<crate::logging_core::Registry> {
251251
self.log_register()
252252
}
253253
}
@@ -262,7 +262,6 @@ impl<A: Allocate> Worker<A> {
262262
/// Allocates a new `Worker` bound to a channel allocator.
263263
pub fn new(config: Config, c: A) -> Worker<A> {
264264
let now = Instant::now();
265-
let index = c.index();
266265
Worker {
267266
config,
268267
timer: now,
@@ -271,7 +270,7 @@ impl<A: Allocate> Worker<A> {
271270
identifiers: Default::default(),
272271
dataflows: Default::default(),
273272
dataflow_counter: Default::default(),
274-
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
273+
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now))),
275274
activations: Rc::new(RefCell::new(Activations::new(now))),
276275
active_dataflows: Default::default(),
277276
temp_channel_ids: Default::default(),
@@ -538,7 +537,7 @@ impl<A: Allocate> Worker<A> {
538537
/// );
539538
/// });
540539
/// ```
541-
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
540+
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
542541
self.logging.borrow_mut()
543542
}
544543

0 commit comments

Comments
 (0)