Skip to content

Commit 0c47ca8

Browse files
authored
Merge pull request #2 from pyroscope-io/epoll
Epoll
2 parents 09e3de9 + 3e22736 commit 0c47ca8

File tree

6 files changed

+191
-9
lines changed

6 files changed

+191
-9
lines changed

examples/internals-timer.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use pyroscope::timer::Timer;
1313

1414
fn main() {
1515
// Initialize the Timer
16-
let mut timer = Timer::default().initialize();
16+
let mut timer = Timer::default().initialize().unwrap();
1717

1818
// Create a streaming channel
1919
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
@@ -24,11 +24,19 @@ fn main() {
2424
timer.attach_listener(tx).unwrap();
2525
timer.attach_listener(tx2).unwrap();
2626

27+
// Show current time
28+
let now = std::time::SystemTime::now()
29+
.duration_since(std::time::UNIX_EPOCH)
30+
.unwrap()
31+
.as_secs();
32+
33+
println!("Current Time: {}", now);
34+
2735
// Listen to the Timer events
2836
std::thread::spawn(move || {
2937
while let result = rx.recv() {
3038
match result {
31-
Ok(time) => println!("Thread 2 Notification: {}", time),
39+
Ok(time) => println!("Thread 1 Notification: {}", time),
3240
Err(err) => {
3341
println!("Error Thread 1");
3442
break;

src/pyroscope.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl PyroscopeAgentBuilder {
123123
backend.lock()?.initialize(self.config.sample_rate)?;
124124

125125
// Start Timer
126-
let timer = Timer::default().initialize();
126+
let timer = Timer::default().initialize()?;
127127

128128
// Return PyroscopeAgent
129129
Ok(PyroscopeAgent {

src/timer/epoll.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7+
use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime};
78
use crate::Result;
89

910
use std::sync::{mpsc::Sender, Arc, Mutex};
@@ -19,8 +20,122 @@ pub struct Timer {
1920
}
2021

2122
impl Timer {
22-
pub fn initialize(self) -> Self {
23-
self
23+
pub fn initialize(self) -> Result<Self> {
24+
let txs = Arc::clone(&self.txs);
25+
26+
let timer_fd = Timer::set_timerfd()?;
27+
let epoll_fd = Timer::create_epollfd(timer_fd)?;
28+
29+
let handle = Some(thread::spawn(move || {
30+
loop {
31+
// Exit thread if there are no listeners
32+
if txs.lock()?.len() == 0 {
33+
// TODO: should close file descriptors?
34+
return Ok(());
35+
}
36+
37+
// Fire @ 10th sec
38+
Timer::epoll_wait(timer_fd, epoll_fd)?;
39+
40+
// Get current time
41+
let current = std::time::SystemTime::now()
42+
.duration_since(std::time::UNIX_EPOCH)?
43+
.as_secs();
44+
45+
// Iterate through Senders
46+
txs.lock()?.iter().for_each(|tx| {
47+
// Send event to attached Sender
48+
tx.send(current).unwrap();
49+
});
50+
}
51+
}));
52+
53+
Ok(Self { handle, ..self })
54+
}
55+
56+
/// create and set a timer file descriptor
57+
fn set_timerfd() -> Result<libc::c_int> {
58+
// Set the timer to use the system time.
59+
let clockid: libc::clockid_t = libc::CLOCK_REALTIME;
60+
// Non-blocking file descriptor
61+
let clock_flags: libc::c_int = libc::TFD_NONBLOCK;
62+
63+
// Create timer fd
64+
let tfd = timerfd_create(clockid, clock_flags)?;
65+
66+
// Get the next event time
67+
let now = std::time::SystemTime::now()
68+
.duration_since(std::time::UNIX_EPOCH)?
69+
.as_secs();
70+
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
71+
let first_fire = now + rem;
72+
73+
// new_value sets the Timer
74+
let mut new_value = libc::itimerspec {
75+
it_interval: libc::timespec {
76+
tv_sec: 10,
77+
tv_nsec: 0,
78+
},
79+
it_value: libc::timespec {
80+
tv_sec: first_fire as i64,
81+
tv_nsec: 0,
82+
},
83+
};
84+
85+
// Empty itimerspec object
86+
let mut old_value = libc::itimerspec {
87+
it_interval: libc::timespec {
88+
tv_sec: 0,
89+
tv_nsec: 0,
90+
},
91+
it_value: libc::timespec {
92+
tv_sec: 0,
93+
tv_nsec: 0,
94+
},
95+
};
96+
97+
let set_flags = libc::TFD_TIMER_ABSTIME;
98+
99+
// Set the timer
100+
timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value)?;
101+
102+
// Return file descriptor
103+
Ok(tfd)
104+
}
105+
106+
/// Create a new epoll file descriptor and add the timer to its interests
107+
fn create_epollfd(timer_fd: libc::c_int) -> Result<libc::c_int> {
108+
// create a new epoll fd
109+
let epoll_fd = epoll_create1(0)?;
110+
111+
// event to pull
112+
let mut event = libc::epoll_event {
113+
events: libc::EPOLLIN as u32,
114+
u64: 1,
115+
};
116+
117+
let epoll_flags = libc::EPOLL_CTL_ADD;
118+
119+
// add event to the epoll
120+
epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event)?;
121+
122+
// return epoll fd
123+
Ok(epoll_fd)
124+
}
125+
126+
fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> {
127+
// vector to store events
128+
let mut events = Vec::with_capacity(1);
129+
130+
// wait for the timer to fire an event. This is function will block.
131+
epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1)?;
132+
133+
// read the value from the timerfd. This is required to re-arm the timer.
134+
let mut buffer: u64 = 0;
135+
let bufptr: *mut _ = &mut buffer;
136+
read(timer_fd, bufptr as *mut libc::c_void, 8)?;
137+
138+
Ok(())
24139
}
25140

26141
/// Attach an mpsc::Sender to Timer

src/timer/kqueue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ pub struct Timer {
1919
}
2020

2121
impl Timer {
22-
pub fn initialize(self) -> Self {
23-
self
22+
pub fn initialize(self) -> Result<Self> {
23+
Ok(self)
2424
}
2525

2626
/// Attach an mpsc::Sender to Timer

src/timer/sleep.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct Timer {
2828

2929
impl Timer {
3030
/// Initialize Timer and run a thread to send events to attached listeners
31-
pub fn initialize(self) -> Self {
31+
pub fn initialize(self) -> Result<Self> {
3232
let txs = Arc::clone(&self.txs);
3333

3434
// Add tx
@@ -69,7 +69,7 @@ impl Timer {
6969
}
7070
}));
7171

72-
Self { handle, ..self }
72+
Ok(Self { handle, ..self })
7373
}
7474

7575
/// Attach an mpsc::Sender to Timer

src/utils.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// except according to those terms.
66

77
use crate::error::Result;
8+
use crate::PyroscopeError;
89

910
use std::collections::HashMap;
1011

@@ -54,3 +55,61 @@ mod tests {
5455
)
5556
}
5657
}
58+
59+
/// Wrapper for libc functions.
60+
///
61+
/// Error wrapper for some libc functions used by the library. This only does
62+
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
63+
/// instead of expanding this wrappers (if more functions and types are used
64+
/// from libc)
65+
66+
/// Error Wrapper for libc return. Only check for errors.
67+
fn check_err<T: Ord + Default>(num: T) -> Result<T> {
68+
if num < T::default() {
69+
return Err(PyroscopeError::from(std::io::Error::last_os_error()));
70+
}
71+
Ok(num)
72+
}
73+
74+
/// libc::timerfd wrapper
75+
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
76+
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
77+
}
78+
79+
/// libc::timerfd_settime wrapper
80+
pub fn timerfd_settime(
81+
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
82+
old_value: &mut libc::itimerspec,
83+
) -> Result<()> {
84+
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
85+
Ok(())
86+
}
87+
88+
/// libc::epoll_create1 wrapper
89+
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
90+
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
91+
}
92+
93+
/// libc::epoll_ctl wrapper
94+
pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> {
95+
check_err(unsafe {
96+
libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event)
97+
})?;
98+
Ok(())
99+
}
100+
101+
/// libc::epoll_wait wrapper
102+
pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> {
103+
check_err(unsafe {
104+
libc::epoll_wait(epoll_fd, events, maxevents, timeout)
105+
})?;
106+
Ok(())
107+
}
108+
109+
/// libc::read wrapper
110+
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
111+
check_err(unsafe {
112+
libc::read(timer_fd, bufptr, count)
113+
})?;
114+
Ok(())
115+
}

0 commit comments

Comments
 (0)