Skip to content

Commit 1257900

Browse files
committed
Make std::time::Instant optional
1 parent ab24dab commit 1257900

File tree

7 files changed

+57
-43
lines changed

7 files changed

+57
-43
lines changed

timely/examples/logging-send.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ fn main() {
1515
let mut probe = ProbeHandle::new();
1616

1717
// Register timely worker logging.
18-
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
18+
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
1919
data.iter().for_each(|x| println!("LOG1: {:?}", x))
2020
);
2121

2222
// Register timely progress logging.
2323
// Less generally useful: intended for debugging advanced custom operators or timely
2424
// internals.
25-
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
25+
worker.log_register().unwrap().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
2626
data.iter().for_each(|x| {
2727
println!("PROGRESS: {:?}", x);
2828
let (_, _, ev) = x;
@@ -48,7 +48,7 @@ fn main() {
4848
});
4949

5050
// Register timely worker logging.
51-
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
51+
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
5252
data.iter().for_each(|x| println!("LOG2: {:?}", x))
5353
);
5454

@@ -61,13 +61,13 @@ fn main() {
6161
});
6262

6363
// Register user-level logging.
64-
worker.log_register().insert::<(),_>("input", |_time, data|
64+
worker.log_register().unwrap().insert::<(),_>("input", |_time, data|
6565
for element in data.iter() {
6666
println!("Round tick at: {:?}", element.0);
6767
}
6868
);
6969

70-
let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent");
70+
let input_logger = worker.log_register().unwrap().get::<()>("input").expect("Input logger absent");
7171

7272
let timer = std::time::Instant::now();
7373

timely/examples/threadless.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ fn main() {
66

77
// create a naked single-threaded worker.
88
let allocator = timely::communication::allocator::Thread::default();
9-
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
9+
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
1010

1111
// create input and probe handles.
1212
let mut input = InputHandle::new();

timely/src/dataflow/scopes/child.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ where
7070
fn peek_identifier(&self) -> usize {
7171
self.parent.peek_identifier()
7272
}
73-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
73+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
7474
self.parent.log_register()
7575
}
7676
}

timely/src/execute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ where
154154
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
155155
{
156156
let alloc = crate::communication::allocator::thread::Thread::default();
157-
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
157+
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
158158
let result = func(&mut worker);
159159
while worker.has_dataflows() {
160160
worker.step_or_park(None);
@@ -320,7 +320,7 @@ where
320320
T: Send+'static,
321321
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
322322
initialize_from(builders, others, move |allocator| {
323-
let mut worker = Worker::new(worker_config.clone(), allocator);
323+
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
324324
let result = func(&mut worker);
325325
while worker.has_dataflows() {
326326
worker.step_or_park(None);

timely/src/progress/subgraph.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,11 @@ where
179179
let path = self.path.clone();
180180
let reachability_logging =
181181
worker.log_register()
182-
.get::<reachability::logging::TrackerEvent>("timely/reachability")
183-
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
182+
.as_ref()
183+
.and_then(|l|
184+
l.get::<reachability::logging::TrackerEvent>("timely/reachability")
185+
.map(|logger| reachability::logging::TrackerLogger::new(path, logger))
186+
);
184187
let (tracker, scope_summary) = builder.build(reachability_logging);
185188

186189
let progcaster = Progcaster::new(worker, self.path.clone(), self.logging.clone(), self.progress_logging.clone());

timely/src/scheduling/activate.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ pub struct Activations {
4848
rx: Receiver<Vec<usize>>,
4949

5050
// Delayed activations.
51-
timer: Instant,
51+
timer: Option<Instant>,
5252
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
5353
}
5454

5555
impl Activations {
5656

5757
/// Creates a new activation tracker.
58-
pub fn new(timer: Instant) -> Self {
58+
pub fn new(timer: Option<Instant>) -> Self {
5959
let (tx, rx) = crossbeam_channel::unbounded();
6060
Self {
6161
clean: 0,
@@ -77,13 +77,18 @@ impl Activations {
7777

7878
/// Schedules a future activation for the task addressed by `path`.
7979
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80-
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
81-
if delay == Duration::new(0, 0) {
82-
self.activate(path);
83-
}
80+
if let Some(timer) = self.timer {
81+
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
82+
if delay == Duration::new(0, 0) {
83+
self.activate(path);
84+
}
85+
else {
86+
let moment = timer.elapsed() + delay;
87+
self.queue.push(Reverse((moment, path.to_vec())));
88+
}
89+
}
8490
else {
85-
let moment = self.timer.elapsed() + delay;
86-
self.queue.push(Reverse((moment, path.to_vec())));
91+
self.activate(path);
8792
}
8893
}
8994

@@ -96,10 +101,12 @@ impl Activations {
96101
}
97102

98103
// Drain timer-based activations.
99-
let now = self.timer.elapsed();
100-
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
101-
let Reverse((_time, path)) = self.queue.pop().unwrap();
102-
self.activate(&path[..]);
104+
if let Some(timer) = self.timer {
105+
let now = timer.elapsed();
106+
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
107+
let Reverse((_time, path)) = self.queue.pop().unwrap();
108+
self.activate(&path[..]);
109+
}
103110
}
104111

105112
self.bounds.drain(.. self.clean);
@@ -171,12 +178,12 @@ impl Activations {
171178
/// indicates the amount of time before the thread should be unparked for the
172179
/// next scheduled activation.
173180
pub fn empty_for(&self) -> Option<Duration> {
174-
if !self.bounds.is_empty() {
181+
if !self.bounds.is_empty() || self.timer.is_none() {
175182
Some(Duration::new(0,0))
176183
}
177184
else {
178185
self.queue.peek().map(|Reverse((t,_a))| {
179-
let elapsed = self.timer.elapsed();
186+
let elapsed = self.timer.unwrap().elapsed();
180187
if t < &elapsed { Duration::new(0,0) }
181188
else { *t - elapsed }
182189
})

timely/src/worker.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -198,23 +198,27 @@ pub trait AsWorker : Scheduler {
198198
/// The next worker-unique identifier to be allocated.
199199
fn peek_identifier(&self) -> usize;
200200
/// Provides access to named logging streams.
201-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>;
201+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>;
202202
/// Provides access to the timely logging stream.
203-
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
203+
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().and_then(|l| l.get("timely")) }
204204
}
205205

206206
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
207207
/// and has a list of dataflows that it manages.
208208
pub struct Worker<A: Allocate> {
209209
config: Config,
210-
timer: Instant,
210+
/// An optional instant from which the start of the computation should be reckoned.
211+
///
212+
/// If this is set to none, system time-based functionality will be unavailable or work badly.
213+
/// For example, logging will be unavailable, and activation after a delay will be unavailable.
214+
timer: Option<Instant>,
211215
paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
212216
allocator: Rc<RefCell<A>>,
213217
identifiers: Rc<RefCell<usize>>,
214218
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
215219
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
216220
dataflow_counter: Rc<RefCell<usize>>,
217-
logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,
221+
logging: Option<Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>>,
218222

219223
activations: Rc<RefCell<Activations>>,
220224
active_dataflows: Vec<usize>,
@@ -245,7 +249,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
245249

246250
fn new_identifier(&mut self) -> usize { self.new_identifier() }
247251
fn peek_identifier(&self) -> usize { self.peek_identifier() }
248-
fn log_register(&self) -> RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
252+
fn log_register(&self) -> Option<RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
249253
self.log_register()
250254
}
251255
}
@@ -258,8 +262,7 @@ impl<A: Allocate> Scheduler for Worker<A> {
258262

259263
impl<A: Allocate> Worker<A> {
260264
/// Allocates a new `Worker` bound to a channel allocator.
261-
pub fn new(config: Config, c: A) -> Worker<A> {
262-
let now = Instant::now();
265+
pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
263266
let index = c.index();
264267
Worker {
265268
config,
@@ -269,7 +272,7 @@ impl<A: Allocate> Worker<A> {
269272
identifiers: Default::default(),
270273
dataflows: Default::default(),
271274
dataflow_counter: Default::default(),
272-
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
275+
logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index)))),
273276
activations: Rc::new(RefCell::new(Activations::new(now))),
274277
active_dataflows: Default::default(),
275278
temp_channel_ids: Default::default(),
@@ -405,7 +408,7 @@ impl<A: Allocate> Worker<A> {
405408
}
406409

407410
// Clean up, indicate if dataflows remain.
408-
self.logging.borrow_mut().flush();
411+
self.logging.as_ref().map(|l| l.borrow_mut().flush());
409412
self.allocator.borrow_mut().release();
410413
!self.dataflows.borrow().is_empty()
411414
}
@@ -476,7 +479,7 @@ impl<A: Allocate> Worker<A> {
476479
///
477480
/// let index = worker.index();
478481
/// let peers = worker.peers();
479-
/// let timer = worker.timer();
482+
/// let timer = worker.timer().unwrap();
480483
///
481484
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
482485
///
@@ -491,7 +494,7 @@ impl<A: Allocate> Worker<A> {
491494
///
492495
/// let index = worker.index();
493496
/// let peers = worker.peers();
494-
/// let timer = worker.timer();
497+
/// let timer = worker.timer().unwrap();
495498
///
496499
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
497500
///
@@ -507,13 +510,13 @@ impl<A: Allocate> Worker<A> {
507510
///
508511
/// let index = worker.index();
509512
/// let peers = worker.peers();
510-
/// let timer = worker.timer();
513+
/// let timer = worker.timer().unwrap();
511514
///
512515
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
513516
///
514517
/// });
515518
/// ```
516-
pub fn timer(&self) -> Instant { self.timer }
519+
pub fn timer(&self) -> Option<Instant> { self.timer }
517520

518521
/// Allocate a new worker-unique identifier.
519522
///
@@ -537,13 +540,14 @@ impl<A: Allocate> Worker<A> {
537540
/// timely::execute_from_args(::std::env::args(), |worker| {
538541
///
539542
/// worker.log_register()
543+
/// .unwrap()
540544
/// .insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
541545
/// println!("{:?}\t{:?}", time, data)
542546
/// );
543547
/// });
544548
/// ```
545-
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
546-
self.logging.borrow_mut()
549+
pub fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
550+
self.logging.as_ref().map(|l| l.borrow_mut())
547551
}
548552

549553
/// Construct a new dataflow.
@@ -566,7 +570,7 @@ impl<A: Allocate> Worker<A> {
566570
T: Refines<()>,
567571
F: FnOnce(&mut Child<Self, T>)->R,
568572
{
569-
let logging = self.logging.borrow_mut().get("timely");
573+
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely"));
570574
self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
571575
}
572576

@@ -590,7 +594,7 @@ impl<A: Allocate> Worker<A> {
590594
T: Refines<()>,
591595
F: FnOnce(&mut Child<Self, T>)->R,
592596
{
593-
let logging = self.logging.borrow_mut().get("timely");
597+
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely"));
594598
self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
595599
}
596600

@@ -629,7 +633,7 @@ impl<A: Allocate> Worker<A> {
629633
let addr = vec![dataflow_index].into();
630634
let identifier = self.new_identifier();
631635

632-
let progress_logging = self.logging.borrow_mut().get("timely/progress");
636+
let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely/progress"));
633637
let subscope = SubgraphBuilder::new_from(addr, logging.clone(), progress_logging.clone(), name);
634638
let subscope = RefCell::new(subscope);
635639

0 commit comments

Comments
 (0)