Skip to content

Commit f14716c

Browse files
authored
Remove worker identifier from logging (#533)
* Remove the identifier from logging This changes the data received from logging from (Duration, Id, Data) to (Duration, Data), saving the space for Id. In most cases, the Id can be derived from the context where the log data is processed. Signed-off-by: Moritz Hoffmann <[email protected]> * Log a communication setup message Signed-off-by: Moritz Hoffmann <[email protected]> * rebase Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 4337b10 commit f14716c

File tree

9 files changed

+76
-82
lines changed

9 files changed

+76
-82
lines changed

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn initialize_networking(
3939
my_index: usize,
4040
threads: usize,
4141
noisy: bool,
42-
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
42+
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
4343
)
4444
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
4545
{
@@ -58,7 +58,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
5858
mut sockets: Vec<Option<S>>,
5959
my_index: usize,
6060
threads: usize,
61-
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
61+
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
6262
)
6363
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
6464
{

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use super::stream::Stream;
1111

1212
use timely_logging::Logger;
1313

14-
use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
14+
use crate::logging::{CommunicationEvent, MessageEvent, StateEvent};
1515

1616
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
1717
// NOTE: some downstream crates sniff out "timely communication error:" from
@@ -35,7 +35,7 @@ pub fn recv_loop<S>(
3535
worker_offset: usize,
3636
process: usize,
3737
remote: usize,
38-
mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
38+
mut logger: Option<Logger<CommunicationEvent>>)
3939
where
4040
S: Stream,
4141
{
@@ -134,7 +134,7 @@ pub fn send_loop<S: Stream>(
134134
sources: Vec<Sender<MergeQueue>>,
135135
process: usize,
136136
remote: usize,
137-
mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
137+
mut logger: Option<Logger<CommunicationEvent>>)
138138
{
139139

140140
// Log the send thread's start.

communication/src/initialize.rs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub enum Config {
3939
/// Verbosely report connection process
4040
report: bool,
4141
/// Closure to create a new logger for a communication thread
42-
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
42+
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent>> + Send + Sync>,
4343
}
4444
}
4545

@@ -183,62 +183,62 @@ impl Config {
183183
/// # Examples
184184
/// ```
185185
/// use timely_communication::{Allocate, Bytesable};
186-
///
186+
///
187187
/// /// A wrapper that indicates the serialization/deserialization strategy.
188188
/// pub struct Message {
189189
/// /// Text contents.
190190
/// pub payload: String,
191191
/// }
192-
///
192+
///
193193
/// impl Bytesable for Message {
194194
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
195195
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
196196
/// }
197-
///
197+
///
198198
/// fn length_in_bytes(&self) -> usize {
199199
/// self.payload.len()
200200
/// }
201-
///
201+
///
202202
/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
203203
/// writer.write_all(self.payload.as_bytes()).unwrap();
204204
/// }
205205
/// }
206-
///
206+
///
207207
/// fn main() {
208-
///
208+
///
209209
/// // extract the configuration from user-supplied arguments, initialize the computation.
210210
/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
211211
/// let guards = timely_communication::initialize(config, |mut allocator| {
212-
///
212+
///
213213
/// println!("worker {} of {} started", allocator.index(), allocator.peers());
214-
///
214+
///
215215
/// // allocates a pair of senders list and one receiver.
216216
/// let (mut senders, mut receiver) = allocator.allocate(0);
217-
///
217+
///
218218
/// // send typed data along each channel
219219
/// for i in 0 .. allocator.peers() {
220220
/// senders[i].send(Message { payload: format!("hello, {}", i)});
221221
/// senders[i].done();
222222
/// }
223-
///
223+
///
224224
/// // no support for termination notification,
225225
/// // we have to count down ourselves.
226226
/// let mut received = 0;
227227
/// while received < allocator.peers() {
228-
///
228+
///
229229
/// allocator.receive();
230-
///
230+
///
231231
/// if let Some(message) = receiver.recv() {
232232
/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
233233
/// received += 1;
234234
/// }
235-
///
235+
///
236236
/// allocator.release();
237237
/// }
238-
///
238+
///
239239
/// allocator.index()
240240
/// });
241-
///
241+
///
242242
/// // computation runs until guards are joined or dropped.
243243
/// if let Ok(guards) = guards {
244244
/// for guard in guards.join() {
@@ -279,62 +279,62 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
279279
/// # Examples
280280
/// ```
281281
/// use timely_communication::{Allocate, Bytesable};
282-
///
282+
///
283283
/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
284284
/// pub struct Message {
285285
/// /// Text contents.
286286
/// pub payload: String,
287287
/// }
288-
///
288+
///
289289
/// impl Bytesable for Message {
290290
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
291291
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
292292
/// }
293-
///
293+
///
294294
/// fn length_in_bytes(&self) -> usize {
295295
/// self.payload.len()
296296
/// }
297-
///
297+
///
298298
/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
299299
/// writer.write_all(self.payload.as_bytes()).unwrap();
300300
/// }
301301
/// }
302-
///
302+
///
303303
/// fn main() {
304-
///
304+
///
305305
/// // extract the configuration from user-supplied arguments, initialize the computation.
306306
/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
307307
/// let guards = timely_communication::initialize(config, |mut allocator| {
308-
///
308+
///
309309
/// println!("worker {} of {} started", allocator.index(), allocator.peers());
310-
///
310+
///
311311
/// // allocates a pair of senders list and one receiver.
312312
/// let (mut senders, mut receiver) = allocator.allocate(0);
313-
///
313+
///
314314
/// // send typed data along each channel
315315
/// for i in 0 .. allocator.peers() {
316316
/// senders[i].send(Message { payload: format!("hello, {}", i)});
317317
/// senders[i].done();
318318
/// }
319-
///
319+
///
320320
/// // no support for termination notification,
321321
/// // we have to count down ourselves.
322322
/// let mut received = 0;
323323
/// while received < allocator.peers() {
324-
///
324+
///
325325
/// allocator.receive();
326-
///
326+
///
327327
/// if let Some(message) = receiver.recv() {
328328
/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
329329
/// received += 1;
330330
/// }
331-
///
331+
///
332332
/// allocator.release();
333333
/// }
334-
///
334+
///
335335
/// allocator.index()
336336
/// });
337-
///
337+
///
338338
/// // computation runs until guards are joined or dropped.
339339
/// if let Ok(guards) = guards {
340340
/// for guard in guards.join() {

communication/src/logging.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub enum CommunicationEvent {
2121
Message(MessageEvent),
2222
/// A state transition.
2323
State(StateEvent),
24+
/// Setup event
25+
Setup(CommunicationSetup)
2426
}
2527

2628
/// An observed message.

0 commit comments

Comments
 (0)