Skip to content

Commit 3d34ce4

Browse files
committed
Merge from main
2 parents a40f72f + 50d7aa7 commit 3d34ce4

File tree

12 files changed

+225
-195
lines changed

12 files changed

+225
-195
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ libc = "^0.2.66"
6464
[dev-dependencies]
6565
tokio = { version = "1.13", features = ["full"] }
6666
pretty_env_logger = "0.4.0"
67+
assert_matches = "1"
6768

6869
[profile.dev]
6970
opt-level=0

examples/internal/timer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
extern crate pyroscope;
22

3-
use std::sync::mpsc::channel;
4-
use std::sync::mpsc::{Receiver, Sender};
3+
use std::sync::mpsc;
54

65
use pyroscope::timer::Timer;
76

87
fn main() {
98
// Initialize the Timer
10-
let mut timer = Timer::default().initialize().unwrap();
9+
let mut timer = Timer::initialize(std::time::Duration::from_secs(10)).unwrap();
1110

1211
// Create a streaming channel
13-
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
12+
let (tx, rx) = mpsc::channel();
1413

15-
let (tx2, rx2): (Sender<u64>, Receiver<u64>) = channel();
14+
let (tx2, rx2) = mpsc::channel();
1615

1716
// Attach tx to Timer
1817
timer.attach_listener(tx).unwrap();

src/error.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
/// Result Alias with PyroscopeError
23
pub type Result<T> = std::result::Result<T, PyroscopeError>;
34

@@ -36,7 +37,9 @@ impl PyroscopeError {
3637

3738
/// Create a new instance of PyroscopeError with source
3839
pub fn new_with_source<E>(msg: &str, source: E) -> Self
39-
where E: std::error::Error + Send + Sync + 'static {
40+
where
41+
E: std::error::Error + Send + Sync + 'static,
42+
{
4043
PyroscopeError::Compat {
4144
msg: msg.to_string(),
4245
source: Box::new(source),

src/pyroscope.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use std::{
22
collections::HashMap,
33
sync::{
4-
mpsc::{channel, Receiver, Sender},
4+
mpsc::{self, Sender},
55
Arc, Condvar, Mutex,
66
},
77
thread::JoinHandle,
8+
time::Duration,
89
};
910

1011
use crate::{
1112
error::Result,
1213
session::{Session, SessionManager, SessionSignal},
13-
timer::Timer,
14+
timer::{Timer, TimerSignal},
1415
};
1516

1617
use pyroscope_backends::pprof::Pprof;
@@ -147,6 +148,8 @@ impl PyroscopeAgentBuilder {
147148
}
148149
}
149150

151+
/// # use pyroscope::PyroscopeError;
152+
/// # fn main() -> Result<(), PyroscopeError> {
150153
/// Set tags. Default is empty.
151154
/// # Example
152155
/// ```ignore
@@ -176,7 +179,7 @@ impl PyroscopeAgentBuilder {
176179
log::trace!(target: LOG_TAG, "Backend initialized");
177180

178181
// Start Timer
179-
let timer = Timer::default().initialize()?;
182+
let timer = Timer::initialize(std::time::Duration::from_secs(10))?;
180183
log::trace!(target: LOG_TAG, "Timer initialized");
181184

182185
// Start the SessionManager
@@ -201,7 +204,7 @@ impl PyroscopeAgentBuilder {
201204
pub struct PyroscopeAgent {
202205
timer: Timer,
203206
session_manager: SessionManager,
204-
tx: Option<Sender<u64>>,
207+
tx: Option<Sender<TimerSignal>>,
205208
handle: Option<JoinHandle<Result<()>>>,
206209
running: Arc<(Mutex<bool>, Condvar)>,
207210

@@ -293,7 +296,7 @@ impl PyroscopeAgent {
293296
*running = true;
294297
drop(running);
295298

296-
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
299+
let (tx, rx) = mpsc::channel();
297300
self.timer.attach_listener(tx.clone())?;
298301
self.tx = Some(tx);
299302

@@ -304,28 +307,31 @@ impl PyroscopeAgent {
304307
self.handle = Some(std::thread::spawn(move || {
305308
log::trace!(target: LOG_TAG, "Main Thread started");
306309

307-
while let Ok(until) = rx.recv() {
308-
log::trace!(target: LOG_TAG, "Sending session {}", until);
309-
310-
// Generate report from backend
311-
let report = backend.lock()?.report()?;
312-
313-
// Send new Session to SessionManager
314-
stx.send(SessionSignal::Session(Session::new(
315-
until,
316-
config.clone(),
317-
report,
318-
)?))?;
319-
320-
if until == 0 {
321-
log::trace!(target: LOG_TAG, "Session Killed");
322-
323-
let (lock, cvar) = &*pair;
324-
let mut running = lock.lock()?;
325-
*running = false;
326-
cvar.notify_one();
327-
328-
return Ok(());
310+
while let Ok(signal) = rx.recv() {
311+
match signal {
312+
TimerSignal::NextSnapshot(until) => {
313+
log::trace!(target: LOG_TAG, "Sending session {}", until);
314+
315+
// Generate report from backend
316+
let report = backend.lock()?.report()?;
317+
318+
// Send new Session to SessionManager
319+
stx.send(SessionSignal::Session(Session::new(
320+
until,
321+
config.clone(),
322+
report,
323+
)?))?
324+
}
325+
TimerSignal::Terminate => {
326+
log::trace!(target: LOG_TAG, "Session Killed");
327+
328+
let (lock, cvar) = &*pair;
329+
let mut running = lock.lock()?;
330+
*running = false;
331+
cvar.notify_one();
332+
333+
return Ok(());
334+
}
329335
}
330336
}
331337
Ok(())
@@ -346,7 +352,9 @@ impl PyroscopeAgent {
346352
log::debug!(target: LOG_TAG, "Stopping");
347353
// get tx and send termination signal
348354
if let Some(sender) = self.tx.take() {
349-
sender.send(0)?;
355+
// best effort
356+
let _ = sender.send(TimerSignal::NextSnapshot(0));
357+
sender.send(TimerSignal::Terminate)?;
350358
} else {
351359
log::error!("PyroscopeAgent - Missing sender")
352360
}

src/session.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
sync::mpsc::{sync_channel, Receiver, SyncSender},
3-
thread,
4-
thread::JoinHandle,
3+
thread::{self, JoinHandle},
4+
time::Duration,
55
};
66

77
use crate::pyroscope::PyroscopeConfig;
@@ -84,7 +84,9 @@ impl SessionManager {
8484
pub struct Session {
8585
pub config: PyroscopeConfig,
8686
pub report: Vec<u8>,
87+
// unix time
8788
pub from: u64,
89+
// unix time
8890
pub until: u64,
8991
}
9092

@@ -159,7 +161,7 @@ impl Session {
159161
("spyName", self.config.spy_name.as_str()),
160162
])
161163
.body(self.report)
162-
.timeout(std::time::Duration::from_secs(10))
164+
.timeout(Duration::from_secs(10))
163165
.send()?;
164166

165167
Ok(())

src/timer/epoll.rs

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
1-
// Copyright 2021 Developers of Pyroscope.
2-
3-
// Licensed under the Apache License, Version 2.0 <LICENSE or
4-
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
5-
// except according to those terms.
6-
1+
use super::TimerSignal;
72
use crate::utils::check_err;
83
use crate::utils::get_time_range;
4+
use crate::PyroscopeError;
95
use crate::Result;
106

117
use std::sync::{
12-
mpsc::{channel, Receiver, Sender},
8+
mpsc::{channel, Sender},
139
Arc, Mutex,
1410
};
15-
use std::{thread, thread::JoinHandle};
11+
use std::{
12+
thread::{self, JoinHandle},
13+
time::Duration,
14+
};
1615

1716
/// A thread that sends a notification every 10th second
1817
///
@@ -22,57 +21,59 @@ use std::{thread, thread::JoinHandle};
2221
/// The Timer thread will run continously until all Senders are dropped.
2322
/// The Timer thread will be joined when all Senders are dropped.
2423
25-
#[derive(Debug, Default)]
24+
#[derive(Debug)]
2625
pub struct Timer {
2726
/// A vector to store listeners (mpsc::Sender)
28-
txs: Arc<Mutex<Vec<Sender<u64>>>>,
27+
txs: Arc<Mutex<Vec<Sender<TimerSignal>>>>,
2928

3029
/// Thread handle
3130
pub handle: Option<JoinHandle<Result<()>>>,
3231
}
3332

3433
impl Timer {
3534
/// Initialize Timer and run a thread to send events to attached listeners
36-
pub fn initialize(self) -> Result<Self> {
37-
let txs = Arc::clone(&self.txs);
35+
pub fn initialize(cycle: Duration) -> Result<Self> {
36+
let txs = Arc::new(Mutex::new(Vec::new()));
3837

39-
// Add Default tx
40-
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
38+
// Add a dummy tx so the below thread does not terminate early
39+
let (tx, _rx) = channel();
4140
txs.lock()?.push(tx);
4241

43-
let timer_fd = Timer::set_timerfd()?;
42+
let timer_fd = Timer::set_timerfd(cycle)?;
4443
let epoll_fd = Timer::create_epollfd(timer_fd)?;
4544

46-
let handle = Some(thread::spawn(move || {
47-
loop {
48-
// Exit thread if there are no listeners
49-
if txs.lock()?.len() == 0 {
50-
// Close file descriptors
51-
unsafe { libc::close(timer_fd) };
52-
unsafe { libc::close(epoll_fd) };
53-
54-
return Ok(());
45+
let handle = Some({
46+
let txs = txs.clone();
47+
thread::spawn(move || {
48+
loop {
49+
// Exit thread if there are no listeners
50+
if txs.lock()?.is_empty() {
51+
// Close file descriptors
52+
unsafe { libc::close(timer_fd) };
53+
unsafe { libc::close(epoll_fd) };
54+
return Ok::<_, PyroscopeError>(());
55+
}
56+
57+
// Fire @ 10th sec
58+
Timer::epoll_wait(timer_fd, epoll_fd)?;
59+
60+
// Get the current time range
61+
let from = TimerSignal::NextSnapshot(get_time_range(0)?.from);
62+
63+
// Iterate through Senders
64+
txs.lock()?.iter().for_each(|tx| {
65+
// Send event to attached Sender
66+
if tx.send(from).is_ok() {}
67+
});
5568
}
69+
})
70+
});
5671

57-
// Fire @ 10th sec
58-
Timer::epoll_wait(timer_fd, epoll_fd)?;
59-
60-
// Get the current time range
61-
let from = get_time_range(0)?.from;
62-
63-
// Iterate through Senders
64-
txs.lock()?.iter().for_each(|tx| {
65-
// Send event to attached Sender
66-
if tx.send(from).is_ok() {}
67-
});
68-
}
69-
}));
70-
71-
Ok(Self { handle, ..self })
72+
Ok(Self { handle, txs })
7273
}
7374

7475
/// create and set a timer file descriptor
75-
fn set_timerfd() -> Result<libc::c_int> {
76+
fn set_timerfd(cycle: Duration) -> Result<libc::c_int> {
7677
// Set the timer to use the system time.
7778
let clockid: libc::clockid_t = libc::CLOCK_REALTIME;
7879
// Non-blocking file descriptor
@@ -87,8 +88,8 @@ impl Timer {
8788
// new_value sets the Timer
8889
let mut new_value = libc::itimerspec {
8990
it_interval: libc::timespec {
90-
tv_sec: 10,
91-
tv_nsec: 0,
91+
tv_sec: cycle.as_secs() as i64,
92+
tv_nsec: cycle.subsec_nanos() as i64,
9293
},
9394
it_value: libc::timespec {
9495
tv_sec: first_fire as i64,
@@ -161,7 +162,7 @@ impl Timer {
161162
///
162163
/// Timer will dispatch an event with the timestamp of the current instant,
163164
/// every 10th second to all attached senders
164-
pub fn attach_listener(&mut self, tx: Sender<u64>) -> Result<()> {
165+
pub fn attach_listener(&mut self, tx: Sender<TimerSignal>) -> Result<()> {
165166
// Push Sender to a Vector of Sender(s)
166167
let txs = Arc::clone(&self.txs);
167168
txs.lock()?.push(tx);

0 commit comments

Comments
 (0)