Skip to content

Commit 0f315ce

Browse files
committed
imp(rbspy): change type implementation to pyroscope Backend types
1 parent 33c6a5e commit 0f315ce

File tree

2 files changed

+50
-141
lines changed

2 files changed

+50
-141
lines changed

examples/internal/rbspy-connect.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,7 @@ fn main() -> Result<()> {
3636
agent.start()?;
3737

3838
// Profile for around 1 minute
39-
std::thread::sleep(std::time::Duration::from_secs(30));
40-
41-
// Stop Agent
42-
agent.stop()?;
43-
44-
std::thread::sleep(std::time::Duration::from_secs(30));
45-
46-
agent.start()?;
47-
48-
std::thread::sleep(std::time::Duration::from_secs(30));
39+
std::thread::sleep(std::time::Duration::from_secs(120));
4940

5041
agent.stop()?;
5142

pyroscope_backends/pyroscope_rbspy/src/lib.rs

Lines changed: 49 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use pyroscope::backend::{Backend, State};
1+
use pyroscope::backend::{Backend, Report, StackFrame, StackTrace, State};
22
use pyroscope::error::{PyroscopeError, Result};
33

4-
use rbspy::{sampler::Sampler, ui::output::Outputter, StackFrame, StackTrace};
4+
use rbspy::sampler::Sampler;
55

6-
use std::collections::HashMap;
7-
use std::io::Write;
86
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
7+
use std::sync::{Arc, Mutex};
98

109
/// Rbspy Configuration
1110
#[derive(Debug)]
@@ -79,9 +78,11 @@ pub struct Rbspy {
7978
/// Rbspy Sampler
8079
sampler: Option<Sampler>,
8180
/// StackTrace Receiver
82-
stack_receiver: Option<Receiver<StackTrace>>,
81+
stack_receiver: Option<Receiver<rbspy::StackTrace>>,
8382
/// Error Receiver
8483
error_receiver: Option<Receiver<std::result::Result<(), anyhow::Error>>>,
84+
/// Profiling buffer
85+
buffer: Arc<Mutex<Report>>,
8586
}
8687

8788
impl std::fmt::Debug for Rbspy {
@@ -98,6 +99,7 @@ impl Rbspy {
9899
error_receiver: None,
99100
state: State::Uninitialized,
100101
config,
102+
buffer: Arc::new(Mutex::new(Report::default())),
101103
}
102104
}
103105
}
@@ -159,8 +161,10 @@ impl Backend for Rbspy {
159161
let queue_size: usize = self.config.sample_rate as usize * 10 * 100;
160162

161163
// Channel for StackTraces generated by the RubySpy Sampler
162-
let (stack_sender, stack_receiver): (SyncSender<StackTrace>, Receiver<StackTrace>) =
163-
sync_channel(queue_size);
164+
let (stack_sender, stack_receiver): (
165+
SyncSender<rbspy::StackTrace>,
166+
Receiver<rbspy::StackTrace>,
167+
) = sync_channel(queue_size);
164168

165169
// Set Error and Stack Receivers
166170
self.stack_receiver = Some(stack_receiver);
@@ -207,6 +211,9 @@ impl Backend for Rbspy {
207211
return Err(PyroscopeError::new("Rbspy: Backend is not Running"));
208212
}
209213

214+
// Get an Arc reference to the Report Buffer
215+
let buffer = self.buffer.clone();
216+
210217
// Send Errors to Log
211218
let errors = self
212219
.error_receiver
@@ -229,150 +236,61 @@ impl Backend for Rbspy {
229236
.ok_or_else(|| PyroscopeError::new("Rbspy: StackTrace receiver is not set"))?
230237
.try_iter();
231238

232-
// Create a new OutputFormat (collapsed). This is an object provided by rbspy.
233-
// The argument should be ignored.
234-
let mut outputter = RbspyFormatter::default();
235-
236239
// Iterate over the StackTrace
237240
for trace in stack_trace {
238-
// Write the StackTrace to the OutputFormat
239-
outputter.record(&trace).map_err(|e| {
240-
PyroscopeError::new(&format!("Rbspy: Error in OutputFormat: {}", e))
241-
})?;
241+
// convert StackTrace
242+
let own_trace: StackTrace = Into::<StackTraceWrapper>::into(trace).into();
243+
buffer.lock()?.record(own_trace)?;
242244
}
243245

244-
// buffer to store the output
245-
let mut buffer: Vec<u8> = Vec::new();
246-
247-
// Create a new writer
248-
let mut writer = RbspyWriter::new(&mut buffer);
246+
let v8: Vec<u8> = buffer.lock()?.to_string().into_bytes();
249247

250-
// Push the outputter into our writer
251-
outputter
252-
.complete(&mut writer)
253-
.map_err(|e| PyroscopeError::new(&format!("Rbspy: Error in OutputFormat: {}", e)))?;
254-
255-
// Flush the Writer
256-
writer.flush()?;
248+
buffer.lock()?.clear();
257249

258250
// Return the writer's buffer
259-
Ok(buffer)
251+
Ok(v8)
260252
}
261253
}
262254

263-
/// Rbspy Formatter
264-
#[derive(Default)]
265-
pub struct RbspyFormatter(HashMap<String, usize>);
255+
struct StackFrameWrapper(StackFrame);
266256

267-
impl Outputter for RbspyFormatter {
268-
fn record(&mut self, stack: &StackTrace) -> std::result::Result<(), anyhow::Error> {
269-
let frame = stack
270-
.iter()
271-
.rev()
272-
.map(|frame| format!("{}", StackFrameFormatter(frame)))
273-
.collect::<Vec<String>>()
274-
.join(";");
275-
276-
*self.0.entry(frame).or_insert(0) += 1;
277-
278-
Ok(())
279-
}
280-
fn complete(&mut self, writer: &mut dyn Write) -> std::result::Result<(), anyhow::Error> {
281-
if self.0.is_empty() {
282-
log::info!("Rbspy: No StackTraces reported");
283-
284-
return Ok(());
285-
} else {
286-
self.0
287-
.iter()
288-
.map(|(frame, count)| format!("{} {}", frame, count))
289-
.collect::<Vec<String>>()
290-
.iter()
291-
.try_for_each(|line| writeln!(writer, "{}", line))?;
292-
}
293-
Ok(())
257+
impl From<StackFrameWrapper> for StackFrame {
258+
fn from(frame: StackFrameWrapper) -> Self {
259+
frame.0
294260
}
295261
}
296262

297-
/// Custom Formatter for Rbspy StackFrames
298-
pub struct StackFrameFormatter<'a>(&'a StackFrame);
299-
300-
impl<'a> std::fmt::Display for StackFrameFormatter<'a> {
301-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302-
write!(
303-
f,
304-
"{}:{} - {}",
305-
self.0.relative_path, self.0.lineno, self.0.name
306-
)
263+
impl From<rbspy::StackFrame> for StackFrameWrapper {
264+
fn from(frame: rbspy::StackFrame) -> Self {
265+
StackFrameWrapper(StackFrame {
266+
module: None,
267+
name: Some(frame.name),
268+
filename: Some(frame.relative_path.clone()),
269+
relative_path: Some(frame.relative_path),
270+
absolute_path: frame.absolute_path,
271+
line: Some(frame.lineno),
272+
})
307273
}
308274
}
309275

310-
#[cfg(test)]
311-
mod test_stack_frame_formatter {
312-
use super::{StackFrame, StackFrameFormatter};
313-
314-
#[test]
315-
fn test_stack_frame_formatter() {
316-
let frame = StackFrame {
317-
absolute_path: Some("".to_string()),
318-
relative_path: "test.rb".to_string(),
319-
lineno: 1,
320-
name: "test".to_string(),
321-
};
322-
let formatter = StackFrameFormatter(&frame);
323-
assert_eq!(formatter.to_string(), "test.rb:1 - test");
324-
}
325-
}
326-
327-
/// Rubyspy Writer
328-
/// This object is used to write the output of the rbspy sampler to a data buffer
329-
struct RbspyWriter<'a> {
330-
data: Vec<u8>,
331-
buffer: &'a mut Vec<u8>,
332-
}
333-
334-
impl<'a> RbspyWriter<'a> {
335-
/// Create a new RbspyWriter
336-
pub fn new(buffer: &'a mut Vec<u8>) -> Self {
337-
RbspyWriter {
338-
data: Vec::new(),
339-
buffer,
340-
}
341-
}
342-
}
343-
344-
/// Implement Writer for Rbspy
345-
impl<'a> std::io::Write for RbspyWriter<'a> {
346-
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
347-
// push the data to the buffer
348-
self.data.extend_from_slice(buf);
276+
struct StackTraceWrapper(StackTrace);
349277

350-
// return the number of bytes written
351-
Ok(buf.len())
352-
}
353-
354-
fn flush(&mut self) -> std::io::Result<()> {
355-
// flush the buffer
356-
self.buffer.extend_from_slice(&self.data);
357-
358-
Ok(())
278+
impl From<StackTraceWrapper> for StackTrace {
279+
fn from(trace: StackTraceWrapper) -> Self {
280+
trace.0
359281
}
360282
}
361283

362-
#[cfg(test)]
363-
mod test_rbspy_writer {
364-
use super::RbspyWriter;
365-
use std::io::Write;
366-
367-
#[test]
368-
fn test_rbspy_writer() {
369-
let mut buffer: Vec<u8> = Vec::new();
370-
let mut writer = RbspyWriter::new(&mut buffer);
371-
372-
writer.write(b"hello").unwrap();
373-
writer.write(b"world").unwrap();
374-
writer.flush().unwrap();
375-
376-
assert_eq!(buffer, b"helloworld".to_vec());
284+
impl From<rbspy::StackTrace> for StackTraceWrapper {
285+
fn from(trace: rbspy::StackTrace) -> Self {
286+
StackTraceWrapper(StackTrace {
287+
pid: trace.pid.map(|pid| pid as u32),
288+
thread_id: trace.thread_id.map(|id| id as u64),
289+
thread_name: None,
290+
frames: trace
291+
.iter()
292+
.map(|frame| Into::<StackFrameWrapper>::into(frame.clone()).into())
293+
.collect(),
294+
})
377295
}
378296
}

0 commit comments

Comments
 (0)