From 563fc65bfb587b5c682799ee413e379936e6212e Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Tue, 30 Sep 2025 13:29:28 +0200 Subject: [PATCH 1/4] Refactor trace-utils to pass an http client so the connections are reused. --- datadog-trace-utils/Cargo.toml | 8 +-- datadog-trace-utils/src/send_data/mod.rs | 67 ++++++++++--------- .../src/send_with_retry/mod.rs | 57 +++++----------- .../src/test_utils/datadog_test_agent.rs | 4 +- datadog-trace-utils/tests/test_send_data.rs | 13 ++-- 5 files changed, 66 insertions(+), 83 deletions(-) diff --git a/datadog-trace-utils/Cargo.toml b/datadog-trace-utils/Cargo.toml index 37eb0e96e1..46bf4bf503 100644 --- a/datadog-trace-utils/Cargo.toml +++ b/datadog-trace-utils/Cargo.toml @@ -38,11 +38,6 @@ tinybytes = { path = "../tinybytes", features = [ "serialization", ] } -# Proxy feature -hyper-http-proxy = { version = "1.1.0", default-features = false, features = [ - "rustls-tls-webpki-roots", -], optional = true } - # Compression feature flate2 = { version = "1.0", optional = true } zstd = { version = "0.13.3", default-features = false, optional = true } @@ -66,7 +61,7 @@ tempfile = "3.3.0" [features] default = ["https"] https = ["ddcommon/https"] -mini_agent = ["proxy", "compression", "ddcommon/use_webpki_roots"] +mini_agent = ["compression", "ddcommon/use_webpki_roots"] test-utils = [ "hyper/server", "httpmock", @@ -74,7 +69,6 @@ test-utils = [ "cargo-platform", "urlencoding", ] -proxy = ["hyper-http-proxy"] compression = ["zstd", "flate2"] # FIPS mode uses the FIPS-compliant cryptographic provider (Unix only) fips = ["ddcommon/fips"] diff --git a/datadog-trace-utils/src/send_data/mod.rs b/datadog-trace-utils/src/send_data/mod.rs index 82147a25b4..5f8d77f43e 100644 --- a/datadog-trace-utils/src/send_data/mod.rs +++ b/datadog-trace-utils/src/send_data/mod.rs @@ -9,6 +9,7 @@ use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use anyhow::{anyhow, Context}; use datadog_trace_protobuf::pb::{AgentPayload, TracerPayload}; +use ddcommon::HttpClient; use ddcommon::{ header::{ APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, @@ -42,6 +43,7 @@ use zstd::stream::write::Encoder; /// use datadog_trace_utils::trace_utils::TracerHeaderTags; /// use datadog_trace_utils::tracer_payload::TracerPayloadCollection; /// use ddcommon::Endpoint; +/// use ddcommon::hyper_migration::new_default_client; /// /// #[cfg_attr(miri, ignore)] /// async fn update_send_results_example() { @@ -58,8 +60,9 @@ use zstd::stream::write::Encoder; /// /// send_data.set_retry_strategy(retry_strategy); /// +/// let client = new_default_client(); /// // Send the data -/// let result = send_data.send().await; +/// let result = send_data.send(&client).await; /// } /// ``` pub struct SendData { @@ -234,24 +237,15 @@ impl SendData { /// # Returns /// /// A `SendDataResult` instance containing the result of the operation. - pub async fn send(&self) -> SendDataResult { - self.send_internal(None).await + pub async fn send(&self, http_client: &HttpClient) -> SendDataResult { + self.send_internal(http_client).await } - /// Sends the data to the target endpoint. - /// - /// # Returns - /// - /// A `SendDataResult` instance containing the result of the operation. - pub async fn send_proxy(&self, http_proxy: Option<&str>) -> SendDataResult { - self.send_internal(http_proxy).await - } - - async fn send_internal(&self, http_proxy: Option<&str>) -> SendDataResult { + async fn send_internal(&self, http_client: &HttpClient) -> SendDataResult { if self.use_protobuf() { - self.send_with_protobuf(http_proxy).await + self.send_with_protobuf(http_client).await } else { - self.send_with_msgpack(http_proxy).await + self.send_with_msgpack(http_client).await } } @@ -260,17 +254,17 @@ impl SendData { chunks: u64, payload: Vec, headers: HashMap<&'static str, String>, - http_proxy: Option<&str>, + http_client: &HttpClient, ) -> (SendWithRetryResult, u64, u64) { #[allow(clippy::unwrap_used)] let payload_len = u64::try_from(payload.len()).unwrap(); ( send_with_retry( + http_client, &self.target, payload, &headers, &self.retry_strategy, - http_proxy, ) .await, payload_len, @@ -304,7 +298,7 @@ impl SendData { } } - async fn send_with_protobuf(&self, http_proxy: Option<&str>) -> SendDataResult { + async fn send_with_protobuf(&self, http_client: &HttpClient) -> SendDataResult { let mut result = SendDataResult::default(); #[allow(clippy::unwrap_used)] @@ -331,7 +325,7 @@ impl SendData { request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string()); let (response, bytes_sent, chunks) = self - .send_payload(chunks, final_payload, request_headers, http_proxy) + .send_payload(chunks, final_payload, request_headers, http_client) .await; result.update(response, bytes_sent, chunks); @@ -342,7 +336,7 @@ impl SendData { } } - async fn send_with_msgpack(&self, http_proxy: Option<&str>) -> SendDataResult { + async fn send_with_msgpack(&self, http_client: &HttpClient) -> SendDataResult { let mut result = SendDataResult::default(); let mut futures = FuturesUnordered::new(); @@ -360,7 +354,7 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_proxy)); + futures.push(self.send_payload(chunks, payload, headers, http_client)); } } TracerPayloadCollection::V04(payload) => { @@ -372,7 +366,7 @@ impl SendData { let payload = msgpack_encoder::v04::to_vec(payload); - futures.push(self.send_payload(chunks, payload, headers, http_proxy)); + futures.push(self.send_payload(chunks, payload, headers, http_client)); } TracerPayloadCollection::V05(payload) => { #[allow(clippy::unwrap_used)] @@ -386,7 +380,7 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_proxy)); + futures.push(self.send_payload(chunks, payload, headers, http_client)); } } @@ -592,7 +586,8 @@ mod tests { ); let data_payload_len = compute_payload_len(&data.tracer_payloads); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_async().await; @@ -637,7 +632,8 @@ mod tests { ); let data_payload_len = compute_payload_len(&data.tracer_payloads); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_async().await; @@ -696,7 +692,8 @@ mod tests { ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_async().await; @@ -754,7 +751,8 @@ mod tests { ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_async().await; @@ -798,7 +796,8 @@ mod tests { ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_calls_async(2).await; @@ -839,7 +838,8 @@ mod tests { }, ); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_calls_async(5).await; @@ -871,7 +871,8 @@ mod tests { }, ); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; assert!(res.last_result.is_err()); match std::env::consts::OS { @@ -938,7 +939,8 @@ mod tests { }, ); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_calls_async(5).await; @@ -980,7 +982,8 @@ mod tests { }, ); - let res = data.send().await; + let client = ddcommon::hyper_migration::new_default_client(); + let res = data.send(&client).await; mock.assert_calls_async(10).await; diff --git a/datadog-trace-utils/src/send_with_retry/mod.rs b/datadog-trace-utils/src/send_with_retry/mod.rs index 6638ebaef8..4cd43e7910 100644 --- a/datadog-trace-utils/src/send_with_retry/mod.rs +++ b/datadog-trace-utils/src/send_with_retry/mod.rs @@ -8,7 +8,7 @@ mod retry_strategy; pub use retry_strategy::{RetryBackoffType, RetryStrategy}; use bytes::Bytes; -use ddcommon::{hyper_migration, Endpoint, HttpRequestBuilder}; +use ddcommon::{hyper_migration, Endpoint, HttpClient, HttpRequestBuilder}; use hyper::Method; use std::{collections::HashMap, time::Duration}; use tracing::{debug, error, info, warn}; @@ -79,12 +79,10 @@ impl std::error::Error for RequestError {} /// request fails. /// /// The request builder from [`Endpoint::to_request_builder`] is used with the associated headers -/// (api key, test token), and `headers` are added to the request. If `http_proxy` is provided then -/// it is used as the uri of the proxy. The request is executed with a timeout of -/// [`Endpoint::timeout_ms`]. +/// (api key, test token), and `headers` are added to the request. The request is executed with a +/// timeout of [`Endpoint::timeout_ms`]. /// /// # Arguments -/// http_proxy will be ignored if hte crate is not compiled with the `proxy` feature /// /// # Returns /// @@ -98,6 +96,7 @@ impl std::error::Error for RequestError {} /// /// ```rust, no_run /// # use ddcommon::Endpoint; +/// # use ddcommon::hyper_migration::new_default_client; /// # use std::collections::HashMap; /// # use datadog_trace_utils::send_with_retry::*; /// # async fn run() -> SendWithRetryResult { @@ -108,15 +107,16 @@ impl std::error::Error for RequestError {} /// }; /// let headers = HashMap::from([("Content-type", "application/msgpack".to_string())]); /// let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5)); -/// send_with_retry(&target, payload, &headers, &retry_strategy, None).await +/// let client = new_default_client(); +/// send_with_retry(&client, &target, payload, &headers, &retry_strategy).await /// # } /// ``` pub async fn send_with_retry( + client: &HttpClient, target: &Endpoint, payload: Vec, headers: &HashMap<&'static str, String>, retry_strategy: &RetryStrategy, - http_proxy: Option<&str>, ) -> SendWithRetryResult { let mut request_attempt = 0; // Wrap the payload in Bytes to avoid expensive clone between retries @@ -147,10 +147,10 @@ pub async fn send_with_retry( } match send_request( + client, Duration::from_millis(target.timeout_ms), req, payload.clone(), - http_proxy, ) .await { @@ -223,41 +223,16 @@ pub async fn send_with_retry( } async fn send_request( + client: &HttpClient, timeout: Duration, req: HttpRequestBuilder, payload: Bytes, - http_proxy: Option<&str>, ) -> Result { let req = req .body(hyper_migration::Body::from_bytes(payload)) .or(Err(RequestError::Build))?; - #[cfg(feature = "proxy")] - #[allow(clippy::unwrap_used)] - let req_future = { - if let Some(proxy) = http_proxy { - let proxy = hyper_http_proxy::Proxy::new( - hyper_http_proxy::Intercept::Https, - proxy.parse().unwrap(), - ); - let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy( - ddcommon::connector::Connector::default(), - proxy, - ) - .unwrap(); - hyper_migration::client_builder() - .build(proxy_connector) - .request(req) - } else { - hyper_migration::new_default_client().request(req) - } - }; - - #[cfg(not(feature = "proxy"))] - let req_future = { - let _ = http_proxy; - hyper_migration::new_default_client().request(req) - }; + let req_future = { client.request(req) }; match tokio::time::timeout(timeout, req_future).await { Ok(resp) => match resp { @@ -305,13 +280,14 @@ mod tests { let strategy = RetryStrategy::new(0, 2, RetryBackoffType::Constant, None); + let client = ddcommon::hyper_migration::new_default_client(); tokio::spawn(async move { let result = send_with_retry( + &client, &target_endpoint, vec![0, 1, 2, 3], &HashMap::new(), &strategy, - None, ) .await; assert!(result.is_err(), "Expected an error result"); @@ -353,13 +329,14 @@ mod tests { let strategy = RetryStrategy::new(2, 250, RetryBackoffType::Constant, None); + let client = ddcommon::hyper_migration::new_default_client(); tokio::spawn(async move { let result = send_with_retry( + &client, &target_endpoint, vec![0, 1, 2, 3], &HashMap::new(), &strategy, - None, ) .await; assert!( @@ -401,13 +378,14 @@ mod tests { None, ); + let client = ddcommon::hyper_migration::new_default_client(); tokio::spawn(async move { let result = send_with_retry( + &client, &target_endpoint, vec![0, 1, 2, 3], &HashMap::new(), &strategy, - None, ) .await; assert!( @@ -449,13 +427,14 @@ mod tests { let strategy = RetryStrategy::new(2, 10, RetryBackoffType::Constant, None); + let client = ddcommon::hyper_migration::new_default_client(); tokio::spawn(async move { let result = send_with_retry( + &client, &target_endpoint, vec![0, 1, 2, 3], &HashMap::new(), &strategy, - None, ) .await; assert!( diff --git a/datadog-trace-utils/src/test_utils/datadog_test_agent.rs b/datadog-trace-utils/src/test_utils/datadog_test_agent.rs index 367b939946..6a07278a58 100644 --- a/datadog-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/datadog-trace-utils/src/test_utils/datadog_test_agent.rs @@ -196,6 +196,7 @@ impl DatadogAgentContainerBuilder { /// use datadog_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; /// use datadog_trace_utils::trace_utils::TracerHeaderTags; /// use datadog_trace_utils::tracer_payload::TracerPayloadCollection; +/// use ddcommon::hyper_migration::new_default_client; /// use ddcommon::Endpoint; /// /// use tokio; @@ -221,7 +222,8 @@ impl DatadogAgentContainerBuilder { /// &endpoint, /// ); /// -/// let _result = data.send().await; +/// let client = new_default_client(); +/// let _result = data.send(&client).await; /// /// // Assert that the snapshot for a given token matches the expected snapshot /// test_agent.assert_snapshot("snapshot-token").await; diff --git a/datadog-trace-utils/tests/test_send_data.rs b/datadog-trace-utils/tests/test_send_data.rs index 2f6a26ab8a..8a380ebaba 100644 --- a/datadog-trace-utils/tests/test_send_data.rs +++ b/datadog-trace-utils/tests/test_send_data.rs @@ -10,6 +10,7 @@ mod tracing_integration_tests { use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; #[cfg(target_os = "linux")] use ddcommon::connector::uds::socket_path_to_uri; + use ddcommon::hyper_migration::new_default_client; use ddcommon::{hyper_migration, Endpoint}; use http_body_util::BodyExt; #[cfg(target_os = "linux")] @@ -116,7 +117,8 @@ mod tracing_integration_tests { &endpoint, ); - let _result = data.send().await; + let client = new_default_client(); + let _result = data.send(&client).await; test_agent.assert_snapshot(snapshot_name).await; } @@ -208,7 +210,8 @@ mod tracing_integration_tests { &endpoint, ); - let _result = data.send().await; + let client = new_default_client(); + let _result = data.send(&client).await; test_agent.assert_snapshot(snapshot_name).await; } @@ -245,7 +248,8 @@ mod tracing_integration_tests { &endpoint, ); - let result = data.send().await; + let client = new_default_client(); + let result = data.send(&client).await; assert!(result.last_result.is_ok()); } @@ -326,7 +330,8 @@ mod tracing_integration_tests { &endpoint, ); - let _result = data.send().await; + let client = new_default_client(); + let _result = data.send(&client).await; test_agent.assert_snapshot(snapshot_name).await; } From 551b6e1d1cd3a89a4a9676ecf943e329ade6a741 Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Tue, 30 Sep 2025 13:31:25 +0200 Subject: [PATCH 2/4] Refactor data-pipeline to hold a client pool so the connection with the agent is reused. --- Cargo.lock | 21 --------------------- data-pipeline/src/stats_exporter.rs | 12 ++++++++++-- data-pipeline/src/trace_exporter/builder.rs | 2 ++ data-pipeline/src/trace_exporter/mod.rs | 7 +++++-- data-pipeline/src/trace_exporter/stats.rs | 8 +++++++- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0633f6a322..2ddd013fee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1806,7 +1806,6 @@ dependencies = [ "http-body-util", "httpmock", "hyper", - "hyper-http-proxy", "prost", "rand 0.8.5", "rmp", @@ -2675,26 +2674,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-http-proxy" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" -dependencies = [ - "bytes", - "futures-util", - "headers", - "http", - "hyper", - "hyper-rustls", - "hyper-util", - "pin-project-lite", - "tokio", - "tokio-rustls", - "tower-service", - "webpki-roots", -] - [[package]] name = "hyper-multipart-rfc7578" version = "0.9.0" diff --git a/data-pipeline/src/stats_exporter.rs b/data-pipeline/src/stats_exporter.rs index d57dbcf2f4..c2b090ed75 100644 --- a/data-pipeline/src/stats_exporter.rs +++ b/data-pipeline/src/stats_exporter.rs @@ -15,7 +15,8 @@ use crate::trace_exporter::TracerMetadata; use datadog_trace_protobuf::pb; use datadog_trace_stats::span_concentrator::SpanConcentrator; use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy}; -use ddcommon::{worker::Worker, Endpoint}; +use ddcommon::hyper_migration::new_default_client; +use ddcommon::{worker::Worker, Endpoint, HttpClient}; use hyper; use tokio::select; use tokio_util::sync::CancellationToken; @@ -32,6 +33,7 @@ pub struct StatsExporter { meta: TracerMetadata, sequence_id: AtomicU64, cancellation_token: CancellationToken, + client: HttpClient, } impl StatsExporter { @@ -49,6 +51,7 @@ impl StatsExporter { meta: TracerMetadata, endpoint: Endpoint, cancellation_token: CancellationToken, + client: Option, ) -> Self { Self { flush_interval, @@ -57,6 +60,7 @@ impl StatsExporter { meta, sequence_id: AtomicU64::new(0), cancellation_token, + client: client.unwrap_or(new_default_client()), } } @@ -90,11 +94,11 @@ impl StatsExporter { ); let result = send_with_retry( + &self.client, &self.endpoint, body, &headers, &RetryStrategy::default(), - None, ) .await; @@ -267,6 +271,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), + None, ); let send_status = stats_exporter.send(true).await; @@ -294,6 +299,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), + None, ); let send_status = stats_exporter.send(true).await; @@ -326,6 +332,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), + None, ); tokio::time::pause(); @@ -366,6 +373,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), cancellation_token.clone(), + None, ); tokio::spawn(async move { diff --git a/data-pipeline/src/trace_exporter/builder.rs b/data-pipeline/src/trace_exporter/builder.rs index 0339a29983..33f5e5b973 100644 --- a/data-pipeline/src/trace_exporter/builder.rs +++ b/data-pipeline/src/trace_exporter/builder.rs @@ -12,6 +12,7 @@ use crate::trace_exporter::{ INFO_ENDPOINT, }; use arc_swap::ArcSwap; +use ddcommon::hyper_migration::new_default_client; use ddcommon::{parse_uri, tag, Endpoint}; use dogstatsd_client::new; use std::sync::{Arc, Mutex}; @@ -339,6 +340,7 @@ impl TraceExporterBuilder { agent_payload_response_version: self .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), + http_client: new_default_client(), }) } diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index 37b2208f81..e30e32466d 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -36,9 +36,9 @@ use datadog_trace_utils::send_with_retry::{ }; use datadog_trace_utils::span::{Span, SpanText}; use datadog_trace_utils::trace_utils::TracerHeaderTags; -use ddcommon::MutexExt; use ddcommon::{hyper_migration, Endpoint}; use ddcommon::{tag, tag::Tag}; +use ddcommon::{HttpClient, MutexExt}; use ddtelemetry::worker::TelemetryWorker; use dogstatsd_client::Client; use http_body_util::BodyExt; @@ -208,6 +208,7 @@ pub struct TraceExporter { health_metrics_enabled: bool, workers: Arc>, agent_payload_response_version: Option, + http_client: HttpClient, } impl TraceExporter { @@ -424,6 +425,7 @@ impl TraceExporter { &agent_info, &self.client_side_stats, &self.workers, + Some(self.http_client.clone()), ); } StatsComputationStatus::Enabled { @@ -627,7 +629,8 @@ impl TraceExporter { let payload_len = mp_payload.len(); // Send traces to the agent - let result = send_with_retry(endpoint, mp_payload, &headers, &strategy, None).await; + let result = + send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await; // Emit http.requests health metric based on number of attempts let requests_count = match &result { diff --git a/data-pipeline/src/trace_exporter/stats.rs b/data-pipeline/src/trace_exporter/stats.rs index 5b61a2b9d1..b2679dcd05 100644 --- a/data-pipeline/src/trace_exporter/stats.rs +++ b/data-pipeline/src/trace_exporter/stats.rs @@ -11,7 +11,7 @@ use crate::agent_info::schema::AgentInfo; use crate::stats_exporter; use arc_swap::ArcSwap; use datadog_trace_stats::span_concentrator::SpanConcentrator; -use ddcommon::{Endpoint, MutexExt}; +use ddcommon::{Endpoint, HttpClient, MutexExt}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::runtime::Runtime; @@ -64,6 +64,7 @@ pub(crate) fn start_stats_computation( workers: &Arc>, span_kinds: Vec, peer_tags: Vec, + client: Option, ) -> anyhow::Result<()> { if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() { let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( @@ -80,6 +81,7 @@ pub(crate) fn start_stats_computation( &cancellation_token, workers, client_side_stats, + client, )?; } Ok(()) @@ -93,6 +95,7 @@ fn create_and_start_stats_worker( cancellation_token: &CancellationToken, workers: &Arc>, client_side_stats: &ArcSwap, + client: Option, ) -> anyhow::Result<()> { let stats_exporter = stats_exporter::StatsExporter::new( bucket_size, @@ -100,6 +103,7 @@ fn create_and_start_stats_worker( ctx.metadata.clone(), Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)), cancellation_token.clone(), + client, ); let mut stats_worker = crate::pausable_worker::PausableWorker::new(stats_exporter); @@ -160,6 +164,7 @@ pub(crate) fn handle_stats_disabled_by_agent( agent_info: &Arc, client_side_stats: &ArcSwap, workers: &Arc>, + client: Option, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { // Client-side stats is supported by the agent @@ -169,6 +174,7 @@ pub(crate) fn handle_stats_disabled_by_agent( workers, get_span_kinds_for_stats(agent_info), agent_info.info.peer_tags.clone().unwrap_or_default(), + client, ); match status { Ok(()) => info!("Client-side stats enabled"), From e702ace43facbccf8178d645c13c8f2a49c05294 Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Tue, 30 Sep 2025 13:32:08 +0200 Subject: [PATCH 3/4] Refactor trace_flusher so it stores an http client. --- datadog-sidecar/src/service/tracing/trace_flusher.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datadog-sidecar/src/service/tracing/trace_flusher.rs b/datadog-sidecar/src/service/tracing/trace_flusher.rs index 3a3524faf9..9e1ed04771 100644 --- a/datadog-sidecar/src/service/tracing/trace_flusher.rs +++ b/datadog-sidecar/src/service/tracing/trace_flusher.rs @@ -7,7 +7,8 @@ use datadog_ipc::platform::NamedShmHandle; use datadog_trace_utils::trace_utils; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::trace_utils::SendDataResult; -use ddcommon::{Endpoint, MutexExt}; +use ddcommon::hyper_migration::new_default_client; +use ddcommon::{Endpoint, HttpClient, MutexExt}; use futures::future::join_all; use http_body_util::BodyExt; use manual_future::{ManualFuture, ManualFutureCompleter}; @@ -95,6 +96,7 @@ pub(crate) struct TraceFlusher { pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage remote_config: Mutex, pub metrics: Mutex, + client: HttpClient, } impl Default for TraceFlusher { fn default() -> Self { @@ -105,6 +107,7 @@ impl Default for TraceFlusher { min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32), remote_config: Mutex::new(Default::default()), metrics: Mutex::new(Default::default()), + client: new_default_client(), } } } @@ -246,7 +249,7 @@ impl TraceFlusher { async fn send_and_handle_trace(&self, send_data: SendData) { let endpoint = send_data.get_target().clone(); - let response = send_data.send().await; + let response = send_data.send(&self.client).await; self.metrics.lock_or_panic().update(&response); match response.last_result { Ok(response) => { From 386bc6b539a8b7f5895788ead6ba29904841134c Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Mon, 6 Oct 2025 10:23:41 +0200 Subject: [PATCH 4/4] Remove Option passing client to the Stats Exporter. --- data-pipeline/src/stats_exporter.rs | 14 +++++++------- data-pipeline/src/trace_exporter/mod.rs | 2 +- data-pipeline/src/trace_exporter/stats.rs | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/data-pipeline/src/stats_exporter.rs b/data-pipeline/src/stats_exporter.rs index c2b090ed75..9bb333b86f 100644 --- a/data-pipeline/src/stats_exporter.rs +++ b/data-pipeline/src/stats_exporter.rs @@ -15,7 +15,6 @@ use crate::trace_exporter::TracerMetadata; use datadog_trace_protobuf::pb; use datadog_trace_stats::span_concentrator::SpanConcentrator; use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy}; -use ddcommon::hyper_migration::new_default_client; use ddcommon::{worker::Worker, Endpoint, HttpClient}; use hyper; use tokio::select; @@ -51,7 +50,7 @@ impl StatsExporter { meta: TracerMetadata, endpoint: Endpoint, cancellation_token: CancellationToken, - client: Option, + client: HttpClient, ) -> Self { Self { flush_interval, @@ -60,7 +59,7 @@ impl StatsExporter { meta, sequence_id: AtomicU64::new(0), cancellation_token, - client: client.unwrap_or(new_default_client()), + client, } } @@ -195,6 +194,7 @@ mod tests { use super::*; use datadog_trace_utils::span::{trace_utils, SpanSlice}; use datadog_trace_utils::test_utils::poll_for_mock_hit; + use ddcommon::hyper_migration::new_default_client; use httpmock::prelude::*; use httpmock::MockServer; use time::Duration; @@ -271,7 +271,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), - None, + new_default_client(), ); let send_status = stats_exporter.send(true).await; @@ -299,7 +299,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), - None, + new_default_client(), ); let send_status = stats_exporter.send(true).await; @@ -332,7 +332,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), CancellationToken::new(), - None, + new_default_client(), ); tokio::time::pause(); @@ -373,7 +373,7 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), cancellation_token.clone(), - None, + new_default_client(), ); tokio::spawn(async move { diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index e30e32466d..534aebe3f6 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -425,7 +425,7 @@ impl TraceExporter { &agent_info, &self.client_side_stats, &self.workers, - Some(self.http_client.clone()), + self.http_client.clone(), ); } StatsComputationStatus::Enabled { diff --git a/data-pipeline/src/trace_exporter/stats.rs b/data-pipeline/src/trace_exporter/stats.rs index b2679dcd05..30389c6e89 100644 --- a/data-pipeline/src/trace_exporter/stats.rs +++ b/data-pipeline/src/trace_exporter/stats.rs @@ -64,7 +64,7 @@ pub(crate) fn start_stats_computation( workers: &Arc>, span_kinds: Vec, peer_tags: Vec, - client: Option, + client: HttpClient, ) -> anyhow::Result<()> { if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() { let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( @@ -95,7 +95,7 @@ fn create_and_start_stats_worker( cancellation_token: &CancellationToken, workers: &Arc>, client_side_stats: &ArcSwap, - client: Option, + client: HttpClient, ) -> anyhow::Result<()> { let stats_exporter = stats_exporter::StatsExporter::new( bucket_size, @@ -164,7 +164,7 @@ pub(crate) fn handle_stats_disabled_by_agent( agent_info: &Arc, client_side_stats: &ArcSwap, workers: &Arc>, - client: Option, + client: HttpClient, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { // Client-side stats is supported by the agent