Skip to content

Commit e596636

Browse files
committed
imp(timer): kqueue timer implementation
1 parent 26a7766 commit e596636

File tree

2 files changed

+115
-1
lines changed

2 files changed

+115
-1
lines changed

src/timer/kqueue.rs

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7+
use crate::utils::check_err;
78
use crate::Result;
89

910
use std::sync::{mpsc::Sender, Arc, Mutex};
@@ -20,7 +21,43 @@ pub struct Timer {
2021

2122
impl Timer {
2223
pub fn initialize(self) -> Result<Self> {
23-
Ok(self)
24+
let txs = Arc::clone(&self.txs);
25+
26+
let kqueue = kqueue()?;
27+
28+
let handle = Some(thread::spawn(move || {
29+
// Wait for initial expiration
30+
let initial_event = Timer::register_initial_expiration(kqueue)?;
31+
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;
32+
33+
// Register loop event
34+
let loop_event = Timer::register_loop_expiration(kqueue)?;
35+
36+
// Loop 10s
37+
loop {
38+
// Exit thread if there are no listeners
39+
if txs.lock()?.len() == 0 {
40+
// TODO: should close file descriptors?
41+
return Ok(());
42+
}
43+
44+
// Get current time
45+
let current = std::time::SystemTime::now()
46+
.duration_since(std::time::UNIX_EPOCH)?
47+
.as_secs();
48+
49+
// Iterate through Senders
50+
txs.lock()?.iter().for_each(|tx| {
51+
// Send event to attached Sender
52+
tx.send(current).unwrap();
53+
});
54+
55+
// Wait 10s
56+
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
57+
}
58+
}));
59+
60+
Ok(Self { handle, ..self })
2461
}
2562

2663
/// Attach an mpsc::Sender to Timer
@@ -42,4 +79,72 @@ impl Timer {
4279

4380
Ok(())
4481
}
82+
83+
fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> {
84+
kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?;
85+
Ok(())
86+
}
87+
fn register_initial_expiration(kqueue: i32) -> Result<libc::kevent> {
88+
// Get the next event time
89+
let now = std::time::SystemTime::now()
90+
.duration_since(std::time::UNIX_EPOCH)?
91+
.as_secs();
92+
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
93+
let first_fire = now + rem;
94+
95+
let initial_event = libc::kevent {
96+
ident: 1,
97+
filter: libc::EVFILT_TIMER,
98+
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
99+
fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS,
100+
data: first_fire as isize,
101+
udata: 0 as *mut libc::c_void,
102+
};
103+
104+
// add first event
105+
kevent(
106+
kqueue,
107+
[initial_event].as_ptr() as *const libc::kevent,
108+
1,
109+
[].as_mut_ptr(),
110+
0,
111+
std::ptr::null(),
112+
)?;
113+
114+
Ok(initial_event)
115+
}
116+
fn register_loop_expiration(kqueue: i32) -> Result<libc::kevent> {
117+
let loop_event = libc::kevent {
118+
ident: 1,
119+
filter: libc::EVFILT_TIMER,
120+
flags: libc::EV_ADD | libc::EV_ENABLE,
121+
fflags: 0,
122+
data: 10000,
123+
udata: 0 as *mut libc::c_void,
124+
};
125+
126+
// add loop event
127+
let ke = kevent(
128+
kqueue,
129+
[loop_event].as_ptr() as *const libc::kevent,
130+
1,
131+
[].as_mut_ptr(),
132+
0,
133+
std::ptr::null(),
134+
)?;
135+
136+
Ok(loop_event)
137+
}
138+
}
139+
140+
fn kqueue() -> Result<i32> {
141+
check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32)
142+
}
143+
144+
fn kevent(
145+
kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent,
146+
e_count: libc::c_int, timeout: *const libc::timespec,
147+
) -> Result<()> {
148+
check_err(unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) })?;
149+
Ok(())
45150
}

src/utils.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// except according to those terms.
66

77
use crate::error::Result;
8+
use crate::PyroscopeError;
89

910
use std::collections::HashMap;
1011

@@ -35,6 +36,7 @@ mod tests {
3536
use crate::utils::merge_tags_with_app_name;
3637

3738
#[test]
39+
3840
fn merge_tags_with_app_name_with_tags() {
3941
let mut tags = HashMap::new();
4042
tags.insert("env".to_string(), "staging".to_string());
@@ -54,3 +56,10 @@ mod tests {
5456
)
5557
}
5658
}
59+
/// Error Wrapper for libc return. Only check for errors.
60+
pub fn check_err<T: Ord + Default>(num: T) -> Result<T> {
61+
if num < T::default() {
62+
return Err(PyroscopeError::from(std::io::Error::last_os_error()));
63+
}
64+
Ok(num)
65+
}

0 commit comments

Comments
 (0)