Skip to content

Commit 0d7fab0

Browse files
committed
allow configurable accumulation_cycle
1 parent c0a64b7 commit 0d7fab0

File tree

8 files changed

+174
-93
lines changed

8 files changed

+174
-93
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ libc = "^0.2.66"
2323
[dev-dependencies]
2424
tokio = { version = "1.13", features = ["full"] }
2525
pretty_env_logger = "0.4.0"
26+
assert_matches = "1"
2627

2728
[profile.dev]
2829
opt-level=0

examples/internals-timer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
extern crate pyroscope;
22

3-
use std::sync::mpsc::channel;
4-
use std::sync::mpsc::{Receiver, Sender};
3+
use std::sync::mpsc;
54

65
use pyroscope::timer::Timer;
76

87
fn main() {
98
// Initialize the Timer
10-
let mut timer = Timer::default().initialize().unwrap();
9+
let mut timer = Timer::initialize(std::time::Duration::from_secs(10)).unwrap();
1110

1211
// Create a streaming channel
13-
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
12+
let (tx, rx) = mpsc::channel();
1413

15-
let (tx2, rx2): (Sender<u64>, Receiver<u64>) = channel();
14+
let (tx2, rx2) = mpsc::channel();
1615

1716
// Attach tx to Timer
1817
timer.attach_listener(tx).unwrap();

src/pyroscope.rs

Lines changed: 103 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashMap,
33
sync::{
4-
mpsc::{channel, Receiver, Sender},
4+
mpsc::{self, Sender},
55
Arc, Condvar, Mutex,
66
},
77
thread::JoinHandle,
@@ -16,6 +16,28 @@ use crate::{
1616

1717
const LOG_TAG: &str = "Pyroscope::Agent";
1818

19+
20+
/// A signal sent from the agent to the timer.
21+
///
22+
/// Either schedules another wake-up, or asks
23+
/// the timer thread to terminate.
24+
#[derive(Debug, Clone, Copy)]
25+
pub enum AgentSignal {
26+
// Thread termination was requested.
27+
Terminate,
28+
// When to take the next snapshot using the `Backend`.
29+
NextSnapshot(u64),
30+
}
31+
32+
impl std::fmt::Display for AgentSignal {
33+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
34+
match self {
35+
Self::Terminate => write!(f, "Terminate"),
36+
Self::NextSnapshot(when) => write!(f, "NextSnapshot({})", when),
37+
}
38+
}
39+
}
40+
1941
/// Pyroscope Agent Configuration. This is the configuration that is passed to the agent.
2042
/// # Example
2143
/// ```
@@ -32,6 +54,8 @@ pub struct PyroscopeConfig {
3254
pub tags: HashMap<String, String>,
3355
/// Sample rate used in Hz
3456
pub sample_rate: i32,
57+
/// How long to accumulate data for before sending it upstream
58+
pub accumulation_cycle: std::time::Duration,
3559
// TODO
3660
// log_level
3761
// auth_token
@@ -52,6 +76,7 @@ impl PyroscopeConfig {
5276
application_name: application_name.as_ref().to_owned(),
5377
tags: HashMap::new(),
5478
sample_rate: 100i32,
79+
accumulation_cycle: std::time::Duration::from_secs(10),
5580
}
5681
}
5782

@@ -69,6 +94,26 @@ impl PyroscopeConfig {
6994
}
7095
}
7196

97+
/// Override the accumulation cycle.
98+
///
99+
/// # Example
100+
///
101+
/// ```
102+
/// # fn main() -> Result<(), PyroscopeError> {
103+
/// use std::time::Duration;
104+
/// use pyroscope::pyroscope::PyroscopeConfig;
105+
/// let config = PyroscopeConfig::new("http://localhost:8080", "my-app")
106+
/// .accumulation_cycle(Duration::from_millis(4587))?;
107+
/// # Ok(())
108+
/// # }
109+
/// ```
110+
pub fn accumulation_cycle(self, accumulation_cycle: std::time::Duration) -> Self {
111+
Self {
112+
accumulation_cycle,
113+
..self
114+
}
115+
}
116+
72117
/// Set the tags
73118
/// # Example
74119
/// ```ignore
@@ -134,7 +179,9 @@ impl PyroscopeAgentBuilder {
134179
/// ?;
135180
/// ```
136181
pub fn backend<T>(self, backend: T) -> Self
137-
where T: 'static + Backend {
182+
where
183+
T: 'static + Backend,
184+
{
138185
Self {
139186
backend: Arc::new(Mutex::new(backend)),
140187
..self
@@ -145,7 +192,7 @@ impl PyroscopeAgentBuilder {
145192
/// # Example
146193
/// ```ignore
147194
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
148-
/// .sample_rate(99)
195+
/// .sample_rate(113)
149196
/// .build()
150197
/// ?;
151198
/// ```
@@ -156,6 +203,28 @@ impl PyroscopeAgentBuilder {
156203
}
157204
}
158205

206+
/// Set the accumulation cycle. Default value is 10 seconds.
207+
///
208+
/// # Example
209+
///
210+
/// ```
211+
/// # fn main() -> Result<(), PyroscopeError> {
212+
/// use std::time::Duration;
213+
///
214+
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
215+
/// .accumulation_cycle(Duration::from_secs(3))
216+
/// .build()
217+
/// ?;
218+
/// # Ok(())
219+
/// # }
220+
/// ```
221+
pub fn accumulation_cycle(self, acc_cycle: impl Into<std::time::Duration>) -> Self {
222+
Self {
223+
config: self.config.accumulation_cycle(acc_cycle.into()),
224+
..self
225+
}
226+
}
227+
159228
/// Set tags. Default is empty.
160229
/// # Example
161230
/// ```ignore
@@ -179,7 +248,7 @@ impl PyroscopeAgentBuilder {
179248
log::trace!(target: LOG_TAG, "Backend initialized");
180249

181250
// Start Timer
182-
let timer = Timer::default().initialize()?;
251+
let timer = Timer::initialize(self.config.accumulation_cycle.clone())?;
183252
log::trace!(target: LOG_TAG, "Timer initialized");
184253

185254
// Start the SessionManager
@@ -204,7 +273,7 @@ impl PyroscopeAgentBuilder {
204273
pub struct PyroscopeAgent {
205274
timer: Timer,
206275
session_manager: SessionManager,
207-
tx: Option<Sender<u64>>,
276+
tx: Option<Sender<AgentSignal>>,
208277
handle: Option<JoinHandle<Result<()>>>,
209278
running: Arc<(Mutex<bool>, Condvar)>,
210279

@@ -287,7 +356,7 @@ impl PyroscopeAgent {
287356
*running = true;
288357
drop(running);
289358

290-
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
359+
let (tx, rx) = mpsc::channel();
291360
self.timer.attach_listener(tx.clone())?;
292361
self.tx = Some(tx);
293362

@@ -298,28 +367,31 @@ impl PyroscopeAgent {
298367
self.handle = Some(std::thread::spawn(move || {
299368
log::trace!(target: LOG_TAG, "Main Thread started");
300369

301-
while let Ok(until) = rx.recv() {
302-
log::trace!(target: LOG_TAG, "Sending session {}", until);
303-
304-
// Generate report from backend
305-
let report = backend.lock()?.report()?;
306-
307-
// Send new Session to SessionManager
308-
stx.send(SessionSignal::Session(Session::new(
309-
until,
310-
config.clone(),
311-
report,
312-
)?))?;
313-
314-
if until == 0 {
315-
log::trace!(target: LOG_TAG, "Session Killed");
316-
317-
let (lock, cvar) = &*pair;
318-
let mut running = lock.lock()?;
319-
*running = false;
320-
cvar.notify_one();
321-
322-
return Ok(());
370+
while let Ok(signal) = rx.recv() {
371+
match signal {
372+
AgentSignal::NextSnapshot(until) => {
373+
log::trace!(target: LOG_TAG, "Sending session {}", until);
374+
375+
// Generate report from backend
376+
let report = backend.lock()?.report()?;
377+
378+
// Send new Session to SessionManager
379+
stx.send(SessionSignal::Session(Session::new(
380+
until,
381+
config.clone(),
382+
report,
383+
)?))?
384+
}
385+
AgentSignal::Terminate => {
386+
log::trace!(target: LOG_TAG, "Session Killed");
387+
388+
let (lock, cvar) = &*pair;
389+
let mut running = lock.lock()?;
390+
*running = false;
391+
cvar.notify_one();
392+
393+
return Ok(());
394+
}
323395
}
324396
}
325397
Ok(())
@@ -345,7 +417,9 @@ impl PyroscopeAgent {
345417
log::debug!(target: LOG_TAG, "Stopping");
346418
// get tx and send termination signal
347419
if let Some(sender) = self.tx.take() {
348-
sender.send(0)?;
420+
// best effort
421+
let _ = sender.send(AgentSignal::NextSnapshot(0));
422+
sender.send(AgentSignal::Terminate)?;
349423
} else {
350424
log::error!("PyroscopeAgent - Missing sender")
351425
}

src/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ impl SessionManager {
8484
pub struct Session {
8585
pub config: PyroscopeConfig,
8686
pub report: Vec<u8>,
87+
// unix time
8788
pub from: u64,
89+
// unix time
8890
pub until: u64,
8991
}
9092

src/timer/epoll.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
55
// except according to those terms.
66

7+
use crate::pyroscope::AgentSignal;
78
use crate::utils::check_err;
89
use crate::utils::get_time_range;
10+
use crate::PyroscopeError;
911
use crate::Result;
1012

1113
use std::sync::{
12-
mpsc::{channel, Receiver, Sender},
14+
mpsc::{channel, Sender},
1315
Arc, Mutex,
1416
};
1517
use std::{thread, thread::JoinHandle};
@@ -22,57 +24,59 @@ use std::{thread, thread::JoinHandle};
2224
/// The Timer thread will run continously until all Senders are dropped.
2325
/// The Timer thread will be joined when all Senders are dropped.
2426
25-
#[derive(Debug, Default)]
27+
#[derive(Debug)]
2628
pub struct Timer {
2729
/// A vector to store listeners (mpsc::Sender)
28-
txs: Arc<Mutex<Vec<Sender<u64>>>>,
30+
txs: Arc<Mutex<Vec<Sender<AgentSignal>>>>,
2931

3032
/// Thread handle
3133
pub handle: Option<JoinHandle<Result<()>>>,
3234
}
3335

3436
impl Timer {
3537
/// Initialize Timer and run a thread to send events to attached listeners
36-
pub fn initialize(self) -> Result<Self> {
37-
let txs = Arc::clone(&self.txs);
38+
pub fn initialize(cycle: std::time::Duration) -> Result<Self> {
39+
let txs = Arc::new(Mutex::new(Vec::new()));
3840

39-
// Add Default tx
40-
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
41+
// Add a dummy tx so the below thread does not terminate early
42+
let (tx, _rx) = channel();
4143
txs.lock()?.push(tx);
4244

43-
let timer_fd = Timer::set_timerfd()?;
45+
let timer_fd = Timer::set_timerfd(cycle)?;
4446
let epoll_fd = Timer::create_epollfd(timer_fd)?;
4547

46-
let handle = Some(thread::spawn(move || {
47-
loop {
48-
// Exit thread if there are no listeners
49-
if txs.lock()?.len() == 0 {
50-
// Close file descriptors
51-
unsafe { libc::close(timer_fd) };
52-
unsafe { libc::close(epoll_fd) };
53-
54-
return Ok(());
48+
let handle = Some({
49+
let txs = txs.clone();
50+
thread::spawn(move || {
51+
loop {
52+
// Exit thread if there are no listeners
53+
if txs.lock()?.is_empty() {
54+
// Close file descriptors
55+
unsafe { libc::close(timer_fd) };
56+
unsafe { libc::close(epoll_fd) };
57+
return Ok::<_, PyroscopeError>(());
58+
}
59+
60+
// Fire @ 10th sec
61+
Timer::epoll_wait(timer_fd, epoll_fd)?;
62+
63+
// Get the current time range
64+
let from = AgentSignal::NextSnapshot(get_time_range(0)?.from);
65+
66+
// Iterate through Senders
67+
txs.lock()?.iter().for_each(|tx| {
68+
// Send event to attached Sender
69+
if tx.send(from).is_ok() {}
70+
});
5571
}
72+
})
73+
});
5674

57-
// Fire @ 10th sec
58-
Timer::epoll_wait(timer_fd, epoll_fd)?;
59-
60-
// Get the current time range
61-
let from = get_time_range(0)?.from;
62-
63-
// Iterate through Senders
64-
txs.lock()?.iter().for_each(|tx| {
65-
// Send event to attached Sender
66-
if tx.send(from).is_ok() {}
67-
});
68-
}
69-
}));
70-
71-
Ok(Self { handle, ..self })
75+
Ok(Self { handle, txs })
7276
}
7377

7478
/// create and set a timer file descriptor
75-
fn set_timerfd() -> Result<libc::c_int> {
79+
fn set_timerfd(cycle: std::time::Duration) -> Result<libc::c_int> {
7680
// Set the timer to use the system time.
7781
let clockid: libc::clockid_t = libc::CLOCK_REALTIME;
7882
// Non-blocking file descriptor
@@ -87,8 +91,8 @@ impl Timer {
8791
// new_value sets the Timer
8892
let mut new_value = libc::itimerspec {
8993
it_interval: libc::timespec {
90-
tv_sec: 10,
91-
tv_nsec: 0,
94+
tv_sec: cycle.as_secs() as i64,
95+
tv_nsec: cycle.subsec_nanos() as i64,
9296
},
9397
it_value: libc::timespec {
9498
tv_sec: first_fire as i64,
@@ -161,7 +165,7 @@ impl Timer {
161165
///
162166
/// Timer will dispatch an event with the timestamp of the current instant,
163167
/// every 10th second to all attached senders
164-
pub fn attach_listener(&mut self, tx: Sender<u64>) -> Result<()> {
168+
pub fn attach_listener(&mut self, tx: Sender<AgentSignal>) -> Result<()> {
165169
// Push Sender to a Vector of Sender(s)
166170
let txs = Arc::clone(&self.txs);
167171
txs.lock()?.push(tx);

0 commit comments

Comments
 (0)