Skip to content

Commit 130771f

Browse files
committed
Refactor data-pipeline to hold a client pool so the connection with the agent is reused.
1 parent ad0b657 commit 130771f

File tree

4 files changed

+24
-5
lines changed

4 files changed

+24
-5
lines changed

data-pipeline/src/stats_exporter.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use std::{
1414
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};
1515
use datadog_trace_protobuf::pb;
1616
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
17-
use ddcommon::{worker::Worker, Endpoint};
17+
use ddcommon::hyper_migration::new_default_client;
18+
use ddcommon::{worker::Worker, Endpoint, HttpClient};
1819
use hyper;
1920
use tokio::select;
2021
use tokio_util::sync::CancellationToken;
@@ -31,6 +32,7 @@ pub struct StatsExporter {
3132
meta: TracerMetadata,
3233
sequence_id: AtomicU64,
3334
cancellation_token: CancellationToken,
35+
client: HttpClient,
3436
}
3537

3638
impl StatsExporter {
@@ -48,6 +50,7 @@ impl StatsExporter {
4850
meta: TracerMetadata,
4951
endpoint: Endpoint,
5052
cancellation_token: CancellationToken,
53+
client: Option<HttpClient>,
5154
) -> Self {
5255
Self {
5356
flush_interval,
@@ -56,6 +59,7 @@ impl StatsExporter {
5659
meta,
5760
sequence_id: AtomicU64::new(0),
5861
cancellation_token,
62+
client: client.unwrap_or(new_default_client()),
5963
}
6064
}
6165

@@ -89,11 +93,11 @@ impl StatsExporter {
8993
);
9094

9195
let result = send_with_retry(
96+
&self.client,
9297
&self.endpoint,
9398
body,
9499
&headers,
95100
&RetryStrategy::default(),
96-
None,
97101
)
98102
.await;
99103

@@ -266,6 +270,7 @@ mod tests {
266270
get_test_metadata(),
267271
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
268272
CancellationToken::new(),
273+
None,
269274
);
270275

271276
let send_status = stats_exporter.send(true).await;
@@ -293,6 +298,7 @@ mod tests {
293298
get_test_metadata(),
294299
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
295300
CancellationToken::new(),
301+
None,
296302
);
297303

298304
let send_status = stats_exporter.send(true).await;
@@ -325,6 +331,7 @@ mod tests {
325331
get_test_metadata(),
326332
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
327333
CancellationToken::new(),
334+
None,
328335
);
329336

330337
tokio::time::pause();
@@ -365,6 +372,7 @@ mod tests {
365372
get_test_metadata(),
366373
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
367374
cancellation_token.clone(),
375+
None,
368376
);
369377

370378
tokio::spawn(async move {

data-pipeline/src/trace_exporter/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::trace_exporter::{
1212
INFO_ENDPOINT,
1313
};
1414
use arc_swap::ArcSwap;
15+
use ddcommon::hyper_migration::new_default_client;
1516
use ddcommon::{parse_uri, tag, Endpoint};
1617
use dogstatsd_client::new;
1718
use std::sync::{Arc, Mutex};
@@ -343,6 +344,7 @@ impl TraceExporterBuilder {
343344
agent_payload_response_version: self
344345
.agent_rates_payload_version_enabled
345346
.then(AgentResponsePayloadVersion::new),
347+
http_client: new_default_client(),
346348
})
347349
}
348350

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ use datadog_trace_utils::send_with_retry::{
3636
};
3737
use datadog_trace_utils::span::{Span, SpanText};
3838
use datadog_trace_utils::trace_utils::TracerHeaderTags;
39-
use ddcommon::MutexExt;
4039
use ddcommon::{hyper_migration, Endpoint};
4140
use ddcommon::{tag, tag::Tag};
41+
use ddcommon::{HttpClient, MutexExt};
4242
use ddtelemetry::worker::TelemetryWorker;
4343
use dogstatsd_client::Client;
4444
use http_body_util::BodyExt;
@@ -208,6 +208,7 @@ pub struct TraceExporter {
208208
health_metrics_enabled: bool,
209209
workers: Arc<Mutex<TraceExporterWorkers>>,
210210
agent_payload_response_version: Option<AgentResponsePayloadVersion>,
211+
http_client: HttpClient,
211212
}
212213

213214
impl TraceExporter {
@@ -424,6 +425,7 @@ impl TraceExporter {
424425
&agent_info,
425426
&self.client_side_stats,
426427
&self.workers,
428+
Some(self.http_client.clone()),
427429
);
428430
}
429431
StatsComputationStatus::Enabled {
@@ -627,7 +629,8 @@ impl TraceExporter {
627629
let payload_len = mp_payload.len();
628630

629631
// Send traces to the agent
630-
let result = send_with_retry(endpoint, mp_payload, &headers, &strategy, None).await;
632+
let result =
633+
send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
631634

632635
// Emit http.requests health metric based on number of attempts
633636
let requests_count = match &result {

data-pipeline/src/trace_exporter/stats.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::agent_info::schema::AgentInfo;
1111
use crate::span_concentrator::SpanConcentrator;
1212
use crate::stats_exporter;
1313
use arc_swap::ArcSwap;
14-
use ddcommon::{Endpoint, MutexExt};
14+
use ddcommon::{Endpoint, HttpClient, MutexExt};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717
use tokio::runtime::Runtime;
@@ -64,6 +64,7 @@ pub(crate) fn start_stats_computation(
6464
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
6565
span_kinds: Vec<String>,
6666
peer_tags: Vec<String>,
67+
client: Option<HttpClient>,
6768
) -> anyhow::Result<()> {
6869
if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() {
6970
let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new(
@@ -80,6 +81,7 @@ pub(crate) fn start_stats_computation(
8081
&cancellation_token,
8182
workers,
8283
client_side_stats,
84+
client,
8385
)?;
8486
}
8587
Ok(())
@@ -93,13 +95,15 @@ fn create_and_start_stats_worker(
9395
cancellation_token: &CancellationToken,
9496
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
9597
client_side_stats: &ArcSwap<StatsComputationStatus>,
98+
client: Option<HttpClient>,
9699
) -> anyhow::Result<()> {
97100
let stats_exporter = stats_exporter::StatsExporter::new(
98101
bucket_size,
99102
stats_concentrator.clone(),
100103
ctx.metadata.clone(),
101104
Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)),
102105
cancellation_token.clone(),
106+
client,
103107
);
104108
let mut stats_worker = crate::pausable_worker::PausableWorker::new(stats_exporter);
105109

@@ -160,6 +164,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
160164
agent_info: &Arc<AgentInfo>,
161165
client_side_stats: &ArcSwap<StatsComputationStatus>,
162166
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
167+
client: Option<HttpClient>,
163168
) {
164169
if agent_info.info.client_drop_p0s.is_some_and(|v| v) {
165170
// Client-side stats is supported by the agent
@@ -169,6 +174,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
169174
workers,
170175
get_span_kinds_for_stats(agent_info),
171176
agent_info.info.peer_tags.clone().unwrap_or_default(),
177+
client,
172178
);
173179
match status {
174180
Ok(()) => info!("Client-side stats enabled"),

0 commit comments

Comments
 (0)