Skip to content

Commit 6370af5

Browse files
committed
refactor: convert enum to traits
1 parent dfbee47 commit 6370af5

File tree

7 files changed

+172
-151
lines changed

7 files changed

+172
-151
lines changed

datadog-profiling/src/exporter/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
2929
pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri};
3030

3131
use crate::internal::EncodedProfile;
32-
use crate::profiles::Compressor;
32+
use crate::profiles::{Compressor, DefaultProfileCodec};
3333

3434
const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0);
3535

@@ -282,8 +282,12 @@ impl ProfileExporter {
282282
let capacity = (file.bytes.len() / 10).next_power_of_two();
283283
let max_capacity = 50 * 1024 * 1024;
284284
let compression_level = 1;
285-
let mut encoder = Compressor::try_new(capacity, max_capacity, compression_level)
286-
.context("failed to create compressor")?;
285+
let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
286+
capacity,
287+
max_capacity,
288+
compression_level,
289+
)
290+
.context("failed to create compressor")?;
287291
encoder.write_all(file.bytes)?;
288292
let encoded = encoder.finish()?;
289293
/* The Datadog RFC examples strip off the file extension, but the exact behavior

datadog-profiling/src/internal/observation/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@ mod observations;
1212
mod timestamped_observations;
1313
mod trimmed_observation;
1414

15-
pub use timestamped_observations::EncodingType;
1615
// We keep trimmed_observation private, to ensure that only maps can make and
1716
// operate on trimmed objects, which helps ensure safety.
1817
pub use observations::*;

datadog-profiling/src/internal/observation/observations.rs

Lines changed: 16 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44
//! See the mod.rs file comment for why this module and file exists.
55
66
use super::super::Sample;
7-
use super::timestamped_observations::{EncodingType, TimestampedObservations};
7+
use super::timestamped_observations::TimestampedObservations;
88
use super::trimmed_observation::{ObservationLength, TrimmedObservation};
99
use crate::internal::Timestamp;
1010
use std::collections::HashMap;
@@ -15,7 +15,7 @@ struct NonEmptyObservations {
1515
aggregated_data: AggregatedObservations,
1616
// Samples with timestamps are all separately kept (so we can know the exact values at the
1717
// given timestamp)
18-
timestamped_data: TimestampedObservations,
18+
timestamped_data: Option<TimestampedObservations>,
1919
obs_len: ObservationLength,
2020
timestamped_samples_count: usize,
2121
}
@@ -28,24 +28,15 @@ pub struct Observations {
2828
/// Public API
2929
impl Observations {
3030
pub fn new(observations_len: usize) -> Self {
31-
// zstd does FFI calls which miri cannot handle
32-
let encoding_type = if cfg!(not(miri)) {
33-
EncodingType::Zstd
34-
} else {
35-
EncodingType::None
36-
};
3731
#[allow(clippy::expect_used)]
38-
Self::try_new(encoding_type, observations_len).expect("failed to initialize observations")
32+
Self::try_new(observations_len).expect("failed to initialize observations")
3933
}
4034

41-
pub fn try_new(encoding_type: EncodingType, observations_len: usize) -> io::Result<Self> {
35+
pub fn try_new(observations_len: usize) -> io::Result<Self> {
4236
Ok(Observations {
4337
inner: Some(NonEmptyObservations {
4438
aggregated_data: AggregatedObservations::new(observations_len),
45-
timestamped_data: TimestampedObservations::try_new(
46-
encoding_type,
47-
observations_len,
48-
)?,
39+
timestamped_data: Some(TimestampedObservations::try_new(observations_len)?),
4940
obs_len: ObservationLength::new(observations_len),
5041
timestamped_samples_count: 0,
5142
}),
@@ -74,7 +65,11 @@ impl Observations {
7465
);
7566

7667
if let Some(ts) = timestamp {
77-
observations.timestamped_data.add(sample, ts, values)?;
68+
observations
69+
.timestamped_data
70+
.as_mut()
71+
.expect("timestamped_data should be present")
72+
.add(sample, ts, values)?;
7873
observations.timestamped_samples_count += 1;
7974
} else {
8075
observations.aggregated_data.add(sample, values)?;
@@ -188,12 +183,12 @@ impl IntoIterator for Observations {
188183

189184
fn into_iter(self) -> Self::IntoIter {
190185
let it = self.inner.into_iter().flat_map(|mut observations| {
191-
let timestamped_data_it = std::mem::replace(
192-
&mut observations.timestamped_data,
193-
TimestampedObservations::with_no_backing_store(),
194-
)
195-
.into_iter()
196-
.map(|(s, t, o)| (s, Some(t), o));
186+
let timestamped_data_it = observations
187+
.timestamped_data
188+
.take()
189+
.expect("timestamped_data should be present")
190+
.into_iter()
191+
.map(|(s, t, o)| (s, Some(t), o));
197192
let aggregated_data_it = std::mem::take(&mut observations.aggregated_data.data)
198193
.into_iter()
199194
.map(|(s, o)| (s, None, o))

datadog-profiling/src/internal/observation/timestamped_observations.rs

Lines changed: 14 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -10,93 +10,23 @@ use super::super::Sample;
1010
use super::super::StackTraceId;
1111
use crate::collections::identifiable::Id;
1212
use crate::internal::Timestamp;
13-
use crate::profiles::SizeRestrictedBuffer;
13+
use crate::profiles::{DefaultObservationCodec as DefaultCodec, ObservationCodec};
1414
use byteorder::{NativeEndian, ReadBytesExt};
15-
use std::io::{self, Cursor, Read, Write};
16-
17-
// todo: document
18-
#[derive(Clone, Copy, Debug, Default)]
19-
pub enum EncodingType {
20-
#[default]
21-
None,
22-
Zstd,
23-
}
15+
use std::io::{self, Write};
2416

25-
enum ObservationEncoder {
26-
Noop(SizeRestrictedBuffer),
27-
Zstd(zstd::Encoder<'static, SizeRestrictedBuffer>),
28-
}
17+
pub type TimestampedObservations = TimestampedObservationsImpl<DefaultCodec>;
2918

30-
pub struct TimestampedObservations {
31-
compressed_timestamped_data: ObservationEncoder,
19+
pub struct TimestampedObservationsImpl<C: ObservationCodec> {
20+
compressed_timestamped_data: C::Encoder,
3221
sample_types_len: usize,
3322
}
3423

35-
enum ObservationDecoder {
36-
Noop(Cursor<SizeRestrictedBuffer>),
37-
Zstd(zstd::Decoder<'static, Cursor<SizeRestrictedBuffer>>),
38-
}
39-
40-
pub struct TimestampedObservationsIter {
41-
decoder: ObservationDecoder,
24+
pub struct TimestampedObservationsIterImpl<C: ObservationCodec> {
25+
decoder: C::Decoder,
4226
sample_types_len: usize,
4327
}
4428

45-
impl ObservationEncoder {
46-
pub fn try_new(
47-
encoding_type: EncodingType,
48-
size_hint: usize,
49-
max_capacity: usize,
50-
) -> io::Result<Self> {
51-
let output_buffer = SizeRestrictedBuffer::try_new(size_hint, max_capacity)?;
52-
match encoding_type {
53-
EncodingType::None => Ok(ObservationEncoder::Noop(output_buffer)),
54-
EncodingType::Zstd => Ok(ObservationEncoder::Zstd(zstd::Encoder::new(
55-
output_buffer,
56-
1,
57-
)?)),
58-
}
59-
}
60-
61-
pub fn try_into_decoder(self) -> io::Result<ObservationDecoder> {
62-
match self {
63-
ObservationEncoder::Noop(buffer) => Ok(ObservationDecoder::Noop(Cursor::new(buffer))),
64-
ObservationEncoder::Zstd(encoder) => match encoder.try_finish() {
65-
Ok(buffer) => Ok(ObservationDecoder::Zstd(zstd::Decoder::with_buffer(
66-
Cursor::new(buffer),
67-
)?)),
68-
Err((_encoder, error)) => Err(error),
69-
},
70-
}
71-
}
72-
}
73-
74-
impl Write for ObservationEncoder {
75-
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
76-
match self {
77-
ObservationEncoder::Noop(encoder) => encoder.write(buf),
78-
ObservationEncoder::Zstd(encoder) => encoder.write(buf),
79-
}
80-
}
81-
82-
fn flush(&mut self) -> io::Result<()> {
83-
match self {
84-
ObservationEncoder::Noop(encoder) => encoder.flush(),
85-
ObservationEncoder::Zstd(encoder) => encoder.flush(),
86-
}
87-
}
88-
}
89-
90-
impl Read for ObservationDecoder {
91-
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
92-
match self {
93-
ObservationDecoder::Noop(decoder) => decoder.read(buf),
94-
ObservationDecoder::Zstd(decoder) => decoder.read(buf),
95-
}
96-
}
97-
}
98-
99-
impl TimestampedObservations {
29+
impl<C: ObservationCodec> TimestampedObservationsImpl<C> {
10030
// As documented in the internal Datadog doc "Ruby timeline memory fragmentation impact
10131
// investigation", allowing the timeline storage vec to slowly expand creates A LOT of
10232
// memory fragmentation for apps that employ multiple threads.
@@ -108,26 +38,16 @@ impl TimestampedObservations {
10838
// the profile as a whole would definitely exceed this.
10939
const MAX_CAPACITY: usize = i32::MAX as usize;
11040

111-
pub fn try_new(encoding_type: EncodingType, sample_types_len: usize) -> io::Result<Self> {
41+
pub fn try_new(sample_types_len: usize) -> io::Result<Self> {
11242
Ok(Self {
113-
compressed_timestamped_data: ObservationEncoder::try_new(
114-
encoding_type,
43+
compressed_timestamped_data: C::new_encoder(
11544
Self::DEFAULT_BUFFER_SIZE,
11645
Self::MAX_CAPACITY,
11746
)?,
11847
sample_types_len,
11948
})
12049
}
12150

122-
pub const fn with_no_backing_store() -> Self {
123-
Self {
124-
compressed_timestamped_data: ObservationEncoder::Noop(
125-
SizeRestrictedBuffer::zero_capacity(),
126-
),
127-
sample_types_len: 0,
128-
}
129-
}
130-
13151
pub fn add(&mut self, sample: Sample, ts: Timestamp, values: &[i64]) -> anyhow::Result<()> {
13252
// We explicitly turn the data into a stream of bytes, feeding it to the compressor.
13353
// @ivoanjo: I played with introducing a structure to serialize it all-at-once, but it seems
@@ -153,19 +73,17 @@ impl TimestampedObservations {
15373
Ok(())
15474
}
15575

156-
pub fn into_iter(self) -> TimestampedObservationsIter {
76+
pub fn into_iter(self) -> TimestampedObservationsIterImpl<C> {
15777
#[allow(clippy::expect_used)]
158-
TimestampedObservationsIter {
159-
decoder: self
160-
.compressed_timestamped_data
161-
.try_into_decoder()
78+
TimestampedObservationsIterImpl {
79+
decoder: C::encoder_into_decoder(self.compressed_timestamped_data)
16280
.expect("failed to initialize timestamped observation decoder"),
16381
sample_types_len: self.sample_types_len,
16482
}
16583
}
16684
}
16785

168-
impl Iterator for TimestampedObservationsIter {
86+
impl<C: ObservationCodec> Iterator for TimestampedObservationsIterImpl<C> {
16987
type Item = (Sample, Timestamp, Vec<i64>);
17088

17189
fn next(&mut self) -> Option<Self::Item> {

datadog-profiling/src/internal/profile/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ use crate::collections::identifiable::*;
1414
use crate::collections::string_storage::{CachedProfileId, ManagedStringStorage};
1515
use crate::collections::string_table::{self, StringTable};
1616
use crate::iter::{IntoLendingIterator, LendingIterator};
17-
use crate::profiles::Compressor;
17+
use crate::profiles::{Compressor, DefaultProfileCodec};
1818
use anyhow::Context;
1919
use datadog_profiling_protobuf::{self as protobuf, Record, Value, NO_OPT_ZERO, OPT_ZERO};
2020
use interning_api::Generation;
@@ -348,7 +348,7 @@ impl Profile {
348348
// level 1 provided better compressed files while taking less time
349349
// compared to lz4.
350350
const COMPRESSION_LEVEL: i32 = 1;
351-
let mut compressor = Compressor::try_new(
351+
let mut compressor = Compressor::<DefaultProfileCodec>::try_new(
352352
INITIAL_PPROF_BUFFER_SIZE,
353353
MAX_PROFILE_SIZE,
354354
COMPRESSION_LEVEL,

datadog-profiling/src/pprof/test_utils.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,14 +5,20 @@ use anyhow::Context;
55
use datadog_profiling_protobuf::prost_impls::{Profile, Sample};
66
use std::io::Cursor;
77

8-
pub fn deserialize_compressed_pprof(encoded: &[u8]) -> anyhow::Result<Profile> {
8+
fn deserialize_compressed_pprof(encoded: &[u8]) -> anyhow::Result<Profile> {
99
use prost::Message;
1010
use std::io::Read;
1111

12-
let mut decoder =
13-
zstd::Decoder::new(Cursor::new(encoded)).context("failed to create zstd decoder")?;
14-
let mut buf = Vec::new();
15-
decoder.read_to_end(&mut buf)?;
12+
#[cfg(miri)]
13+
let mut buf = encoded.to_vec();
14+
#[cfg(not(miri))]
15+
let mut buf = {
16+
let mut decoder =
17+
zstd::Decoder::new(Cursor::new(encoded)).context("failed to create zstd decoder")?;
18+
let mut out = Vec::new();
19+
decoder.read_to_end(&mut out)?;
20+
out
21+
};
1622
let profile = Profile::decode(buf.as_slice())?;
1723
Ok(profile)
1824
}

0 commit comments

Comments
 (0)