Skip to content

Commit 8aa6a2f

Browse files
committed
imp(timer): feature parity for kqueue
1 parent f6d51d1 commit 8aa6a2f

File tree

1 file changed

+39
-36
lines changed

1 file changed

+39
-36
lines changed

src/timer/kqueue.rs

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7+
use super::TimerSignal;
78
use crate::utils::check_err;
89
use crate::utils::get_time_range;
910
use crate::Result;
@@ -28,65 +29,67 @@ use std::{
2829
#[derive(Debug, Default)]
2930
pub struct Timer {
3031
/// A vector to store listeners (mpsc::Sender)
31-
txs: Arc<Mutex<Vec<Sender<u64>>>>,
32+
txs: Arc<Mutex<Vec<Sender<TimerSignal>>>>,
3233

3334
/// Thread handle
3435
pub handle: Option<JoinHandle<Result<()>>>,
3536
}
3637

3738
impl Timer {
3839
/// Initialize Timer and run a thread to send events to attached listeners
39-
pub fn initialize(self, accumulation_cycle: Duration) -> Result<Self> {
40-
let txs = Arc::clone(&self.txs);
40+
pub fn initialize(cycle: Duration) -> Result<Self> {
41+
let txs = Arc::new(Mutex::new(Vec::new()));
4142

4243
// Add Default tx
43-
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
44+
let (tx, _rx): (Sender<TimerSignal>, Receiver<TimerSignal>) = channel();
4445
txs.lock()?.push(tx);
4546

4647
let kqueue = kqueue()?;
4748

48-
let handle = Some(thread::spawn(move || {
49-
// Wait for initial expiration
50-
let initial_event =
51-
Timer::register_initial_expiration(kqueue, Duration::from_millis(0))?;
52-
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;
53-
54-
// Register loop event
55-
let loop_event = Timer::register_loop_expiration(kqueue, accumulation_cycle)?;
56-
57-
// Loop 10s
58-
loop {
59-
// Exit thread if there are no listeners
60-
if txs.lock()?.len() == 0 {
61-
// TODO: should close file descriptors?
62-
return Ok(());
63-
}
49+
let handle = Some({
50+
let txs = txs.clone();
51+
thread::spawn(move || {
52+
// Wait for initial expiration
53+
let initial_event = Timer::register_initial_expiration(kqueue)?;
54+
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;
55+
56+
// Register loop event
57+
let loop_event = Timer::register_loop_expiration(kqueue, cycle)?;
58+
59+
// Loop 10s
60+
loop {
61+
// Exit thread if there are no listeners
62+
if txs.lock()?.len() == 0 {
63+
// TODO: should close file descriptors?
64+
return Ok(());
65+
}
6466

65-
// Get current time
66-
let from = get_time_range(0)?.from;
67+
// Get current time
68+
let from = TimerSignal::NextSnapshot(get_time_range(0)?.from);
6769

68-
// Iterate through Senders
69-
txs.lock()?.iter().for_each(|tx| {
70-
// Send event to attached Sender
71-
match tx.send(from) {
72-
Ok(_) => {}
73-
Err(_) => {}
74-
}
75-
});
70+
// Iterate through Senders
71+
txs.lock()?.iter().for_each(|tx| {
72+
// Send event to attached Sender
73+
match tx.send(from) {
74+
Ok(_) => {}
75+
Err(_) => {}
76+
}
77+
});
7678

77-
// Wait 10s
78-
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
79-
}
80-
}));
79+
// Wait 10s
80+
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
81+
}
82+
})
83+
});
8184

82-
Ok(Self { handle, ..self })
85+
Ok(Self { handle, txs })
8386
}
8487

8588
/// Attach an mpsc::Sender to Timer
8689
///
8790
/// Timer will dispatch an event with the timestamp of the current instant,
8891
/// every 10th second to all attached senders
89-
pub fn attach_listener(&mut self, tx: Sender<u64>) -> Result<()> {
92+
pub fn attach_listener(&mut self, tx: Sender<TimerSignal>) -> Result<()> {
9093
// Push Sender to a Vector of Sender(s)
9194
let txs = Arc::clone(&self.txs);
9295
txs.lock()?.push(tx);

0 commit comments

Comments
 (0)