
Commit dfbee47

refactor: use enum for noop vs zstd compressor
1 parent 42d85c9 commit dfbee47


4 files changed: +138, -33 lines


datadog-profiling/src/internal/observation/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ mod observations;
 mod timestamped_observations;
 mod trimmed_observation;

+pub use timestamped_observations::EncodingType;
 // We keep trimmed_observation private, to ensure that only maps can make and
 // operate on trimmed objects, which helps ensure safety.
 pub use observations::*;
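
This re-export lets other code in the crate name the encoding without reaching into the private timestamped_observations submodule. A minimal sketch of the intended import (the exact path from outside this module is an assumption):

    // Hypothetical caller elsewhere in the crate.
    use crate::internal::observation::EncodingType;

    fn default_encoding() -> EncodingType {
        // Mirrors the cfg!(miri) selection in observations.rs below.
        if cfg!(miri) { EncodingType::None } else { EncodingType::Zstd }
    }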

datadog-profiling/src/internal/observation/observations.rs

Lines changed: 19 additions & 4 deletions
@@ -4,10 +4,11 @@
 //! See the mod.rs file comment for why this module and file exists.

 use super::super::Sample;
-use super::timestamped_observations::TimestampedObservations;
+use super::timestamped_observations::{EncodingType, TimestampedObservations};
 use super::trimmed_observation::{ObservationLength, TrimmedObservation};
 use crate::internal::Timestamp;
 use std::collections::HashMap;
+use std::io;

 struct NonEmptyObservations {
     // Samples with no timestamps are aggregated in-place as each observation is added
@@ -27,14 +28,28 @@ pub struct Observations {
 /// Public API
 impl Observations {
     pub fn new(observations_len: usize) -> Self {
-        Observations {
+        // zstd does FFI calls which miri cannot handle
+        let encoding_type = if cfg!(not(miri)) {
+            EncodingType::Zstd
+        } else {
+            EncodingType::None
+        };
+        #[allow(clippy::expect_used)]
+        Self::try_new(encoding_type, observations_len).expect("failed to initialize observations")
+    }
+
+    pub fn try_new(encoding_type: EncodingType, observations_len: usize) -> io::Result<Self> {
+        Ok(Observations {
             inner: Some(NonEmptyObservations {
                 aggregated_data: AggregatedObservations::new(observations_len),
-                timestamped_data: TimestampedObservations::new(observations_len),
+                timestamped_data: TimestampedObservations::try_new(
+                    encoding_type,
+                    observations_len,
+                )?,
                 obs_len: ObservationLength::new(observations_len),
                 timestamped_samples_count: 0,
             }),
-        }
+        })
     }

     pub fn add(
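
The new try_new lets callers pick the compressor explicitly and surfaces the I/O error instead of panicking, while new() keeps the old infallible signature by choosing zstd except under miri. A usage sketch (build_observations is a hypothetical crate-internal helper; it only relies on the try_new signature shown above):

    use std::io;

    // Prefer zstd, but fall back to the no-op encoder where zstd's FFI
    // cannot run (e.g. under miri).
    fn build_observations(observations_len: usize) -> io::Result<Observations> {
        let encoding_type = if cfg!(miri) {
            EncodingType::None
        } else {
            EncodingType::Zstd
        };
        Observations::try_new(encoding_type, observations_len)
    }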

datadog-profiling/src/internal/observation/timestamped_observations.rs

Lines changed: 103 additions & 28 deletions
@@ -10,40 +10,120 @@ use super::super::Sample;
 use super::super::StackTraceId;
 use crate::collections::identifiable::Id;
 use crate::internal::Timestamp;
+use crate::profiles::SizeRestrictedBuffer;
 use byteorder::{NativeEndian, ReadBytesExt};
-use std::io::Write;
-use std::io::{BufReader, Cursor};
+use std::io::{self, Cursor, Read, Write};
+
+// todo: document
+#[derive(Clone, Copy, Debug, Default)]
+pub enum EncodingType {
+    #[default]
+    None,
+    Zstd,
+}
+
+enum ObservationEncoder {
+    Noop(SizeRestrictedBuffer),
+    Zstd(zstd::Encoder<'static, SizeRestrictedBuffer>),
+}

 pub struct TimestampedObservations {
-    compressed_timestamped_data: zstd::Encoder<'static, Vec<u8>>,
+    compressed_timestamped_data: ObservationEncoder,
+    sample_types_len: usize,
+}
+
+enum ObservationDecoder {
+    Noop(Cursor<SizeRestrictedBuffer>),
+    Zstd(zstd::Decoder<'static, Cursor<SizeRestrictedBuffer>>),
+}
+
+pub struct TimestampedObservationsIter {
+    decoder: ObservationDecoder,
     sample_types_len: usize,
 }

+impl ObservationEncoder {
+    pub fn try_new(
+        encoding_type: EncodingType,
+        size_hint: usize,
+        max_capacity: usize,
+    ) -> io::Result<Self> {
+        let output_buffer = SizeRestrictedBuffer::try_new(size_hint, max_capacity)?;
+        match encoding_type {
+            EncodingType::None => Ok(ObservationEncoder::Noop(output_buffer)),
+            EncodingType::Zstd => Ok(ObservationEncoder::Zstd(zstd::Encoder::new(
+                output_buffer,
+                1,
+            )?)),
+        }
+    }
+
+    pub fn try_into_decoder(self) -> io::Result<ObservationDecoder> {
+        match self {
+            ObservationEncoder::Noop(buffer) => Ok(ObservationDecoder::Noop(Cursor::new(buffer))),
+            ObservationEncoder::Zstd(encoder) => match encoder.try_finish() {
+                Ok(buffer) => Ok(ObservationDecoder::Zstd(zstd::Decoder::with_buffer(
+                    Cursor::new(buffer),
+                )?)),
+                Err((_encoder, error)) => Err(error),
+            },
+        }
+    }
+}
+
+impl Write for ObservationEncoder {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        match self {
+            ObservationEncoder::Noop(encoder) => encoder.write(buf),
+            ObservationEncoder::Zstd(encoder) => encoder.write(buf),
+        }
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        match self {
+            ObservationEncoder::Noop(encoder) => encoder.flush(),
+            ObservationEncoder::Zstd(encoder) => encoder.flush(),
+        }
+    }
+}
+
+impl Read for ObservationDecoder {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self {
+            ObservationDecoder::Noop(decoder) => decoder.read(buf),
+            ObservationDecoder::Zstd(decoder) => decoder.read(buf),
+        }
+    }
+}
+
 impl TimestampedObservations {
     // As documented in the internal Datadog doc "Ruby timeline memory fragmentation impact
     // investigation", allowing the timeline storage vec to slowly expand creates A LOT of
     // memory fragmentation for apps that employ multiple threads.
     // To avoid this, we've picked a default buffer size of 1MB that very rarely needs to grow, and
     // when it does, is expected to grow in larger steps.
-    const DEFAULT_BUFFER_SIZE: usize = 1_048_576;
-
-    pub fn new(sample_types_len: usize) -> Self {
-        #[allow(clippy::expect_used)] // previous API panic'd implicitly
-        Self {
-            compressed_timestamped_data: zstd::Encoder::new(
-                Vec::with_capacity(Self::DEFAULT_BUFFER_SIZE),
-                1,
-            )
-            .expect("failed to create zstd encoder"),
+    const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
+
+    // Protobufs can't exceed 2 GiB, if our observations grow this large, then
+    // the profile as a whole would defintely exceed this.
+    const MAX_CAPACITY: usize = i32::MAX as usize;
+
+    pub fn try_new(encoding_type: EncodingType, sample_types_len: usize) -> io::Result<Self> {
+        Ok(Self {
+            compressed_timestamped_data: ObservationEncoder::try_new(
+                encoding_type,
+                Self::DEFAULT_BUFFER_SIZE,
+                Self::MAX_CAPACITY,
+            )?,
             sample_types_len,
-        }
+        })
     }

-    pub fn with_no_backing_store() -> Self {
-        #[allow(clippy::expect_used)] // previous API panic'd implicitly
+    pub const fn with_no_backing_store() -> Self {
         Self {
-            compressed_timestamped_data: zstd::Encoder::new(vec![], 1)
-                .expect("failed to create zstd encoder"),
+            compressed_timestamped_data: ObservationEncoder::Noop(
+                SizeRestrictedBuffer::zero_capacity(),
+            ),
             sample_types_len: 0,
         }
     }
@@ -74,22 +154,17 @@ impl TimestampedObservations {
     }

     pub fn into_iter(self) -> TimestampedObservationsIter {
-        #[allow(clippy::expect_used, clippy::unwrap_used)]
+        #[allow(clippy::expect_used)]
         TimestampedObservationsIter {
-            decoder: zstd::Decoder::new(Cursor::new(
-                self.compressed_timestamped_data.finish().unwrap(),
-            ))
-            .expect("failed to create zstd decoder"),
+            decoder: self
+                .compressed_timestamped_data
+                .try_into_decoder()
+                .expect("failed to initialize timestamped observation decoder"),
             sample_types_len: self.sample_types_len,
         }
     }
 }

-pub struct TimestampedObservationsIter {
-    decoder: zstd::Decoder<'static, BufReader<Cursor<Vec<u8>>>>,
-    sample_types_len: usize,
-}
-
 impl Iterator for TimestampedObservationsIter {
     type Item = (Sample, Timestamp, Vec<i64>);
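The heart of the refactor is plain enum dispatch: one variant holds the raw output buffer, the other holds a zstd encoder over that buffer, and single Write/Read impls forward to whichever variant is active. A standalone sketch of the same pattern, using Vec<u8> in place of SizeRestrictedBuffer and illustrative names (Sink, round_trip), so it compiles against only the zstd crate:

    use std::io::{self, Read, Write};

    // Illustrative stand-in for ObservationEncoder: either store bytes as-is
    // or run them through a zstd encoder.
    enum Sink {
        Noop(Vec<u8>),
        Zstd(zstd::Encoder<'static, Vec<u8>>),
    }

    impl Write for Sink {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            match self {
                Sink::Noop(vec) => vec.write(buf),
                Sink::Zstd(encoder) => encoder.write(buf),
            }
        }

        fn flush(&mut self) -> io::Result<()> {
            match self {
                Sink::Noop(vec) => vec.flush(),
                Sink::Zstd(encoder) => encoder.flush(),
            }
        }
    }

    fn round_trip(compress: bool, payload: &[u8]) -> io::Result<Vec<u8>> {
        // Pick the variant once; everything after goes through the shared Write impl.
        let mut sink = if compress {
            Sink::Zstd(zstd::Encoder::new(Vec::new(), 1)?)
        } else {
            Sink::Noop(Vec::new())
        };
        sink.write_all(payload)?;

        // Finishing the zstd stream yields the inner buffer; the no-op variant
        // already is the buffer (compare try_into_decoder above).
        let bytes = match sink {
            Sink::Noop(vec) => vec,
            Sink::Zstd(encoder) => encoder.finish()?,
        };

        // Read back symmetrically.
        if compress {
            let mut out = Vec::new();
            zstd::Decoder::new(bytes.as_slice())?.read_to_end(&mut out)?;
            Ok(out)
        } else {
            Ok(bytes)
        }
    }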

datadog-profiling/src/profiles/compressor.rs

Lines changed: 15 additions & 1 deletion
@@ -13,6 +13,14 @@ pub struct SizeRestrictedBuffer {
 }

 impl SizeRestrictedBuffer {
+    /// Returns a buffer which never holds any data.
+    pub const fn zero_capacity() -> Self {
+        Self {
+            vec: Vec::new(),
+            max_capacity: 0,
+        }
+    }
+
     pub fn try_new(size_hint: usize, max_capacity: usize) -> io::Result<Self> {
         if size_hint > max_capacity {
             return Err(io::Error::new(
@@ -27,7 +35,7 @@ impl SizeRestrictedBuffer {
         Ok(SizeRestrictedBuffer { vec, max_capacity })
     }

-    pub fn as_slice(&self) -> &[u8] {
+    pub const fn as_slice(&self) -> &[u8] {
         self.vec.as_slice()
     }
 }
@@ -38,6 +46,12 @@ impl From<SizeRestrictedBuffer> for Vec<u8> {
     }
 }

+impl AsRef<[u8]> for SizeRestrictedBuffer {
+    fn as_ref(&self) -> &[u8] {
+        self.as_slice()
+    }
+}
+
 impl Write for SizeRestrictedBuffer {
     #[inline]
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
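
Together with try_new, these additions make SizeRestrictedBuffer suitable as either encoder's backing store: pre-sized, capped at max_capacity, and readable back as a byte slice via AsRef. A usage sketch (demo is hypothetical and assumes the Write impl, not shown in this hunk, stores bytes verbatim up to the cap):

    use std::io::{self, Write};
    // Assuming the type is in scope, e.g. use crate::profiles::SizeRestrictedBuffer;

    fn demo() -> io::Result<()> {
        // Pre-allocate 4 KiB, but never allow growth past 64 KiB.
        let mut buffer = SizeRestrictedBuffer::try_new(4 * 1024, 64 * 1024)?;
        buffer.write_all(b"timestamped observation bytes")?;

        // The new AsRef<[u8]> impl exposes the contents as a slice, e.g. for
        // wrapping in a Cursor on the decode path.
        let _contents: &[u8] = buffer.as_ref();

        // A size_hint larger than max_capacity is rejected up front by try_new.
        assert!(SizeRestrictedBuffer::try_new(128, 64).is_err());

        // zero_capacity() is the const, allocation-free buffer used by
        // TimestampedObservations::with_no_backing_store().
        let _empty = SizeRestrictedBuffer::zero_capacity();
        Ok(())
    }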
