Skip to content

Commit a6ad42e

Browse files
authored
Merge pull request #3 from omarabid/main
Kqueue implementation and Timer improvements
2 parents 0c47ca8 + 5d9fb73 commit a6ad42e

File tree

6 files changed

+198
-66
lines changed

6 files changed

+198
-66
lines changed

examples/internals-timer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ fn main() {
2929
.duration_since(std::time::UNIX_EPOCH)
3030
.unwrap()
3131
.as_secs();
32-
3332
println!("Current Time: {}", now);
3433

3534
// Listen to the Timer events
3635
std::thread::spawn(move || {
36+
#[allow(irrefutable_let_patterns)]
3737
while let result = rx.recv() {
3838
match result {
3939
Ok(time) => println!("Thread 1 Notification: {}", time),
40-
Err(err) => {
40+
Err(_err) => {
4141
println!("Error Thread 1");
4242
break;
4343
}
@@ -46,10 +46,11 @@ fn main() {
4646
});
4747

4848
std::thread::spawn(move || {
49+
#[allow(irrefutable_let_patterns)]
4950
while let result = rx2.recv() {
5051
match result {
5152
Ok(time) => println!("Thread 2 Notification: {}", time),
52-
Err(err) => {
53+
Err(_err) => {
5354
println!("Error Thread 2");
5455
break;
5556
}

src/pyroscope.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ impl PyroscopeAgent {
182182
drop(cvar);
183183
drop(running);
184184

185-
// TODO: move this channel to PyroscopeAgent
186185
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
187186
self.timer.attach_listener(tx.clone())?;
188187
self.tx = Some(tx.clone());
@@ -219,7 +218,7 @@ impl PyroscopeAgent {
219218
// Wait for the Thread to finish
220219
let pair = Arc::clone(&self.running);
221220
let (lock, cvar) = &*pair;
222-
cvar.wait_while(lock.lock()?, |running| *running)?;
221+
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;
223222

224223
// Create a clone of Backend
225224
let backend = Arc::clone(&self.backend);

src/timer/epoll.rs

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7-
use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime};
7+
use crate::utils::check_err;
88
use crate::Result;
99

10-
use std::sync::{mpsc::Sender, Arc, Mutex};
10+
use std::sync::{
11+
mpsc::{channel, Receiver, Sender},
12+
Arc, Mutex,
13+
};
1114
use std::{thread, thread::JoinHandle};
1215

1316
#[derive(Debug, Default)]
@@ -23,14 +26,21 @@ impl Timer {
2326
pub fn initialize(self) -> Result<Self> {
2427
let txs = Arc::clone(&self.txs);
2528

29+
// Add Default tx
30+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
31+
txs.lock()?.push(tx);
32+
2633
let timer_fd = Timer::set_timerfd()?;
2734
let epoll_fd = Timer::create_epollfd(timer_fd)?;
2835

2936
let handle = Some(thread::spawn(move || {
3037
loop {
3138
// Exit thread if there are no listeners
3239
if txs.lock()?.len() == 0 {
33-
// TODO: should close file descriptors?
40+
// Close file descriptors
41+
unsafe { libc::close(timer_fd) };
42+
unsafe { libc::close(epoll_fd) };
43+
3444
return Ok(());
3545
}
3646

@@ -45,7 +55,10 @@ impl Timer {
4555
// Iterate through Senders
4656
txs.lock()?.iter().for_each(|tx| {
4757
// Send event to attached Sender
48-
tx.send(current).unwrap();
58+
match tx.send(current) {
59+
Ok(_) => {}
60+
Err(_) => {}
61+
}
4962
});
5063
}
5164
}));
@@ -158,3 +171,51 @@ impl Timer {
158171
Ok(())
159172
}
160173
}
174+
175+
/// Wrapper for libc functions.
176+
///
177+
/// Error wrapper for some libc functions used by the library. This only does
178+
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
179+
/// instead of expanding this wrappers (if more functions and types are used
180+
/// from libc)
181+
182+
/// libc::timerfd wrapper
183+
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
184+
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
185+
}
186+
187+
/// libc::timerfd_settime wrapper
188+
pub fn timerfd_settime(
189+
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
190+
old_value: &mut libc::itimerspec,
191+
) -> Result<()> {
192+
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
193+
Ok(())
194+
}
195+
196+
/// libc::epoll_create1 wrapper
197+
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
198+
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
199+
}
200+
201+
/// libc::epoll_ctl wrapper
202+
pub fn epoll_ctl(
203+
epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event,
204+
) -> Result<()> {
205+
check_err(unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) })?;
206+
Ok(())
207+
}
208+
209+
/// libc::epoll_wait wrapper
210+
pub fn epoll_wait(
211+
epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int,
212+
) -> Result<()> {
213+
check_err(unsafe { libc::epoll_wait(epoll_fd, events, maxevents, timeout) })?;
214+
Ok(())
215+
}
216+
217+
/// libc::read wrapper
218+
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
219+
check_err(unsafe { libc::read(timer_fd, bufptr, count) })?;
220+
Ok(())
221+
}

src/timer/kqueue.rs

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7+
use crate::utils::check_err;
78
use crate::Result;
89

9-
use std::sync::{mpsc::Sender, Arc, Mutex};
10+
use std::sync::{
11+
mpsc::{channel, Receiver, Sender},
12+
Arc, Mutex,
13+
};
1014
use std::{thread, thread::JoinHandle};
1115

1216
#[derive(Debug, Default)]
@@ -20,7 +24,50 @@ pub struct Timer {
2024

2125
impl Timer {
2226
pub fn initialize(self) -> Result<Self> {
23-
Ok(self)
27+
let txs = Arc::clone(&self.txs);
28+
29+
// Add Default tx
30+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
31+
txs.lock()?.push(tx);
32+
33+
let kqueue = kqueue()?;
34+
35+
let handle = Some(thread::spawn(move || {
36+
// Wait for initial expiration
37+
let initial_event = Timer::register_initial_expiration(kqueue)?;
38+
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;
39+
40+
// Register loop event
41+
let loop_event = Timer::register_loop_expiration(kqueue)?;
42+
43+
// Loop 10s
44+
loop {
45+
// Exit thread if there are no listeners
46+
if txs.lock()?.len() == 0 {
47+
// TODO: should close file descriptors?
48+
return Ok(());
49+
}
50+
51+
// Get current time
52+
let current = std::time::SystemTime::now()
53+
.duration_since(std::time::UNIX_EPOCH)?
54+
.as_secs();
55+
56+
// Iterate through Senders
57+
txs.lock()?.iter().for_each(|tx| {
58+
// Send event to attached Sender
59+
match tx.send(current) {
60+
Ok(_) => {}
61+
Err(_) => {}
62+
}
63+
});
64+
65+
// Wait 10s
66+
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
67+
}
68+
}));
69+
70+
Ok(Self { handle, ..self })
2471
}
2572

2673
/// Attach an mpsc::Sender to Timer
@@ -42,4 +89,74 @@ impl Timer {
4289

4390
Ok(())
4491
}
92+
93+
fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> {
94+
kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?;
95+
Ok(())
96+
}
97+
fn register_initial_expiration(kqueue: i32) -> Result<libc::kevent> {
98+
// Get the next event time
99+
let now = std::time::SystemTime::now()
100+
.duration_since(std::time::UNIX_EPOCH)?
101+
.as_secs();
102+
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
103+
let first_fire = now + rem;
104+
105+
let initial_event = libc::kevent {
106+
ident: 1,
107+
filter: libc::EVFILT_TIMER,
108+
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
109+
fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS,
110+
data: first_fire as isize,
111+
udata: 0 as *mut libc::c_void,
112+
};
113+
114+
// add first event
115+
kevent(
116+
kqueue,
117+
[initial_event].as_ptr() as *const libc::kevent,
118+
1,
119+
[].as_mut_ptr(),
120+
0,
121+
std::ptr::null(),
122+
)?;
123+
124+
Ok(initial_event)
125+
}
126+
fn register_loop_expiration(kqueue: i32) -> Result<libc::kevent> {
127+
let loop_event = libc::kevent {
128+
ident: 1,
129+
filter: libc::EVFILT_TIMER,
130+
flags: libc::EV_ADD | libc::EV_ENABLE,
131+
fflags: 0,
132+
data: 10000,
133+
udata: 0 as *mut libc::c_void,
134+
};
135+
136+
// add loop event
137+
let ke = kevent(
138+
kqueue,
139+
[loop_event].as_ptr() as *const libc::kevent,
140+
1,
141+
[].as_mut_ptr(),
142+
0,
143+
std::ptr::null(),
144+
)?;
145+
146+
Ok(loop_event)
147+
}
148+
}
149+
150+
/// libc::kqueue wrapper
151+
fn kqueue() -> Result<i32> {
152+
check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32)
153+
}
154+
155+
/// libc::kevent wrapper
156+
fn kevent(
157+
kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent,
158+
e_count: libc::c_int, timeout: *const libc::timespec,
159+
) -> Result<()> {
160+
check_err(unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) })?;
161+
Ok(())
45162
}

src/timer/sleep.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ impl Timer {
3131
pub fn initialize(self) -> Result<Self> {
3232
let txs = Arc::clone(&self.txs);
3333

34-
// Add tx
35-
// txs.lock().unwrap().push(tx);
34+
// Add Default tx
35+
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
36+
txs.lock()?.push(tx);
3637

3738
// Spawn a Thread
3839
let handle = Some(thread::spawn(move || {
@@ -61,7 +62,10 @@ impl Timer {
6162
// Iterate through Senders
6263
txs.lock()?.iter().for_each(|tx| {
6364
// Send event to attached Sender
64-
tx.send(current).unwrap();
65+
match tx.send(current) {
66+
Ok(_) => {}
67+
Err(_) => {}
68+
}
6569
});
6670

6771
// Sleep for 10s

src/utils.rs

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod tests {
3636
use crate::utils::merge_tags_with_app_name;
3737

3838
#[test]
39+
3940
fn merge_tags_with_app_name_with_tags() {
4041
let mut tags = HashMap::new();
4142
tags.insert("env".to_string(), "staging".to_string());
@@ -55,61 +56,10 @@ mod tests {
5556
)
5657
}
5758
}
58-
59-
/// Wrapper for libc functions.
60-
///
61-
/// Error wrapper for some libc functions used by the library. This only does
62-
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
63-
/// instead of expanding this wrappers (if more functions and types are used
64-
/// from libc)
65-
6659
/// Error Wrapper for libc return. Only check for errors.
67-
fn check_err<T: Ord + Default>(num: T) -> Result<T> {
60+
pub fn check_err<T: Ord + Default>(num: T) -> Result<T> {
6861
if num < T::default() {
6962
return Err(PyroscopeError::from(std::io::Error::last_os_error()));
7063
}
7164
Ok(num)
7265
}
73-
74-
/// libc::timerfd wrapper
75-
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
76-
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
77-
}
78-
79-
/// libc::timerfd_settime wrapper
80-
pub fn timerfd_settime(
81-
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
82-
old_value: &mut libc::itimerspec,
83-
) -> Result<()> {
84-
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
85-
Ok(())
86-
}
87-
88-
/// libc::epoll_create1 wrapper
89-
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
90-
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
91-
}
92-
93-
/// libc::epoll_ctl wrapper
94-
pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> {
95-
check_err(unsafe {
96-
libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event)
97-
})?;
98-
Ok(())
99-
}
100-
101-
/// libc::epoll_wait wrapper
102-
pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> {
103-
check_err(unsafe {
104-
libc::epoll_wait(epoll_fd, events, maxevents, timeout)
105-
})?;
106-
Ok(())
107-
}
108-
109-
/// libc::read wrapper
110-
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
111-
check_err(unsafe {
112-
libc::read(timer_fd, bufptr, count)
113-
})?;
114-
Ok(())
115-
}

0 commit comments

Comments
 (0)