Skip to content

Commit 26e909b

Browse files
committed
feat: rewrite the compression logic to use ruzstd instead of zeekstd
1 parent 9fc804f commit 26e909b

File tree

3 files changed

+31
-20
lines changed

3 files changed

+31
-20
lines changed

runtime_tracing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ serde_json = "1.0"
2121
serde_repr = "0.1"
2222
schemars = "0.8.2"
2323
capnp = "0.21.1"
24-
zeekstd = "0.5.0"
2524
cbor4ii = { version = "1.0.0", features = ["serde1", "use_std"] }
2625
fscommon = "0.1.1"
26+
ruzstd = "0.8.1"
2727

2828
[build-dependencies]
2929
capnpc = "0.21.0"

runtime_tracing/src/cbor_zstd_reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::io::{self, BufRead, BufReader, Read, Seek, Write};
22

33
use fscommon::StreamSlice;
4-
use zeekstd::Decoder;
4+
5+
use ruzstd::decoding::StreamingDecoder;
56

67
use crate::{TraceLowLevelEvent, cbor_zstd_writer::HEADERV1};
78

@@ -24,7 +25,7 @@ pub fn read_trace(input: &mut (impl Read + Write + Seek)) -> Result<Vec<TraceLow
2425
input.seek(io::SeekFrom::Start(0))?;
2526
let input2 = StreamSlice::new(&mut *input, 8, end_pos)?;
2627

27-
let decoder = Decoder::new(input2)?;
28+
let decoder = StreamingDecoder::new(input2)?;
2829
let mut buf_reader = BufReader::new(decoder);
2930

3031
let mut result: Vec<TraceLowLevelEvent> = vec![];

runtime_tracing/src/cbor_zstd_writer.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
use std::{fs::File, io::Write, path::PathBuf};
2-
use zeekstd::Encoder;
1+
use std::{
2+
fs::File,
3+
io::{Cursor, Write},
4+
path::PathBuf,
5+
};
6+
7+
use ruzstd::encoding::{CompressionLevel, compress};
38

49
use crate::{
510
TraceLowLevelEvent,
@@ -15,26 +20,28 @@ pub const HEADERV1: &[u8] = &[
1520
0x00, 0x00,
1621
]; // Reserved, must be zero in this version.
1722

18-
pub struct CborZstdTraceWriter<'a> {
23+
pub struct CborZstdTraceWriter {
1924
base: AbstractTraceWriterData,
2025

2126
trace_events_path: Option<PathBuf>,
22-
trace_events_file_zstd_encoder: Option<Encoder<'a, File>>,
27+
trace_events_file: Option<File>,
28+
uncompressed_buf: Vec<u8>,
2329
}
2430

25-
impl CborZstdTraceWriter<'_> {
31+
impl CborZstdTraceWriter {
2632
/// Create a new tracer instance for the given program and arguments.
2733
pub fn new(program: &str, args: &[String]) -> Self {
2834
CborZstdTraceWriter {
2935
base: AbstractTraceWriterData::new(program, args),
3036

3137
trace_events_path: None,
32-
trace_events_file_zstd_encoder: None,
38+
trace_events_file: None,
39+
uncompressed_buf: Vec::new(),
3340
}
3441
}
3542
}
3643

37-
impl AbstractTraceWriter for CborZstdTraceWriter<'_> {
44+
impl AbstractTraceWriter for CborZstdTraceWriter {
3845
fn get_data(&self) -> &AbstractTraceWriterData {
3946
&self.base
4047
}
@@ -45,34 +52,37 @@ impl AbstractTraceWriter for CborZstdTraceWriter<'_> {
4552

4653
fn add_event(&mut self, event: TraceLowLevelEvent) {
4754
let buf: Vec<u8> = Vec::new();
48-
let q = cbor4ii::serde::to_vec(buf, &event).unwrap();
49-
if let Some(enc) = &mut self.trace_events_file_zstd_encoder {
50-
enc.write_all(&q).unwrap();
51-
}
55+
let q = cbor4ii::serde::to_vec(buf, &event).expect("CBOR encode failed");
56+
self.uncompressed_buf.extend_from_slice(&q);
5257
}
5358

5459
fn append_events(&mut self, events: &mut Vec<TraceLowLevelEvent>) {
55-
for e in events {
56-
AbstractTraceWriter::add_event(self, e.clone());
60+
for e in events.drain(..) {
61+
<Self as AbstractTraceWriter>::add_event(self, e);
5762
}
5863
}
5964
}
6065

61-
impl TraceWriter for CborZstdTraceWriter<'_> {
66+
impl TraceWriter for CborZstdTraceWriter {
6267
fn begin_writing_trace_events(&mut self, path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
6368
let pb = path.to_path_buf();
6469
self.trace_events_path = Some(pb.clone());
70+
6571
let mut file_output = std::fs::File::create(pb)?;
6672
file_output.write_all(HEADERV1)?;
67-
self.trace_events_file_zstd_encoder = Some(Encoder::new(file_output)?);
73+
self.trace_events_file = Some(file_output);
6874

6975
Ok(())
7076
}
7177

7278
fn finish_writing_trace_events(&mut self) -> Result<(), Box<dyn std::error::Error>> {
73-
if let Some(enc) = self.trace_events_file_zstd_encoder.take() {
74-
enc.finish()?;
79+
if let Some(mut file) = self.trace_events_file.take() {
80+
let mut cursor = Cursor::new(&self.uncompressed_buf);
81+
compress(&mut cursor, &mut file, CompressionLevel::Fastest);
82+
83+
file.flush()?;
7584

85+
self.uncompressed_buf.clear();
7686
Ok(())
7787
} else {
7888
panic!("finish_writing_trace_events() called without previous call to begin_writing_trace_events()");

0 commit comments

Comments
 (0)