Skip to content

Commit 1111115

Browse files
committed
imp(session): kill SessionManager on Agent drop
1 parent 6dfe698 commit 1111115

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

src/pyroscope.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::backends::Backend;
1616
use crate::error::Result;
1717
use crate::session::Session;
1818
use crate::session::SessionManager;
19+
use crate::session::SessionSignal;
1920
use crate::timer::Timer;
2021

2122
/// Represent PyroscopeAgent Configuration
@@ -165,15 +166,18 @@ impl Drop for PyroscopeAgent {
165166
self.timer.drop_listeners().unwrap(); // Drop listeners
166167
self.timer.handle.take().unwrap().join().unwrap().unwrap(); // Wait for the Timer thread to finish
167168

168-
// Wait for main thread to finish
169-
self.handle.take().unwrap().join().unwrap().unwrap();
169+
// Stop the SessionManager
170+
self.session_manager.push(SessionSignal::Kill).unwrap();
170171
self.session_manager
171172
.handle
172173
.take()
173174
.unwrap()
174175
.join()
175176
.unwrap()
176177
.unwrap();
178+
179+
// Wait for main thread to finish
180+
self.handle.take().unwrap().join().unwrap().unwrap();
177181
}
178182
}
179183

@@ -212,7 +216,11 @@ impl PyroscopeAgent {
212216
while let Ok(time) = rx.recv() {
213217
let report = backend.lock()?.report()?;
214218
// Send new Session to SessionManager
215-
stx.send(Session::new(time, config.clone(), report)?)?;
219+
stx.send(SessionSignal::Session(Session::new(
220+
time,
221+
config.clone(),
222+
report,
223+
)?))?;
216224

217225
if time == 0 {
218226
let (lock, cvar) = &*pair;

src/session.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,39 @@ use crate::pyroscope::PyroscopeConfig;
1414
use crate::utils::merge_tags_with_app_name;
1515
use crate::Result;
1616

17+
/// Session Signal
18+
#[derive(Debug)]
19+
pub enum SessionSignal {
20+
Session(Session),
21+
Kill,
22+
}
23+
1724
/// SessionManager
1825
#[derive(Debug)]
1926
pub struct SessionManager {
2027
pub handle: Option<JoinHandle<Result<()>>>,
21-
pub tx: SyncSender<Session>,
28+
pub tx: SyncSender<SessionSignal>,
2229
}
2330

2431
impl SessionManager {
2532
/// Create a new SessionManager
2633
pub fn new() -> Result<Self> {
2734
// Create a channel for sending and receiving sessions
28-
let (tx, rx): (SyncSender<Session>, Receiver<Session>) = sync_channel(10);
35+
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);
2936

3037
// Create a thread for the SessionManager
3138
let handle = Some(thread::spawn(move || {
32-
while let Ok(session) = rx.recv() {
33-
session.send()?;
39+
while let Ok(signal) = rx.recv() {
40+
match signal {
41+
SessionSignal::Session(session) => {
42+
// Send the session
43+
session.send()?;
44+
}
45+
SessionSignal::Kill => {
46+
// Kill the session manager
47+
return Ok(());
48+
}
49+
}
3450
}
3551
Ok(())
3652
}));
@@ -39,7 +55,7 @@ impl SessionManager {
3955
}
4056

4157
/// Push a new session into the SessionManager
42-
pub fn push(&self, session: Session) -> Result<()> {
58+
pub fn push(&self, session: SessionSignal) -> Result<()> {
4359
self.tx.send(session)?;
4460
Ok(())
4561
}

0 commit comments

Comments
 (0)