Skip to content

Commit 791b6e5

Browse files
committed
feat: rewrite the compression logic to use ruzstd instead of zeekstd
1 parent 03506e3 commit 791b6e5

File tree

3 files changed

+38
-20
lines changed

3 files changed

+38
-20
lines changed

runtime_tracing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ serde = { version = "1.0", features = ["derive"] }
2020
serde_json = "1.0"
2121
serde_repr = "0.1"
2222
capnp = "0.21.1"
23-
zeekstd = "0.5.0"
2423
cbor4ii = { version = "1.0.0", features = ["serde1", "use_std"] }
2524
fscommon = "0.1.1"
25+
ruzstd = "0.8.1"
2626

2727
[build-dependencies]
2828
capnpc = "0.21.0"

runtime_tracing/src/cbor_zstd_reader.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::io::{self, BufRead, BufReader, Read, Seek, Write};
22

33
use fscommon::StreamSlice;
4-
use zeekstd::Decoder;
4+
5+
use ruzstd::decoding::StreamingDecoder;
56

67
use crate::{TraceLowLevelEvent, cbor_zstd_writer::HEADERV1};
78

@@ -24,7 +25,8 @@ pub fn read_trace(input: &mut (impl Read + Write + Seek)) -> Result<Vec<TraceLow
2425
input.seek(io::SeekFrom::Start(0))?;
2526
let input2 = StreamSlice::new(&mut *input, 8, end_pos)?;
2627

27-
let decoder = Decoder::new(input2)?;
28+
// let decoder = Decoder::new(input2)?;
29+
let decoder = StreamingDecoder::new(input2)?;
2830
let mut buf_reader = BufReader::new(decoder);
2931

3032
let mut result: Vec<TraceLowLevelEvent> = vec![];

runtime_tracing/src/cbor_zstd_writer.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
use std::{fs::File, io::Write, path::PathBuf};
2-
use zeekstd::Encoder;
1+
use std::{
2+
fs::File,
3+
io::{Cursor, Write},
4+
path::PathBuf,
5+
};
6+
7+
use ruzstd::encoding::{CompressionLevel, compress};
38

49
use crate::{
510
TraceLowLevelEvent,
@@ -15,26 +20,30 @@ pub const HEADERV1: &[u8] = &[
1520
0x00, 0x00,
1621
]; // Reserved, must be zero in this version.
1722

18-
pub struct CborZstdTraceWriter<'a> {
23+
pub struct CborZstdTraceWriter {
1924
base: AbstractTraceWriterData,
2025

2126
trace_events_path: Option<PathBuf>,
22-
trace_events_file_zstd_encoder: Option<Encoder<'a, File>>,
27+
// Open file we’ll write the header immediately and the compressed frame at finish().
28+
trace_events_file: Option<File>,
29+
// Accumulate uncompressed CBOR here; compress once at finish().
30+
uncompressed_buf: Vec<u8>,
2331
}
2432

25-
impl CborZstdTraceWriter<'_> {
33+
impl CborZstdTraceWriter {
2634
/// Create a new tracer instance for the given program and arguments.
2735
pub fn new(program: &str, args: &[String]) -> Self {
2836
CborZstdTraceWriter {
2937
base: AbstractTraceWriterData::new(program, args),
3038

3139
trace_events_path: None,
32-
trace_events_file_zstd_encoder: None,
40+
trace_events_file: None,
41+
uncompressed_buf: Vec::new(),
3342
}
3443
}
3544
}
3645

37-
impl AbstractTraceWriter for CborZstdTraceWriter<'_> {
46+
impl AbstractTraceWriter for CborZstdTraceWriter {
3847
fn get_data(&self) -> &AbstractTraceWriterData {
3948
&self.base
4049
}
@@ -44,35 +53,42 @@ impl AbstractTraceWriter for CborZstdTraceWriter<'_> {
4453
}
4554

4655
fn add_event(&mut self, event: TraceLowLevelEvent) {
56+
// Serialize to CBOR and append to the uncompressed buffer.
4757
let buf: Vec<u8> = Vec::new();
48-
let q = cbor4ii::serde::to_vec(buf, &event).unwrap();
49-
if let Some(enc) = &mut self.trace_events_file_zstd_encoder {
50-
enc.write_all(&q).unwrap();
51-
}
58+
let q = cbor4ii::serde::to_vec(buf, &event).expect("CBOR encode failed");
59+
self.uncompressed_buf.extend_from_slice(&q);
5260
}
5361

5462
fn append_events(&mut self, events: &mut Vec<TraceLowLevelEvent>) {
55-
for e in events {
56-
AbstractTraceWriter::add_event(self, e.clone());
63+
// Move events out to avoid clones; qualify the trait call explicitly.
64+
for e in events.drain(..) {
65+
<Self as AbstractTraceWriter>::add_event(self, e);
5766
}
5867
}
5968
}
6069

61-
impl TraceWriter for CborZstdTraceWriter<'_> {
70+
impl TraceWriter for CborZstdTraceWriter {
6271
fn begin_writing_trace_events(&mut self, path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
6372
let pb = path.to_path_buf();
6473
self.trace_events_path = Some(pb.clone());
74+
6575
let mut file_output = std::fs::File::create(pb)?;
76+
// Write header immediately; compressed frame follows at finish().
6677
file_output.write_all(HEADERV1)?;
67-
self.trace_events_file_zstd_encoder = Some(Encoder::new(file_output)?);
78+
self.trace_events_file = Some(file_output);
6879

6980
Ok(())
7081
}
7182

7283
fn finish_writing_trace_events(&mut self) -> Result<(), Box<dyn std::error::Error>> {
73-
if let Some(enc) = self.trace_events_file_zstd_encoder.take() {
74-
enc.finish()?;
84+
if let Some(mut file) = self.trace_events_file.take() {
85+
// Compress buffered CBOR into a single Zstd frame written to the file.
86+
let mut cursor = Cursor::new(&self.uncompressed_buf);
87+
compress(&mut cursor, &mut file, CompressionLevel::Fastest);
88+
file.flush()?;
7589

90+
// Clear buffer now that it’s written.
91+
self.uncompressed_buf.clear();
7692
Ok(())
7793
} else {
7894
panic!("finish_writing_trace_events() called without previous call to begin_writing_trace_events()");

0 commit comments

Comments
 (0)