Skip to content

Commit 958b210

Browse files
committed
perf(profiling): use zstd for compression
1 parent e741848 commit 958b210

File tree

9 files changed

+158
-19
lines changed

9 files changed

+158
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datadog-profiling/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ hyper = { workspace = true}
3232
http-body-util = "0.1"
3333
hyper-multipart-rfc7578 = "0.9.0"
3434
indexmap = "2.2"
35-
lz4_flex = { version = "0.9", default-features = false, features = ["std", "safe-encode", "frame"] }
3635
mime = "0.3.16"
3736
prost = "0.13.5"
3837
rustc-hash = { version = "1.1", default-features = false }
@@ -41,7 +40,9 @@ serde_json = {version = "1.0"}
4140
target-triple = "0.1.4"
4241
tokio = {version = "1.23", features = ["rt", "macros"]}
4342
tokio-util = "0.7.1"
43+
zstd = { version = "0.13", default-features = false }
4444

4545
[dev-dependencies]
4646
bolero = "0.13"
4747
criterion = "0.5.1"
48+
lz4_flex = { version = "0.9", default-features = false, features = ["std", "frame"] }

datadog-profiling/src/exporter/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use anyhow::Context;
45
use bytes::Bytes;
56
pub use chrono::{DateTime, Utc};
67
pub use ddcommon::tag::Tag;
78
pub use hyper::Uri;
89
use hyper_multipart_rfc7578::client::multipart;
9-
use lz4_flex::frame::FrameEncoder;
1010
use serde_json::json;
1111
use std::borrow::Cow;
1212
use std::fmt::Debug;
@@ -29,6 +29,7 @@ pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
2929
pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri};
3030

3131
use crate::internal::EncodedProfile;
32+
use crate::profiles::Compressor;
3233

3334
const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0);
3435

@@ -279,8 +280,10 @@ impl ProfileExporter {
279280
// not reserving too much for things that compress really well, and
280281
// power-of-two capacities are almost always the best performing.
281282
let capacity = (file.bytes.len() / 10).next_power_of_two();
282-
let buffer = Vec::with_capacity(capacity);
283-
let mut encoder = FrameEncoder::new(buffer);
283+
let max_capacity = 50 * 1024 * 1024;
284+
let compression_level = 1;
285+
let mut encoder = Compressor::try_new(capacity, max_capacity, compression_level)
286+
.context("failed to create compressor")?;
284287
encoder.write_all(file.bytes)?;
285288
let encoded = encoder.finish()?;
286289
/* The Datadog RFC examples strip off the file extension, but the exact behavior

datadog-profiling/src/internal/observation/timestamped_observations.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,11 @@ use super::super::StackTraceId;
1111
use crate::collections::identifiable::Id;
1212
use crate::internal::Timestamp;
1313
use byteorder::{NativeEndian, ReadBytesExt};
14-
use lz4_flex::frame::FrameDecoder;
15-
use lz4_flex::frame::FrameEncoder;
16-
use std::io::Cursor;
1714
use std::io::Write;
15+
use std::io::{BufReader, Cursor};
1816

19-
#[derive(Debug)]
2017
pub struct TimestampedObservations {
21-
compressed_timestamped_data: FrameEncoder<Vec<u8>>,
18+
compressed_timestamped_data: zstd::Encoder<'static, Vec<u8>>,
2219
sample_types_len: usize,
2320
}
2421

@@ -32,16 +29,19 @@ impl TimestampedObservations {
3229

3330
pub fn new(sample_types_len: usize) -> Self {
3431
Self {
35-
compressed_timestamped_data: FrameEncoder::new(Vec::with_capacity(
36-
Self::DEFAULT_BUFFER_SIZE,
37-
)),
32+
compressed_timestamped_data: zstd::Encoder::new(
33+
Vec::with_capacity(Self::DEFAULT_BUFFER_SIZE),
34+
1,
35+
)
36+
.expect("failed to create zstd encoder"),
3837
sample_types_len,
3938
}
4039
}
4140

4241
pub fn with_no_backing_store() -> Self {
4342
Self {
44-
compressed_timestamped_data: FrameEncoder::new(vec![]),
43+
compressed_timestamped_data: zstd::Encoder::new(vec![], 1)
44+
.expect("failed to create zstd encoder"),
4545
sample_types_len: 0,
4646
}
4747
}
@@ -74,16 +74,17 @@ impl TimestampedObservations {
7474
pub fn into_iter(self) -> TimestampedObservationsIter {
7575
#[allow(clippy::unwrap_used)]
7676
TimestampedObservationsIter {
77-
decoder: FrameDecoder::new(Cursor::new(
77+
decoder: zstd::Decoder::new(Cursor::new(
7878
self.compressed_timestamped_data.finish().unwrap(),
79-
)),
79+
))
80+
.expect("failed to create zstd decoder"),
8081
sample_types_len: self.sample_types_len,
8182
}
8283
}
8384
}
8485

8586
pub struct TimestampedObservationsIter {
86-
decoder: FrameDecoder<Cursor<Vec<u8>>>,
87+
decoder: zstd::Decoder<'static, BufReader<Cursor<Vec<u8>>>>,
8788
sample_types_len: usize,
8889
}
8990

datadog-profiling/src/internal/profile/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use crate::collections::identifiable::*;
1414
use crate::collections::string_storage::{CachedProfileId, ManagedStringStorage};
1515
use crate::collections::string_table::{self, StringTable};
1616
use crate::iter::{IntoLendingIterator, LendingIterator};
17+
use crate::profiles::Compressor;
1718
use anyhow::Context;
1819
use datadog_profiling_protobuf::{self as protobuf, Record, Value, NO_OPT_ZERO, OPT_ZERO};
1920
use interning_api::Generation;
20-
use lz4_flex::frame::FrameEncoder;
2121
use std::borrow::Cow;
2222
use std::collections::HashMap;
2323
use std::io;
@@ -342,7 +342,18 @@ impl Profile {
342342
// size of 32KiB should definitely outperform starting at zero for
343343
// time consumed, allocator pressure, and allocator fragmentation.
344344
const INITIAL_PPROF_BUFFER_SIZE: usize = 32 * 1024;
345-
let mut compressor = FrameEncoder::new(Vec::with_capacity(INITIAL_PPROF_BUFFER_SIZE));
345+
const MAX_PROFILE_SIZE: usize = 50 * 1024 * 1024;
346+
347+
// When testing on some profiles that can't be shared publicly,
348+
// level 1 provided better compressed files while taking less time
349+
// compared to lz4.
350+
const COMPRESSION_LEVEL: i32 = 1;
351+
let mut compressor = Compressor::try_new(
352+
INITIAL_PPROF_BUFFER_SIZE,
353+
MAX_PROFILE_SIZE,
354+
COMPRESSION_LEVEL,
355+
)
356+
.context("failed to create compressor")?;
346357

347358
let mut encoded_profile = self.encode(&mut compressor, end_time, duration)?;
348359
encoded_profile.buffer = compressor.finish()?;

datadog-profiling/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ pub mod exporter;
1212
pub mod internal;
1313
pub mod iter;
1414
pub mod pprof;
15+
mod profiles;

datadog-profiling/src/pprof/test_utils.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use anyhow::Context;
45
use datadog_profiling_protobuf::prost_impls::{Profile, Sample};
6+
use std::io::Cursor;
57

68
pub fn deserialize_compressed_pprof(encoded: &[u8]) -> anyhow::Result<Profile> {
79
use prost::Message;
810
use std::io::Read;
911

10-
let mut decoder = lz4_flex::frame::FrameDecoder::new(encoded);
12+
let mut decoder =
13+
zstd::Decoder::new(Cursor::new(encoded)).context("failed to create zstd decoder")?;
1114
let mut buf = Vec::new();
1215
decoder.read_to_end(&mut buf)?;
1316
let profile = Profile::decode(buf.as_slice())?;
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::io::{self, Write};
5+
6+
/// This type wraps a [`Vec`] to provide a [`Write`] interface that has a max
7+
/// capacity that won't be exceeded. Additionally, it gracefully handles
8+
/// out-of-memory conditions instead of panicking (unfortunately not compatible
9+
/// with the `no-panic` crate, though).
10+
pub struct SizeRestrictedBuffer {
11+
vec: Vec<u8>,
12+
max_capacity: usize,
13+
}
14+
15+
impl SizeRestrictedBuffer {
16+
pub fn try_new(size_hint: usize, max_capacity: usize) -> io::Result<Self> {
17+
if size_hint > max_capacity {
18+
return Err(io::Error::new(
19+
io::ErrorKind::InvalidInput,
20+
"size hint shouldn't be larger than max capacity",
21+
));
22+
}
23+
// Round up to the next power of 2, but don't exceed the max_capacity.
24+
let initial_capacity = size_hint.next_power_of_two().min(max_capacity);
25+
let mut vec = Vec::new();
26+
vec.try_reserve(initial_capacity)?;
27+
Ok(SizeRestrictedBuffer { vec, max_capacity })
28+
}
29+
30+
pub fn as_slice(&self) -> &[u8] {
31+
self.vec.as_slice()
32+
}
33+
}
34+
35+
impl From<SizeRestrictedBuffer> for Vec<u8> {
36+
fn from(buf: SizeRestrictedBuffer) -> Self {
37+
buf.vec
38+
}
39+
}
40+
41+
impl Write for SizeRestrictedBuffer {
42+
#[inline]
43+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
44+
let additional = buf.len();
45+
if additional <= self.max_capacity.wrapping_sub(self.vec.len()) {
46+
self.vec.try_reserve(additional)?;
47+
self.vec.extend(buf);
48+
Ok(additional)
49+
} else {
50+
Err(io::ErrorKind::StorageFull.into())
51+
}
52+
}
53+
54+
#[inline]
55+
fn flush(&mut self) -> io::Result<()> {
56+
Ok(())
57+
}
58+
}
59+
60+
type Encoder = zstd::Encoder<'static, SizeRestrictedBuffer>;
61+
62+
/// Used to compress profile data.
63+
pub struct Compressor {
64+
encoder: Encoder,
65+
}
66+
67+
impl Compressor {
68+
/// Creates a new compressor with the provided configuration.
69+
///
70+
/// - `size_hint`: beginning capacity for the output buffer. This is a
71+
/// hint for the starting size, and a different one may
72+
/// be used.
73+
/// - `max_capacity`: the maximum size for the output buffer (hard limit).
74+
/// - `compression_level`: see [`zstd::Encoder::new`] for the valid range.
75+
pub fn try_new(
76+
size_hint: usize,
77+
max_capacity: usize,
78+
compression_level: i32,
79+
) -> io::Result<Compressor> {
80+
let buffer = SizeRestrictedBuffer::try_new(size_hint, max_capacity)?;
81+
let encoder = Encoder::new(buffer, compression_level)?;
82+
Ok(Compressor { encoder })
83+
}
84+
85+
/// Finish the compression, and return the compressed data. The compressor
86+
/// remains valid but is missing its encoder, so it will fail to encode
87+
/// data.
88+
///
89+
/// # Errors
90+
///
91+
/// 1. Fails if the compressor's encoder is missing.
92+
/// 2. Fails if the encoder fails, e.g., the output buffer is full.
93+
pub fn finish(self) -> io::Result<Vec<u8>> {
94+
match self.encoder.try_finish() {
95+
Ok(buffer) => Ok(buffer.vec),
96+
Err(err) => Err(err.1),
97+
}
98+
}
99+
}
100+
101+
impl Write for Compressor {
102+
#[inline]
103+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
104+
let encoder = &mut self.encoder;
105+
encoder.write(buf)
106+
}
107+
108+
#[inline]
109+
fn flush(&mut self) -> io::Result<()> {
110+
self.encoder.flush()
111+
}
112+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
mod compressor;
5+
6+
pub use compressor::*;

0 commit comments

Comments
 (0)