Skip to content

Commit 1d1f4b8

Browse files
committed
feat(profiling)!: Add upload compression config
For now, the options are Off, On, and Lz4: - Off: no compression. - On: default, and which algorithm it chooses can change. - Lz4: lz4 frame encoded. Currently what On uses. This is being done in preparation for having Zstd as an option, and possibly the new default.
1 parent f292b76 commit 1d1f4b8

File tree

7 files changed

+94
-33
lines changed

7 files changed

+94
-33
lines changed

datadog-profiling-ffi/src/profiles/datatypes.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use function_name::named;
1212
use std::num::NonZeroI64;
1313
use std::str::Utf8Error;
1414
use std::time::SystemTime;
15+
use datadog_profiling::serializer::UploadCompression;
1516

1617
/// Represents a profile. Do not access its member for any reason, only use
1718
/// the C API functions on this struct.
@@ -743,6 +744,7 @@ pub unsafe extern "C" fn ddog_prof_Profile_serialize(
743744
profile: *mut Profile,
744745
start_time: Option<&Timespec>,
745746
end_time: Option<&Timespec>,
747+
compressor: UploadCompression,
746748
) -> SerializeResult {
747749
(|| {
748750
let profile = profile_ptr_to_inner(profile)?;
@@ -753,7 +755,7 @@ pub unsafe extern "C" fn ddog_prof_Profile_serialize(
753755
}
754756

755757
let end_time = end_time.map(SystemTime::from);
756-
old_profile.serialize_into_compressed_pprof(end_time, None)
758+
old_profile.serialize_into_compressed_pprof(end_time, None, compressor)
757759
})()
758760
.context("ddog_prof_Profile_serialize failed")
759761
.into()

datadog-profiling-replayer/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,11 @@ fn main() -> anyhow::Result<()> {
225225

226226
if let Some(file) = output {
227227
println!("Writing out pprof to file {file}");
228-
let encoded = outprof
229-
.serialize_into_compressed_pprof(Some(replayer.start_time), Some(replayer.duration))?;
228+
let encoded = outprof.serialize_into_compressed_pprof(
229+
Some(replayer.start_time),
230+
Some(replayer.duration),
231+
Default::default(),
232+
)?;
230233
if let Some(s) = &mut sysinfo {
231234
s.measure_memory("After serializing");
232235
}

datadog-profiling/examples/profiles.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ fn main() {
5454
Err(_) => exit(1),
5555
}
5656

57-
match profile.serialize_into_compressed_pprof(None, None) {
57+
match profile.serialize_into_compressed_pprof(None, None, Default::default()) {
5858
Ok(encoded_profile) => {
5959
let buffer = &encoded_profile.buffer;
6060
assert!(buffer.len() > 100);

datadog-profiling/src/internal/profile/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::collections::string_storage::{CachedProfileId, ManagedStringStorage};
1515
use crate::collections::string_table::StringTable;
1616
use crate::iter::{IntoLendingIterator, LendingIterator};
1717
use crate::pprof::sliced_proto::*;
18-
use crate::serializer::CompressedProtobufSerializer;
18+
use crate::serializer::{CompressedProtobufSerializer, UploadCompression};
1919
use anyhow::Context;
2020
use interning_api::Generation;
2121
use std::borrow::Cow;
@@ -323,6 +323,7 @@ impl Profile {
323323
mut self,
324324
end_time: Option<SystemTime>,
325325
duration: Option<Duration>,
326+
upload_compression: UploadCompression,
326327
) -> anyhow::Result<EncodedProfile> {
327328
let end = end_time.unwrap_or_else(SystemTime::now);
328329
let start = self.start_time;
@@ -353,7 +354,10 @@ impl Profile {
353354
// size of 32KiB should definitely out-perform starting at zero for
354355
// time consumed, allocator pressure, and allocator fragmentation.
355356
const INITIAL_PPROF_BUFFER_SIZE: usize = 32 * 1024;
356-
let mut encoder = CompressedProtobufSerializer::with_capacity(INITIAL_PPROF_BUFFER_SIZE);
357+
let mut encoder = CompressedProtobufSerializer::with_config_and_capacity(
358+
upload_compression,
359+
INITIAL_PPROF_BUFFER_SIZE,
360+
);
357361

358362
for (sample, timestamp, mut values) in std::mem::take(&mut self.observations).into_iter() {
359363
let labels = self.enrich_sample_labels(sample, timestamp)?;
@@ -1295,7 +1299,7 @@ mod api_tests {
12951299
let profile: Profile = Profile::new(&sample_types, None);
12961300

12971301
let encoded_profile = profile
1298-
.serialize_into_compressed_pprof(None, None)
1302+
.serialize_into_compressed_pprof(None, None, Default::default())
12991303
.expect("Unable to encode/serialize the profile");
13001304

13011305
let endpoints_stats = encoded_profile.endpoints_stats;
@@ -1319,7 +1323,7 @@ mod api_tests {
13191323
profile.add_endpoint_count(Cow::from(second_endpoint), 1)?;
13201324

13211325
let encoded_profile = profile
1322-
.serialize_into_compressed_pprof(None, None)
1326+
.serialize_into_compressed_pprof(None, None, Default::default())
13231327
.expect("Unable to encode/serialize the profile");
13241328

13251329
let endpoints_stats = encoded_profile.endpoints_stats;

datadog-profiling/src/pprof/test_utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crate::serializer::UploadCompression;
5+
46
pub fn deserialize_compressed_pprof(encoded: &[u8]) -> anyhow::Result<super::Profile> {
57
use prost::Message;
68
use std::io::Read;
@@ -12,7 +14,9 @@ pub fn deserialize_compressed_pprof(encoded: &[u8]) -> anyhow::Result<super::Pro
1214
Ok(profile)
1315
}
1416

17+
// todo: allow choosing what compressor to use, because in some case we may
18+
// want that, and other cases None will execute faster.
1519
pub fn roundtrip_to_pprof(profile: crate::internal::Profile) -> anyhow::Result<super::Profile> {
16-
let encoded = profile.serialize_into_compressed_pprof(None, None)?;
20+
let encoded = profile.serialize_into_compressed_pprof(None, None, UploadCompression::Lz4)?;
1721
deserialize_compressed_pprof(&encoded.buffer)
1822
}
Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,36 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use super::UploadCompression;
45
use bytes::BufMut;
56
use lz4_flex::frame::FrameEncoder;
67
use prost::encoding::{encode_key, encode_varint, encoded_len_varint, key_len, WireType};
7-
use std::io::Write;
8+
use std::io::{self, Write};
9+
10+
enum Compressor {
11+
None,
12+
Lz4 { zipper: FrameEncoder<Vec<u8>> },
13+
}
14+
15+
impl Compressor {
16+
#[inline]
17+
fn compress(&mut self, buffer: &mut Vec<u8>) -> io::Result<()> {
18+
match self {
19+
Compressor::None => Ok(()),
20+
Compressor::Lz4 { zipper } => {
21+
zipper.write_all(buffer)?;
22+
buffer.clear();
23+
Ok(())
24+
}
25+
}
26+
}
27+
}
828

929
pub struct CompressedProtobufSerializer {
30+
/// Buffer that protobuf is encoded into. Lz4 uses this as a temporary
31+
/// buffer, while None uses this as the final output buffer.
1032
buffer: Vec<u8>,
11-
zipper: FrameEncoder<Vec<u8>>,
33+
compressor: Compressor,
1234
}
1335

1436
// I've opened a PR for a generic version of this upstream:
@@ -20,44 +42,54 @@ fn encode_str(tag: u32, value: &str, buf: &mut Vec<u8>) {
2042
}
2143

2244
impl CompressedProtobufSerializer {
23-
pub fn encode(&mut self, item: impl prost::Message) -> anyhow::Result<()> {
24-
item.encode(&mut self.buffer)?;
25-
self.zipper.write_all(&self.buffer)?;
26-
self.buffer.clear();
27-
Ok(())
45+
pub fn encode(&mut self, item: impl prost::Message) -> io::Result<()> {
46+
let buffer = &mut self.buffer;
47+
item.encode(buffer)?;
48+
self.compressor.compress(buffer)
2849
}
2950

3051
/// Only meant for string table strings. This is essentially an
3152
/// implementation of [prost::Message::encode] but for any `AsRef<str>`,
3253
/// and specialized for handling the unlikely OOM conditions of writing
3354
/// into a `Vec<u8>`.
34-
pub(crate) fn encode_string_table_entry(
35-
&mut self,
36-
item: impl AsRef<str>,
37-
) -> anyhow::Result<()> {
55+
pub(crate) fn encode_string_table_entry(&mut self, item: impl AsRef<str>) -> io::Result<()> {
56+
let buffer = &mut self.buffer;
3857
// In pprof, string tables are tag 6 on the Profile message.
3958
let tag = 6u32;
4059
let str = item.as_ref();
4160
let encoded_len = encoded_len_varint(str.len() as u64);
4261
let required = key_len(tag) + encoded_len + str.len();
43-
if let Err(err) = self.buffer.try_reserve(required) {
44-
return Err(anyhow::Error::from(err)
45-
.context("failed to encode Protobuf str; insufficient buffer capacity"));
46-
}
62+
buffer.try_reserve(required)?;
4763

48-
encode_str(tag, str, &mut self.buffer);
49-
self.zipper.write_all(&self.buffer)?;
50-
self.buffer.clear();
51-
Ok(())
64+
encode_str(tag, str, buffer);
65+
self.compressor.compress(buffer)
5266
}
5367

54-
pub fn finish(self) -> anyhow::Result<Vec<u8>> {
55-
Ok(self.zipper.finish()?)
68+
pub fn finish(self) -> io::Result<Vec<u8>> {
69+
match self.compressor {
70+
Compressor::None => Ok(self.buffer),
71+
Compressor::Lz4 { zipper } => {
72+
debug_assert!(self.buffer.is_empty());
73+
Ok(zipper.finish()?)
74+
}
75+
}
5676
}
5777

58-
pub fn with_capacity(capacity: usize) -> Self {
78+
pub fn with_config_and_capacity(config: UploadCompression, capacity: usize) -> Self {
79+
// Final output buffer.
5980
let buffer = Vec::with_capacity(capacity);
60-
let zipper = FrameEncoder::new(Vec::with_capacity(capacity));
61-
Self { buffer, zipper }
81+
match config {
82+
UploadCompression::Off => Self {
83+
buffer,
84+
compressor: Compressor::None,
85+
},
86+
UploadCompression::On | UploadCompression::Lz4 => Self {
87+
// Temporary input buffer.
88+
buffer: Vec::with_capacity(256),
89+
compressor: Compressor::Lz4 {
90+
zipper: FrameEncoder::new(buffer),
91+
},
92+
},
93+
}
6294
}
6395
}

datadog-profiling/src/serializer/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,19 @@
44
mod compressed_streaming_encoder;
55

66
pub use compressed_streaming_encoder::*;
7+
8+
#[repr(C)]
9+
#[derive(Debug)]
10+
pub enum UploadCompression {
11+
Off,
12+
/// On is the default, with the exact compression algorithm being
13+
/// unspecified, and free to change. For example, we're testing zstd.
14+
On,
15+
Lz4,
16+
}
17+
18+
impl Default for UploadCompression {
19+
fn default() -> Self {
20+
UploadCompression::On
21+
}
22+
}

0 commit comments

Comments
 (0)