Skip to content

Commit 694c5df

Browse files
committed
feat: Modularize code for different build; Fallback to zeekstd for x86
1 parent 968c722 commit 694c5df

File tree

6 files changed

+167
-23
lines changed

6 files changed

+167
-23
lines changed

runtime_tracing/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ serde_repr = "0.1"
2222
capnp = "0.21.1"
2323
cbor4ii = { version = "1.0.0", features = ["serde1", "use_std"] }
2424
fscommon = "0.1.1"
25+
26+
[target.'cfg(not(target_arch = "x86_64"))'.dependencies]
27+
zeekstd = "0.6.0"
28+
29+
[target.'cfg(target_arch = "wasm32")'.dependencies]
2530
ruzstd = "0.8.1"
2631

2732
[build-dependencies]

runtime_tracing/src/cbor_zstd_reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::io::{self, BufRead, BufReader, Read, Seek, Write};
22

33
use fscommon::StreamSlice;
44

5-
use ruzstd::decoding::StreamingDecoder;
5+
use zeekstd::Decoder;
66

77
use crate::{TraceLowLevelEvent, cbor_zstd_writer::HEADERV1};
88

@@ -11,6 +11,7 @@ fn is_at_eof<R: BufRead>(reader: &mut R) -> io::Result<bool> {
1111
Ok(buffer.is_empty())
1212
}
1313

14+
#[cfg(target_arch = "x86_64")]
1415
pub fn read_trace(input: &mut (impl Read + Write + Seek)) -> Result<Vec<TraceLowLevelEvent>, Box<dyn std::error::Error>> {
1516
let end_pos = input.seek(io::SeekFrom::End(0))?;
1617
input.seek(io::SeekFrom::Start(0))?;
@@ -25,7 +26,7 @@ pub fn read_trace(input: &mut (impl Read + Write + Seek)) -> Result<Vec<TraceLow
2526
input.seek(io::SeekFrom::Start(0))?;
2627
let input2 = StreamSlice::new(&mut *input, 8, end_pos)?;
2728

28-
let decoder = StreamingDecoder::new(input2)?;
29+
let decoder = Decoder::new(input2)?;
2930
let mut buf_reader = BufReader::new(decoder);
3031

3132
let mut result: Vec<TraceLowLevelEvent> = vec![];
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::io::{self, BufRead, BufReader, Read, Seek, Write};
2+
3+
use fscommon::StreamSlice;
4+
5+
use ruzstd::decoding::StreamingDecoder;
6+
7+
use crate::{TraceLowLevelEvent, cbor_zstd_writer::HEADERV1};
8+
9+
fn is_at_eof<R: BufRead>(reader: &mut R) -> io::Result<bool> {
10+
let buffer = reader.fill_buf()?;
11+
Ok(buffer.is_empty())
12+
}
13+
14+
pub fn read_trace(input: &mut (impl Read + Write + Seek)) -> Result<Vec<TraceLowLevelEvent>, Box<dyn std::error::Error>> {
15+
let end_pos = input.seek(io::SeekFrom::End(0))?;
16+
input.seek(io::SeekFrom::Start(0))?;
17+
18+
let mut header_buf = [0; 8];
19+
let mut buf_reader = BufReader::new(&mut *input);
20+
buf_reader.read_exact(&mut header_buf)?;
21+
if header_buf != HEADERV1 {
22+
panic!("Invalid file header (wrong file format or incompatible version)");
23+
}
24+
25+
input.seek(io::SeekFrom::Start(0))?;
26+
let input2 = StreamSlice::new(&mut *input, 8, end_pos)?;
27+
28+
let decoder = StreamingDecoder::new(input2)?;
29+
let mut buf_reader = BufReader::new(decoder);
30+
31+
let mut result: Vec<TraceLowLevelEvent> = vec![];
32+
33+
while !is_at_eof(&mut buf_reader)? {
34+
let obj = cbor4ii::serde::from_reader::<TraceLowLevelEvent, _>(&mut buf_reader)?;
35+
result.push(obj);
36+
}
37+
38+
Ok(result)
39+
}

runtime_tracing/src/cbor_zstd_writer.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
path::PathBuf,
55
};
66

7-
use ruzstd::encoding::{CompressionLevel, compress};
7+
#[cfg(target_arch = "x86_64")]
8+
use zeekstd::Encoder;
89

910
use crate::{
1011
TraceLowLevelEvent,
@@ -20,28 +21,26 @@ pub const HEADERV1: &[u8] = &[
2021
0x00, 0x00,
2122
]; // Reserved, must be zero in this version.
2223

23-
pub struct CborZstdTraceWriter {
24+
pub struct CborZstdTraceWriter<'a> {
2425
base: AbstractTraceWriterData,
2526

2627
trace_events_path: Option<PathBuf>,
27-
trace_events_file: Option<File>,
28-
uncompressed_buf: Vec<u8>,
28+
trace_events_file_zstd_encoder: Option<Encoder<'a, File>>,
2929
}
3030

31-
impl CborZstdTraceWriter {
31+
impl CborZstdTraceWriter<'_> {
3232
/// Create a new tracer instance for the given program and arguments.
3333
pub fn new(program: &str, args: &[String]) -> Self {
3434
CborZstdTraceWriter {
3535
base: AbstractTraceWriterData::new(program, args),
3636

3737
trace_events_path: None,
38-
trace_events_file: None,
39-
uncompressed_buf: Vec::new(),
38+
trace_events_file_zstd_encoder: None,
4039
}
4140
}
4241
}
4342

44-
impl AbstractTraceWriter for CborZstdTraceWriter {
43+
impl AbstractTraceWriter for CborZstdTraceWriter<'_> {
4544
fn get_data(&self) -> &AbstractTraceWriterData {
4645
&self.base
4746
}
@@ -52,37 +51,35 @@ impl AbstractTraceWriter for CborZstdTraceWriter {
5251

5352
fn add_event(&mut self, event: TraceLowLevelEvent) {
5453
let buf: Vec<u8> = Vec::new();
55-
let q = cbor4ii::serde::to_vec(buf, &event).expect("CBOR encode failed");
56-
self.uncompressed_buf.extend_from_slice(&q);
54+
let q = cbor4ii::serde::to_vec(buf, &event).unwrap();
55+
if let Some(enc) = &mut self.trace_events_file_zstd_encoder {
56+
enc.write_all(&q).unwrap();
57+
}
5758
}
5859

5960
fn append_events(&mut self, events: &mut Vec<TraceLowLevelEvent>) {
60-
for e in events.drain(..) {
61-
<Self as AbstractTraceWriter>::add_event(self, e);
61+
for e in events {
62+
AbstractTraceWriter::add_event(self, e.clone());
6263
}
6364
}
6465
}
6566

66-
impl TraceWriter for CborZstdTraceWriter {
67+
#[cfg(target_arch = "x86_64")]
68+
impl TraceWriter for CborZstdTraceWriter<'_> {
6769
fn begin_writing_trace_events(&mut self, path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
6870
let pb = path.to_path_buf();
6971
self.trace_events_path = Some(pb.clone());
70-
7172
let mut file_output = std::fs::File::create(pb)?;
7273
file_output.write_all(HEADERV1)?;
73-
self.trace_events_file = Some(file_output);
74+
self.trace_events_file_zstd_encoder = Some(Encoder::new(file_output)?);
7475

7576
Ok(())
7677
}
7778

7879
fn finish_writing_trace_events(&mut self) -> Result<(), Box<dyn std::error::Error>> {
79-
if let Some(mut file) = self.trace_events_file.take() {
80-
let mut cursor = Cursor::new(&self.uncompressed_buf);
81-
compress(&mut cursor, &mut file, CompressionLevel::Fastest);
82-
83-
file.flush()?;
80+
if let Some(enc) = self.trace_events_file_zstd_encoder.take() {
81+
enc.finish()?;
8482

85-
self.uncompressed_buf.clear();
8683
Ok(())
8784
} else {
8885
panic!("finish_writing_trace_events() called without previous call to begin_writing_trace_events()");
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::{
2+
fs::File,
3+
io::{Cursor, Write},
4+
path::PathBuf,
5+
};
6+
7+
use ruzstd::encoding::{CompressionLevel, compress};
8+
9+
use crate::{
10+
TraceLowLevelEvent,
11+
abstract_trace_writer::{AbstractTraceWriter, AbstractTraceWriterData},
12+
trace_writer::TraceWriter,
13+
};
14+
15+
/// The next 3 bytes are reserved/version info.
16+
/// The header is 8 bytes in size, ensuring 64-bit alignment for the rest of the file.
17+
pub const HEADERV1: &[u8] = &[
18+
0xC0, 0xDE, 0x72, 0xAC, 0xE2, // The first 5 bytes identify the file as a CodeTracer file (hex l33tsp33k - C0DE72ACE2 for "CodeTracer").
19+
0x01, // Indicates version 1 of the file format
20+
0x00, 0x00,
21+
]; // Reserved, must be zero in this version.
22+
23+
pub struct CborZstdTraceWriter {
24+
base: AbstractTraceWriterData,
25+
26+
trace_events_path: Option<PathBuf>,
27+
trace_events_file: Option<File>,
28+
uncompressed_buf: Vec<u8>,
29+
}
30+
31+
impl CborZstdTraceWriter {
32+
/// Create a new tracer instance for the given program and arguments.
33+
pub fn new(program: &str, args: &[String]) -> Self {
34+
CborZstdTraceWriter {
35+
base: AbstractTraceWriterData::new(program, args),
36+
37+
trace_events_path: None,
38+
trace_events_file: None,
39+
uncompressed_buf: vec![],
40+
}
41+
}
42+
}
43+
44+
impl AbstractTraceWriter for CborZstdTraceWriter {
45+
fn get_data(&self) -> &AbstractTraceWriterData {
46+
&self.base
47+
}
48+
49+
fn get_mut_data(&mut self) -> &mut AbstractTraceWriterData {
50+
&mut self.base
51+
}
52+
53+
fn add_event(&mut self, event: TraceLowLevelEvent) {
54+
let buf: Vec<u8> = Vec::new();
55+
let q = cbor4ii::serde::to_vec(buf, &event).expect("CBOR encode failed");
56+
self.uncompressed_buf.extend_from_slice(&q);
57+
}
58+
59+
fn append_events(&mut self, events: &mut Vec<TraceLowLevelEvent>) {
60+
for e in events.drain(..) {
61+
<Self as AbstractTraceWriter>::add_event(self, e);
62+
}
63+
}
64+
}
65+
66+
impl TraceWriter for CborZstdTraceWriter {
67+
fn begin_writing_trace_events(&mut self, path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
68+
let pb = path.to_path_buf();
69+
self.trace_events_path = Some(pb.clone());
70+
71+
let mut file_output = std::fs::File::create(pb)?;
72+
file_output.write_all(HEADERV1)?;
73+
self.trace_events_file = Some(file_output);
74+
75+
Ok(())
76+
}
77+
78+
fn finish_writing_trace_events(&mut self) -> Result<(), Box<dyn std::error::Error>> {
79+
if let Some(mut file) = self.trace_events_file.take() {
80+
let mut cursor = Cursor::new(&self.uncompressed_buf);
81+
compress(&mut cursor, &mut file, CompressionLevel::Fastest);
82+
83+
file.flush()?;
84+
85+
self.uncompressed_buf.clear();
86+
Ok(())
87+
} else {
88+
panic!("finish_writing_trace_events() called without previous call to begin_writing_trace_events()");
89+
}
90+
}
91+
}

runtime_tracing/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,19 @@
1111
mod abstract_trace_writer;
1212
mod base64;
1313
mod capnptrace;
14+
15+
#[cfg(target_arch = "wasm32")]
16+
#[path = "./cbor_zstd_reader_wasm.rs"]
1417
mod cbor_zstd_reader;
18+
#[cfg(target_arch = "wasm32")]
19+
#[path = "./cbor_zstd_writer_wasm.rs"]
1520
mod cbor_zstd_writer;
21+
22+
#[cfg(not(target_arch = "wasm32"))]
23+
mod cbor_zstd_reader;
24+
#[cfg(not(target_arch = "wasm32"))]
25+
mod cbor_zstd_writer;
26+
1627
mod non_streaming_trace_writer;
1728
mod trace_readers;
1829
mod trace_writer;

0 commit comments

Comments
 (0)