Skip to content

Commit 37b737c

Browse files
committed
Make std::time::Instant optional
1 parent 7ffb5e2 commit 37b737c

File tree

7 files changed

+65
-51
lines changed

7 files changed

+65
-51
lines changed

timely/examples/logging-send.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ fn main() {
1616
let probe = ProbeHandle::new();
1717

1818
// Register timely worker logging.
19-
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
19+
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
2020
if let Some(data) = data {
2121
data.iter().for_each(|x| println!("LOG1: {:?}", x))
2222
}
@@ -28,7 +28,7 @@ fn main() {
2828
// Register timely progress logging.
2929
// Less generally useful: intended for debugging advanced custom operators or timely
3030
// internals.
31-
worker.log_register().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
31+
worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
3232
if let Some(data) = data {
3333
data.iter().for_each(|x| {
3434
println!("PROGRESS: {:?}", x);
@@ -50,7 +50,7 @@ fn main() {
5050
}
5151
);
5252

53-
worker.log_register().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
53+
worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
5454
if let Some(data) = data {
5555
data.iter().for_each(|x| {
5656
println!("REACHABILITY: {:?}", x);
@@ -61,7 +61,7 @@ fn main() {
6161
}
6262
);
6363

64-
worker.log_register().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
64+
worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
6565
if let Some(data) = data {
6666
data.iter().for_each(|(_, x)| {
6767
println!("SUMMARY: {:?}", x);
@@ -81,7 +81,7 @@ fn main() {
8181
});
8282

8383
// Register timely worker logging.
84-
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
84+
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
8585
if let Some(data) = data {
8686
data.iter().for_each(|x| println!("LOG2: {:?}", x))
8787
}
@@ -100,7 +100,7 @@ fn main() {
100100

101101
// Register user-level logging.
102102
type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
103-
worker.log_register().insert::<MyBuilder,_>("input", |time, data|
103+
worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
104104
if let Some(data) = data {
105105
for element in data.iter() {
106106
println!("Round tick at: {:?}", element.0);
@@ -111,7 +111,7 @@ fn main() {
111111
}
112112
);
113113

114-
let input_logger = worker.log_register().get::<MyBuilder>("input").expect("Input logger absent");
114+
let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
115115

116116
let timer = std::time::Instant::now();
117117

timely/examples/threadless.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ fn main() {
66

77
// create a naked single-threaded worker.
88
let allocator = timely::communication::allocator::Thread::default();
9-
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
9+
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
1010

1111
// create input and probe handles.
1212
let mut input = InputHandle::new();

timely/src/dataflow/scopes/child.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ where
7373
fn peek_identifier(&self) -> usize {
7474
self.parent.peek_identifier()
7575
}
76-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
76+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
7777
self.parent.log_register()
7878
}
7979
}
@@ -135,8 +135,8 @@ where
135135
let path = self.addr_for_child(index);
136136

137137
let type_name = std::any::type_name::<T2>();
138-
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));
139-
let summary_logging = self.log_register().get(&format!("timely/summary/{type_name}"));
138+
let progress_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/progress/{type_name}")));
139+
let summary_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/summary/{type_name}")));
140140

141141
let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name));
142142
let result = {

timely/src/execute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ where
154154
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
155155
{
156156
let alloc = crate::communication::allocator::thread::Thread::default();
157-
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
157+
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
158158
let result = func(&mut worker);
159159
while worker.has_dataflows() {
160160
worker.step_or_park(None);
@@ -320,7 +320,7 @@ where
320320
T: Send+'static,
321321
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
322322
initialize_from(builders, others, move |allocator| {
323-
let mut worker = Worker::new(worker_config.clone(), allocator);
323+
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
324324
let result = func(&mut worker);
325325
while worker.has_dataflows() {
326326
worker.step_or_park(None);

timely/src/progress/subgraph.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,12 @@ where
183183
let type_name = std::any::type_name::<TInner>();
184184
let reachability_logging =
185185
worker.log_register()
186-
.get::<reachability::logging::TrackerEventBuilder<TInner>>(&format!("timely/reachability/{type_name}"))
187-
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
188-
let progress_logging = worker.log_register().get::<TimelyProgressEventBuilder<TInner>>(&format!("timely/progress/{type_name}"));
186+
.as_ref()
187+
.and_then(|l|
188+
l.get::<reachability::logging::TrackerEventBuilder<TInner>>(&format!("timely/reachability/{type_name}"))
189+
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger))
190+
);
191+
let progress_logging = worker.log_register().as_ref().and_then(|l| l.get::<TimelyProgressEventBuilder<TInner>>(&format!("timely/progress/{type_name}")));
189192
let (tracker, scope_summary) = builder.build(reachability_logging);
190193

191194
let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);

timely/src/scheduling/activate.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ pub struct Activations {
4848
rx: Receiver<Vec<usize>>,
4949

5050
// Delayed activations.
51-
timer: Instant,
51+
timer: Option<Instant>,
5252
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
5353
}
5454

5555
impl Activations {
5656

5757
/// Creates a new activation tracker.
58-
pub fn new(timer: Instant) -> Self {
58+
pub fn new(timer: Option<Instant>) -> Self {
5959
let (tx, rx) = crossbeam_channel::unbounded();
6060
Self {
6161
clean: 0,
@@ -77,13 +77,18 @@ impl Activations {
7777

7878
/// Schedules a future activation for the task addressed by `path`.
7979
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80-
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
81-
if delay == Duration::new(0, 0) {
82-
self.activate(path);
83-
}
80+
if let Some(timer) = self.timer {
81+
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
82+
if delay == Duration::new(0, 0) {
83+
self.activate(path);
84+
}
85+
else {
86+
let moment = timer.elapsed() + delay;
87+
self.queue.push(Reverse((moment, path.to_vec())));
88+
}
89+
}
8490
else {
85-
let moment = self.timer.elapsed() + delay;
86-
self.queue.push(Reverse((moment, path.to_vec())));
91+
self.activate(path);
8792
}
8893
}
8994

@@ -96,11 +101,13 @@ impl Activations {
96101
}
97102

98103
// Drain timer-based activations.
99-
if !self.queue.is_empty() {
100-
let now = self.timer.elapsed();
101-
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
102-
let Reverse((_time, path)) = self.queue.pop().unwrap();
103-
self.activate(&path[..]);
104+
if let Some(timer) = self.timer {
105+
if !self.queue.is_empty() {
106+
let now = timer.elapsed();
107+
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
108+
let Reverse((_time, path)) = self.queue.pop().unwrap();
109+
self.activate(&path[..]);
110+
}
104111
}
105112
}
106113

@@ -173,12 +180,12 @@ impl Activations {
173180
/// indicates the amount of time before the thread should be unparked for the
174181
/// next scheduled activation.
175182
pub fn empty_for(&self) -> Option<Duration> {
176-
if !self.bounds.is_empty() {
183+
if !self.bounds.is_empty() || self.timer.is_none() {
177184
Some(Duration::new(0,0))
178185
}
179186
else {
180187
self.queue.peek().map(|Reverse((t,_a))| {
181-
let elapsed = self.timer.elapsed();
188+
let elapsed = self.timer.unwrap().elapsed();
182189
if t < &elapsed { Duration::new(0,0) }
183190
else { *t - elapsed }
184191
})

timely/src/worker.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -201,23 +201,27 @@ pub trait AsWorker : Scheduler {
201201
/// The next worker-unique identifier to be allocated.
202202
fn peek_identifier(&self) -> usize;
203203
/// Provides access to named logging streams.
204-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry>;
204+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>>;
205205
/// Provides access to the timely logging stream.
206-
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely").map(Into::into) }
206+
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().and_then(|l| l.get("timely").map(Into::into)) }
207207
}
208208

209209
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
210210
/// and has a list of dataflows that it manages.
211211
pub struct Worker<A: Allocate> {
212212
config: Config,
213-
timer: Instant,
213+
/// An optional instant from which the start of the computation should be reckoned.
214+
///
215+
/// If this is set to none, system time-based functionality will be unavailable or work badly.
216+
/// For example, logging will be unavailable, and activation after a delay will be unavailable.
217+
timer: Option<Instant>,
214218
paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
215219
allocator: Rc<RefCell<A>>,
216220
identifiers: Rc<RefCell<usize>>,
217221
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
218222
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
219223
dataflow_counter: Rc<RefCell<usize>>,
220-
logging: Rc<RefCell<crate::logging_core::Registry>>,
224+
logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,
221225

222226
activations: Rc<RefCell<Activations>>,
223227
active_dataflows: Vec<usize>,
@@ -255,7 +259,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
255259

256260
fn new_identifier(&mut self) -> usize { self.new_identifier() }
257261
fn peek_identifier(&self) -> usize { self.peek_identifier() }
258-
fn log_register(&self) -> RefMut<crate::logging_core::Registry> {
262+
fn log_register(&self) -> Option<RefMut<crate::logging_core::Registry>> {
259263
self.log_register()
260264
}
261265
}
@@ -268,8 +272,7 @@ impl<A: Allocate> Scheduler for Worker<A> {
268272

269273
impl<A: Allocate> Worker<A> {
270274
/// Allocates a new `Worker` bound to a channel allocator.
271-
pub fn new(config: Config, c: A) -> Worker<A> {
272-
let now = Instant::now();
275+
pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
273276
Worker {
274277
config,
275278
timer: now,
@@ -278,7 +281,7 @@ impl<A: Allocate> Worker<A> {
278281
identifiers: Default::default(),
279282
dataflows: Default::default(),
280283
dataflow_counter: Default::default(),
281-
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now))),
284+
logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
282285
activations: Rc::new(RefCell::new(Activations::new(now))),
283286
active_dataflows: Default::default(),
284287
temp_channel_ids: Default::default(),
@@ -414,7 +417,7 @@ impl<A: Allocate> Worker<A> {
414417
}
415418

416419
// Clean up, indicate if dataflows remain.
417-
self.logging.borrow_mut().flush();
420+
self.logging.as_ref().map(|l| l.borrow_mut().flush());
418421
self.allocator.borrow_mut().release();
419422
!self.dataflows.borrow().is_empty()
420423
}
@@ -485,7 +488,7 @@ impl<A: Allocate> Worker<A> {
485488
///
486489
/// let index = worker.index();
487490
/// let peers = worker.peers();
488-
/// let timer = worker.timer();
491+
/// let timer = worker.timer().unwrap();
489492
///
490493
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
491494
///
@@ -500,7 +503,7 @@ impl<A: Allocate> Worker<A> {
500503
///
501504
/// let index = worker.index();
502505
/// let peers = worker.peers();
503-
/// let timer = worker.timer();
506+
/// let timer = worker.timer().unwrap();
504507
///
505508
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
506509
///
@@ -516,13 +519,13 @@ impl<A: Allocate> Worker<A> {
516519
///
517520
/// let index = worker.index();
518521
/// let peers = worker.peers();
519-
/// let timer = worker.timer();
522+
/// let timer = worker.timer().unwrap();
520523
///
521524
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
522525
///
523526
/// });
524527
/// ```
525-
pub fn timer(&self) -> Instant { self.timer }
528+
pub fn timer(&self) -> Option<Instant> { self.timer }
526529

527530
/// Allocate a new worker-unique identifier.
528531
///
@@ -546,13 +549,14 @@ impl<A: Allocate> Worker<A> {
546549
/// timely::execute_from_args(::std::env::args(), |worker| {
547550
///
548551
/// worker.log_register()
552+
/// .unwrap()
549553
/// .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
550554
/// println!("{:?}\t{:?}", time, data)
551555
/// );
552556
/// });
553557
/// ```
554-
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
555-
self.logging.borrow_mut()
558+
pub fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
559+
self.logging.as_ref().map(|l| l.borrow_mut())
556560
}
557561

558562
/// Construct a new dataflow.
@@ -575,7 +579,7 @@ impl<A: Allocate> Worker<A> {
575579
T: Refines<()>,
576580
F: FnOnce(&mut Child<Self, T>)->R,
577581
{
578-
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
582+
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into));
579583
self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
580584
}
581585

@@ -599,7 +603,7 @@ impl<A: Allocate> Worker<A> {
599603
T: Refines<()>,
600604
F: FnOnce(&mut Child<Self, T>)->R,
601605
{
602-
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
606+
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into));
603607
self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
604608
}
605609

@@ -639,8 +643,8 @@ impl<A: Allocate> Worker<A> {
639643
let identifier = self.new_identifier();
640644

641645
let type_name = std::any::type_name::<T>();
642-
let progress_logging = self.logging.borrow_mut().get(&format!("timely/progress/{type_name}"));
643-
let summary_logging = self.logging.borrow_mut().get(&format!("timely/summary/{type_name}"));
646+
let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/progress/{}", type_name)).map(Into::into));
647+
let summary_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/summary/{}", type_name)).map(Into::into));
644648
let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
645649
let subscope = RefCell::new(subscope);
646650

@@ -735,7 +739,7 @@ impl<A: Allocate> Clone for Worker<A> {
735739
identifiers: Rc::clone(&self.identifiers),
736740
dataflows: Rc::clone(&self.dataflows),
737741
dataflow_counter: Rc::clone(&self.dataflow_counter),
738-
logging: Rc::clone(&self.logging),
742+
logging: self.logging.clone(),
739743
activations: Rc::clone(&self.activations),
740744
active_dataflows: Vec::new(),
741745
temp_channel_ids: Rc::clone(&self.temp_channel_ids),

0 commit comments

Comments
 (0)