Skip to content

Commit a529caa

Browse files
committed
profiling: test using reqwest instead of hyper for exporting
1 parent 6b26318 commit a529caa

File tree

8 files changed

+1610
-59
lines changed

8 files changed

+1610
-59
lines changed

Cargo.lock

Lines changed: 523 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libdd-profiling/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ thiserror = "2"
5050
tokio = {version = "1.23", features = ["rt", "macros"]}
5151
tokio-util = "0.7.1"
5252
zstd = { version = "0.13", default-features = false }
53+
reqwest = { version = "0.12.23", features = ["multipart", "rustls-tls"], default-features = false }
5354

5455
[dev-dependencies]
5556
bolero = "0.13"
5657
criterion = "0.5.1"
5758
lz4_flex = { version = "0.9", default-features = false, features = ["std", "frame"] }
5859
proptest = "1"
60+
wiremock = "0.5"

libdd-profiling/src/exporter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use libdd_common::{
2121

2222
pub mod config;
2323
mod errors;
24+
pub mod reqwest_exporter;
2425

2526
#[cfg(unix)]
2627
pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Reqwest-based profiling exporter
5+
//!
6+
//! This is a simplified async implementation using reqwest.
7+
8+
use anyhow::Context;
9+
use libdd_common::{azure_app_services, tag, Endpoint};
10+
use libdd_common::tag::Tag;
11+
use serde_json::json;
12+
use std::{future, io::Write};
13+
use tokio_util::sync::CancellationToken;
14+
15+
use crate::internal::{EncodedProfile, Profile};
16+
use crate::profiles::{Compressor, DefaultProfileCodec};
17+
18+
pub struct ProfileExporter {
19+
client: reqwest::Client,
20+
endpoint: Endpoint,
21+
family: String,
22+
profiling_library_name: String,
23+
profiling_library_version: String,
24+
tags: Vec<Tag>,
25+
}
26+
27+
pub struct File<'a> {
28+
pub name: &'a str,
29+
pub bytes: &'a [u8],
30+
}
31+
32+
impl ProfileExporter {
33+
/// Creates a new exporter to be used to report profiling data.
34+
///
35+
/// Note: Reqwest v0.12.23+ includes automatic retry support for transient failures.
36+
/// The default configuration automatically retries safe errors and low-level protocol NACKs.
37+
/// For custom retry policies, users can configure the reqwest client before creating the exporter.
38+
pub fn new(
39+
profiling_library_name: impl Into<String>,
40+
profiling_library_version: impl Into<String>,
41+
family: impl Into<String>,
42+
tags: Vec<Tag>,
43+
endpoint: Endpoint,
44+
) -> anyhow::Result<Self> {
45+
let mut builder = reqwest::Client::builder().use_rustls_tls();
46+
47+
// Check if this is a Unix Domain Socket
48+
#[cfg(unix)]
49+
if endpoint.url.scheme_str() == Some("unix") {
50+
use libdd_common::connector::uds::socket_path_from_uri;
51+
let socket_path = socket_path_from_uri(&endpoint.url)?;
52+
builder = builder.unix_socket(socket_path);
53+
}
54+
55+
Ok(Self {
56+
client: builder.build()?,
57+
endpoint,
58+
family: family.into(),
59+
profiling_library_name: profiling_library_name.into(),
60+
profiling_library_version: profiling_library_version.into(),
61+
tags,
62+
})
63+
}
64+
65+
/// Build and send a profile. Returns the HTTP status code.
66+
pub async fn send<'a>(
67+
&self,
68+
profile: EncodedProfile,
69+
files_to_compress_and_export: &'a [File<'a>],
70+
files_to_export_unmodified: &'a [File<'a>],
71+
additional_tags: &[Tag],
72+
internal_metadata: Option<serde_json::Value>,
73+
info: Option<serde_json::Value>,
74+
cancel: Option<&CancellationToken>,
75+
) -> anyhow::Result<reqwest::StatusCode> {
76+
let tags_profiler = self.build_tags_string(additional_tags);
77+
let event = self.build_event_json(&profile, files_to_compress_and_export,
78+
files_to_export_unmodified, &tags_profiler, internal_metadata, info);
79+
80+
let form = self.build_multipart_form(event, profile, files_to_compress_and_export,
81+
files_to_export_unmodified)?;
82+
83+
// For Unix Domain Sockets, convert the URL to use http:// scheme
84+
// The socket path was already configured in the client builder
85+
let url = if self.endpoint.url.scheme_str() == Some("unix") {
86+
// Use localhost as a placeholder - the actual connection goes through the UDS
87+
format!("http://localhost{}", self.endpoint.url.path())
88+
} else {
89+
self.endpoint.url.to_string()
90+
};
91+
92+
// Build request
93+
let request = self.client
94+
.post(url)
95+
.header("Connection", "close")
96+
.header("DD-EVP-ORIGIN", &self.profiling_library_name)
97+
.header("DD-EVP-ORIGIN-VERSION", &self.profiling_library_version)
98+
.header("User-Agent", format!("DDProf/{}", env!("CARGO_PKG_VERSION")))
99+
.headers(self.build_optional_headers())
100+
.timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms))
101+
.multipart(form)
102+
.build()?;
103+
104+
// Send request with cancellation support
105+
tokio::select! {
106+
_ = async {
107+
match cancel {
108+
Some(token) => token.cancelled().await,
109+
None => future::pending().await,
110+
}
111+
} => Err(anyhow::anyhow!("Operation cancelled by user")),
112+
result = self.client.execute(request) => {
113+
Ok(result?.status())
114+
}
115+
}
116+
}
117+
118+
// Helper methods
119+
120+
fn build_tags_string(&self, additional_tags: &[Tag]) -> String {
121+
let mut tags = String::new();
122+
123+
// Add configured tags
124+
for tag in &self.tags {
125+
tags.push_str(tag.as_ref());
126+
tags.push(',');
127+
}
128+
129+
// Add additional tags
130+
for tag in additional_tags {
131+
tags.push_str(tag.as_ref());
132+
tags.push(',');
133+
}
134+
135+
// Add Azure App Services tags if available
136+
if let Some(aas) = &*azure_app_services::AAS_METADATA {
137+
for (name, value) in [
138+
("aas.resource.id", aas.get_resource_id()),
139+
("aas.environment.extension_version", aas.get_extension_version()),
140+
("aas.environment.instance_id", aas.get_instance_id()),
141+
("aas.environment.instance_name", aas.get_instance_name()),
142+
("aas.environment.os", aas.get_operating_system()),
143+
("aas.resource.group", aas.get_resource_group()),
144+
("aas.site.name", aas.get_site_name()),
145+
("aas.site.kind", aas.get_site_kind()),
146+
("aas.site.type", aas.get_site_type()),
147+
("aas.subscription.id", aas.get_subscription_id()),
148+
] {
149+
if let Ok(tag) = Tag::new(name, value) {
150+
tags.push_str(tag.as_ref());
151+
tags.push(',');
152+
}
153+
}
154+
}
155+
156+
// Add runtime platform tag (last, no trailing comma)
157+
tags.push_str(tag!("runtime_platform", target_triple::TARGET).as_ref());
158+
tags
159+
}
160+
161+
fn build_event_json(
162+
&self,
163+
profile: &EncodedProfile,
164+
files_to_compress: &[File],
165+
files_unmodified: &[File],
166+
tags_profiler: &str,
167+
internal_metadata: Option<serde_json::Value>,
168+
info: Option<serde_json::Value>,
169+
) -> String {
170+
let attachments: Vec<_> = files_to_compress.iter()
171+
.chain(files_unmodified.iter())
172+
.map(|f| f.name)
173+
.chain(std::iter::once("profile.pprof"))
174+
.collect();
175+
176+
let mut internal = internal_metadata.unwrap_or_else(|| json!({}));
177+
internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
178+
179+
json!({
180+
"attachments": attachments,
181+
"tags_profiler": tags_profiler,
182+
"start": chrono::DateTime::<chrono::Utc>::from(profile.start)
183+
.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
184+
"end": chrono::DateTime::<chrono::Utc>::from(profile.end)
185+
.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
186+
"family": self.family,
187+
"version": "4",
188+
"endpoint_counts": if profile.endpoints_stats.is_empty() {
189+
None
190+
} else {
191+
Some(&profile.endpoints_stats)
192+
},
193+
"internal": internal,
194+
"info": info.unwrap_or_else(|| json!({})),
195+
})
196+
.to_string()
197+
}
198+
199+
fn build_multipart_form(
200+
&self,
201+
event: String,
202+
profile: EncodedProfile,
203+
files_to_compress: &[File],
204+
files_unmodified: &[File],
205+
) -> anyhow::Result<reqwest::multipart::Form> {
206+
let mut form = reqwest::multipart::Form::new()
207+
.part(
208+
"event",
209+
reqwest::multipart::Part::text(event)
210+
.file_name("event.json")
211+
.mime_str("application/json")?,
212+
);
213+
214+
// Add compressed files
215+
for file in files_to_compress {
216+
let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
217+
(file.bytes.len() >> 3).next_power_of_two(),
218+
10 * 1024 * 1024,
219+
Profile::COMPRESSION_LEVEL,
220+
)
221+
.context("failed to create compressor")?;
222+
encoder.write_all(file.bytes)?;
223+
224+
form = form.part(
225+
file.name.to_string(),
226+
reqwest::multipart::Part::bytes(encoder.finish()?)
227+
.file_name(file.name.to_string()),
228+
);
229+
}
230+
231+
// Add uncompressed files
232+
for file in files_unmodified {
233+
form = form.part(
234+
file.name.to_string(),
235+
reqwest::multipart::Part::bytes(file.bytes.to_vec())
236+
.file_name(file.name.to_string()),
237+
);
238+
}
239+
240+
// Add profile
241+
Ok(form.part(
242+
"profile.pprof",
243+
reqwest::multipart::Part::bytes(profile.buffer)
244+
.file_name("profile.pprof"),
245+
))
246+
}
247+
248+
fn build_optional_headers(&self) -> reqwest::header::HeaderMap {
249+
let mut headers = reqwest::header::HeaderMap::new();
250+
251+
if let Some(api_key) = &self.endpoint.api_key {
252+
if let Ok(value) = reqwest::header::HeaderValue::from_str(api_key) {
253+
headers.insert("DD-API-KEY", value);
254+
}
255+
}
256+
257+
if let Some(test_token) = &self.endpoint.test_token {
258+
if let Ok(value) = reqwest::header::HeaderValue::from_str(test_token) {
259+
headers.insert("X-Datadog-Test-Session-Token", value);
260+
}
261+
}
262+
263+
headers
264+
}
265+
}
266+

libdd-profiling/src/internal/profile/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,46 @@ impl Profile {
536536
Ok(self)
537537
}
538538

539+
/// Convenience method to serialize and export the profile to a reqwest exporter.
540+
///
541+
/// # Arguments
542+
/// * `exporter` - The reqwest ProfileExporter to send the profile to
543+
/// * `files_to_compress_and_export` - Additional files to compress and attach to the profile
544+
/// * `files_to_export_unmodified` - Additional files to attach without compression
545+
/// * `additional_tags` - Tags to add to this specific profile (in addition to the exporter's configured tags)
546+
/// * `internal_metadata` - Internal metadata to include in the event JSON
547+
/// * `info` - System info to include in the event JSON
548+
/// * `end_time` - Optional end time (defaults to now)
549+
/// * `duration` - Optional duration (defaults to end_time - start_time)
550+
/// * `cancel` - Optional cancellation token to abort the upload
551+
///
552+
/// # Returns
553+
/// The HTTP status code from the upload
554+
#[allow(clippy::too_many_arguments)]
555+
pub async fn export_to_endpoint<'a>(
556+
self,
557+
exporter: &crate::exporter::reqwest_exporter::ProfileExporter,
558+
files_to_compress_and_export: &'a [crate::exporter::reqwest_exporter::File<'a>],
559+
files_to_export_unmodified: &'a [crate::exporter::reqwest_exporter::File<'a>],
560+
additional_tags: &[libdd_common::tag::Tag],
561+
internal_metadata: Option<serde_json::Value>,
562+
info: Option<serde_json::Value>,
563+
end_time: Option<SystemTime>,
564+
duration: Option<Duration>,
565+
cancel: Option<&tokio_util::sync::CancellationToken>,
566+
) -> anyhow::Result<reqwest::StatusCode> {
567+
let encoded = self.serialize_into_compressed_pprof(end_time, duration)?;
568+
exporter.send(
569+
encoded,
570+
files_to_compress_and_export,
571+
files_to_export_unmodified,
572+
additional_tags,
573+
internal_metadata,
574+
info,
575+
cancel,
576+
).await
577+
}
578+
539579
/// In incident 35390 (JIRA PROF-11456) we observed invalid location_ids being present in
540580
/// emitted profiles. We're doing extra checks here so that if we see incorrect ids again,
541581
/// we are 100% sure they were not introduced prior to this stage.

0 commit comments

Comments
 (0)