Skip to content

Commit 39b8b0e

Browse files
committed
Refactor trace-utils to pass an http client so the connections are reused.
1 parent 0b59f64 commit 39b8b0e

File tree

5 files changed

+65
-80
lines changed

5 files changed

+65
-80
lines changed

datadog-trace-utils/Cargo.toml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ tinybytes = { path = "../tinybytes", features = [
3838
"serialization",
3939
] }
4040

41-
# Proxy feature
42-
hyper-http-proxy = { version = "1.1.0", default-features = false, features = [
43-
"rustls-tls-webpki-roots",
44-
], optional = true }
45-
4641
# Compression feature
4742
flate2 = { version = "1.0", optional = true }
4843
zstd = { version = "0.13.3", default-features = false, optional = true }
@@ -66,15 +61,14 @@ tempfile = "3.3.0"
6661
[features]
6762
default = ["https"]
6863
https = ["ddcommon/https"]
69-
mini_agent = ["proxy", "compression", "ddcommon/use_webpki_roots"]
64+
mini_agent = ["compression", "ddcommon/use_webpki_roots"]
7065
test-utils = [
7166
"hyper/server",
7267
"httpmock",
7368
"cargo_metadata",
7469
"cargo-platform",
7570
"urlencoding",
7671
]
77-
proxy = ["hyper-http-proxy"]
7872
compression = ["zstd", "flate2"]
7973
# FIPS mode uses the FIPS-compliant cryptographic provider (Unix only)
8074
fips = ["ddcommon/fips"]

datadog-trace-utils/src/send_data/mod.rs

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::trace_utils::TracerHeaderTags;
99
use crate::tracer_payload::TracerPayloadCollection;
1010
use anyhow::{anyhow, Context};
1111
use datadog_trace_protobuf::pb::{AgentPayload, TracerPayload};
12+
use ddcommon::HttpClient;
1213
use ddcommon::{
1314
header::{
1415
APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR,
@@ -42,6 +43,7 @@ use zstd::stream::write::Encoder;
4243
/// use datadog_trace_utils::trace_utils::TracerHeaderTags;
4344
/// use datadog_trace_utils::tracer_payload::TracerPayloadCollection;
4445
/// use ddcommon::Endpoint;
46+
/// use ddcommon::hyper_migration::new_default_client;
4547
///
4648
/// #[cfg_attr(miri, ignore)]
4749
/// async fn update_send_results_example() {
@@ -58,8 +60,9 @@ use zstd::stream::write::Encoder;
5860
///
5961
/// send_data.set_retry_strategy(retry_strategy);
6062
///
63+
/// let client = new_default_client();
6164
/// // Send the data
62-
/// let result = send_data.send().await;
65+
/// let result = send_data.send(&client).await;
6366
/// }
6467
/// ```
6568
pub struct SendData {
@@ -234,24 +237,15 @@ impl SendData {
234237
/// # Returns
235238
///
236239
/// A `SendDataResult` instance containing the result of the operation.
237-
pub async fn send(&self) -> SendDataResult {
238-
self.send_internal(None).await
240+
pub async fn send(&self, http_client: &HttpClient) -> SendDataResult {
241+
self.send_internal(http_client).await
239242
}
240243

241-
/// Sends the data to the target endpoint.
242-
///
243-
/// # Returns
244-
///
245-
/// A `SendDataResult` instance containing the result of the operation.
246-
pub async fn send_proxy(&self, http_proxy: Option<&str>) -> SendDataResult {
247-
self.send_internal(http_proxy).await
248-
}
249-
250-
async fn send_internal(&self, http_proxy: Option<&str>) -> SendDataResult {
244+
async fn send_internal(&self, http_client: &HttpClient) -> SendDataResult {
251245
if self.use_protobuf() {
252-
self.send_with_protobuf(http_proxy).await
246+
self.send_with_protobuf(http_client).await
253247
} else {
254-
self.send_with_msgpack(http_proxy).await
248+
self.send_with_msgpack(http_client).await
255249
}
256250
}
257251

@@ -260,17 +254,17 @@ impl SendData {
260254
chunks: u64,
261255
payload: Vec<u8>,
262256
headers: HashMap<&'static str, String>,
263-
http_proxy: Option<&str>,
257+
http_client: &HttpClient,
264258
) -> (SendWithRetryResult, u64, u64) {
265259
#[allow(clippy::unwrap_used)]
266260
let payload_len = u64::try_from(payload.len()).unwrap();
267261
(
268262
send_with_retry(
263+
http_client,
269264
&self.target,
270265
payload,
271266
&headers,
272267
&self.retry_strategy,
273-
http_proxy,
274268
)
275269
.await,
276270
payload_len,
@@ -304,7 +298,7 @@ impl SendData {
304298
}
305299
}
306300

307-
async fn send_with_protobuf(&self, http_proxy: Option<&str>) -> SendDataResult {
301+
async fn send_with_protobuf(&self, http_client: &HttpClient) -> SendDataResult {
308302
let mut result = SendDataResult::default();
309303

310304
#[allow(clippy::unwrap_used)]
@@ -331,7 +325,7 @@ impl SendData {
331325
request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string());
332326

333327
let (response, bytes_sent, chunks) = self
334-
.send_payload(chunks, final_payload, request_headers, http_proxy)
328+
.send_payload(chunks, final_payload, request_headers, http_client)
335329
.await;
336330

337331
result.update(response, bytes_sent, chunks);
@@ -342,7 +336,7 @@ impl SendData {
342336
}
343337
}
344338

345-
async fn send_with_msgpack(&self, http_proxy: Option<&str>) -> SendDataResult {
339+
async fn send_with_msgpack(&self, http_client: &HttpClient) -> SendDataResult {
346340
let mut result = SendDataResult::default();
347341
let mut futures = FuturesUnordered::new();
348342

@@ -360,7 +354,7 @@ impl SendData {
360354
Err(e) => return result.error(anyhow!(e)),
361355
};
362356

363-
futures.push(self.send_payload(chunks, payload, headers, http_proxy));
357+
futures.push(self.send_payload(chunks, payload, headers, http_client));
364358
}
365359
}
366360
TracerPayloadCollection::V04(payload) => {
@@ -372,7 +366,7 @@ impl SendData {
372366

373367
let payload = msgpack_encoder::v04::to_vec(payload);
374368

375-
futures.push(self.send_payload(chunks, payload, headers, http_proxy));
369+
futures.push(self.send_payload(chunks, payload, headers, http_client));
376370
}
377371
TracerPayloadCollection::V05(payload) => {
378372
#[allow(clippy::unwrap_used)]
@@ -386,7 +380,7 @@ impl SendData {
386380
Err(e) => return result.error(anyhow!(e)),
387381
};
388382

389-
futures.push(self.send_payload(chunks, payload, headers, http_proxy));
383+
futures.push(self.send_payload(chunks, payload, headers, http_client));
390384
}
391385
}
392386

@@ -592,7 +586,8 @@ mod tests {
592586
);
593587

594588
let data_payload_len = compute_payload_len(&data.tracer_payloads);
595-
let res = data.send().await;
589+
let client = ddcommon::hyper_migration::new_default_client();
590+
let res = data.send(&client).await;
596591

597592
mock.assert_async().await;
598593

@@ -637,7 +632,8 @@ mod tests {
637632
);
638633

639634
let data_payload_len = compute_payload_len(&data.tracer_payloads);
640-
let res = data.send().await;
635+
let client = ddcommon::hyper_migration::new_default_client();
636+
let res = data.send(&client).await;
641637

642638
mock.assert_async().await;
643639

@@ -696,7 +692,8 @@ mod tests {
696692
);
697693

698694
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
699-
let res = data.send().await;
695+
let client = ddcommon::hyper_migration::new_default_client();
696+
let res = data.send(&client).await;
700697

701698
mock.assert_async().await;
702699

@@ -754,7 +751,8 @@ mod tests {
754751
);
755752

756753
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
757-
let res = data.send().await;
754+
let client = ddcommon::hyper_migration::new_default_client();
755+
let res = data.send(&client).await;
758756

759757
mock.assert_async().await;
760758

@@ -798,7 +796,8 @@ mod tests {
798796
);
799797

800798
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
801-
let res = data.send().await;
799+
let client = ddcommon::hyper_migration::new_default_client();
800+
let res = data.send(&client).await;
802801

803802
mock.assert_calls_async(2).await;
804803

@@ -839,7 +838,8 @@ mod tests {
839838
},
840839
);
841840

842-
let res = data.send().await;
841+
let client = ddcommon::hyper_migration::new_default_client();
842+
let res = data.send(&client).await;
843843

844844
mock.assert_calls_async(5).await;
845845

@@ -871,7 +871,8 @@ mod tests {
871871
},
872872
);
873873

874-
let res = data.send().await;
874+
let client = ddcommon::hyper_migration::new_default_client();
875+
let res = data.send(&client).await;
875876

876877
assert!(res.last_result.is_err());
877878
match std::env::consts::OS {
@@ -938,7 +939,8 @@ mod tests {
938939
},
939940
);
940941

941-
let res = data.send().await;
942+
let client = ddcommon::hyper_migration::new_default_client();
943+
let res = data.send(&client).await;
942944

943945
mock.assert_calls_async(5).await;
944946

@@ -980,7 +982,8 @@ mod tests {
980982
},
981983
);
982984

983-
let res = data.send().await;
985+
let client = ddcommon::hyper_migration::new_default_client();
986+
let res = data.send(&client).await;
984987

985988
mock.assert_calls_async(10).await;
986989

0 commit comments

Comments
 (0)