Skip to content

Commit 5f31527

Browse files
committed
Refactor data-pipeline to hold a client pool so the connection with the agent is reused.
1 parent 39b8b0e commit 5f31527

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

data-pipeline/src/stats_exporter.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use std::{
1414
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};
1515
use datadog_trace_protobuf::pb;
1616
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
17-
use ddcommon::{worker::Worker, Endpoint};
17+
use ddcommon::{worker::Worker, Endpoint, HttpClient};
18+
use ddcommon::hyper_migration::new_default_client;
1819
use hyper;
1920
use tokio::select;
2021
use tokio_util::sync::CancellationToken;
@@ -31,6 +32,7 @@ pub struct StatsExporter {
3132
meta: TracerMetadata,
3233
sequence_id: AtomicU64,
3334
cancellation_token: CancellationToken,
35+
client: HttpClient,
3436
}
3537

3638
impl StatsExporter {
@@ -56,6 +58,7 @@ impl StatsExporter {
5658
meta,
5759
sequence_id: AtomicU64::new(0),
5860
cancellation_token,
61+
client: new_default_client(),
5962
}
6063
}
6164

@@ -89,11 +92,11 @@ impl StatsExporter {
8992
);
9093

9194
let result = send_with_retry(
95+
&self.client,
9296
&self.endpoint,
9397
body,
9498
&headers,
9599
&RetryStrategy::default(),
96-
None,
97100
)
98101
.await;
99102

data-pipeline/src/trace_exporter/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::trace_exporter::{
1212
INFO_ENDPOINT,
1313
};
1414
use arc_swap::ArcSwap;
15+
use ddcommon::hyper_migration::new_default_client;
1516
use ddcommon::{parse_uri, tag, Endpoint};
1617
use dogstatsd_client::new;
1718
use std::sync::{Arc, Mutex};
@@ -343,6 +344,7 @@ impl TraceExporterBuilder {
343344
agent_payload_response_version: self
344345
.agent_rates_payload_version_enabled
345346
.then(AgentResponsePayloadVersion::new),
347+
http_client: new_default_client(),
346348
})
347349
}
348350

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datadog_trace_utils::send_with_retry::{
3636
};
3737
use datadog_trace_utils::span::{Span, SpanText};
3838
use datadog_trace_utils::trace_utils::TracerHeaderTags;
39-
use ddcommon::MutexExt;
39+
use ddcommon::{HttpClient, MutexExt};
4040
use ddcommon::{hyper_migration, Endpoint};
4141
use ddcommon::{tag, tag::Tag};
4242
use ddtelemetry::worker::TelemetryWorker;
@@ -208,6 +208,7 @@ pub struct TraceExporter {
208208
health_metrics_enabled: bool,
209209
workers: Arc<Mutex<TraceExporterWorkers>>,
210210
agent_payload_response_version: Option<AgentResponsePayloadVersion>,
211+
http_client: HttpClient,
211212
}
212213

213214
impl TraceExporter {
@@ -627,7 +628,7 @@ impl TraceExporter {
627628
let payload_len = mp_payload.len();
628629

629630
// Send traces to the agent
630-
let result = send_with_retry(endpoint, mp_payload, &headers, &strategy, None).await;
631+
let result = send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
631632

632633
// Emit http.requests health metric based on number of attempts
633634
let requests_count = match &result {

0 commit comments

Comments
 (0)