Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::trace_exporter::TracerMetadata;
use datadog_trace_protobuf::pb;
use datadog_trace_stats::span_concentrator::SpanConcentrator;
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
use ddcommon::{worker::Worker, Endpoint};
use ddcommon::{worker::Worker, Endpoint, HttpClient};
use hyper;
use tokio::select;
use tokio_util::sync::CancellationToken;
Expand All @@ -32,6 +32,7 @@ pub struct StatsExporter {
meta: TracerMetadata,
sequence_id: AtomicU64,
cancellation_token: CancellationToken,
client: HttpClient,
}

impl StatsExporter {
Expand All @@ -49,6 +50,7 @@ impl StatsExporter {
meta: TracerMetadata,
endpoint: Endpoint,
cancellation_token: CancellationToken,
client: HttpClient,
) -> Self {
Self {
flush_interval,
Expand All @@ -57,6 +59,7 @@ impl StatsExporter {
meta,
sequence_id: AtomicU64::new(0),
cancellation_token,
client,
}
}

Expand Down Expand Up @@ -90,11 +93,11 @@ impl StatsExporter {
);

let result = send_with_retry(
&self.client,
&self.endpoint,
body,
&headers,
&RetryStrategy::default(),
None,
)
.await;

Expand Down Expand Up @@ -191,6 +194,7 @@ mod tests {
use super::*;
use datadog_trace_utils::span::{trace_utils, SpanSlice};
use datadog_trace_utils::test_utils::poll_for_mock_hit;
use ddcommon::hyper_migration::new_default_client;
use httpmock::prelude::*;
use httpmock::MockServer;
use time::Duration;
Expand Down Expand Up @@ -267,6 +271,7 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
CancellationToken::new(),
new_default_client(),
);

let send_status = stats_exporter.send(true).await;
Expand Down Expand Up @@ -294,6 +299,7 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
CancellationToken::new(),
new_default_client(),
);

let send_status = stats_exporter.send(true).await;
Expand Down Expand Up @@ -326,6 +332,7 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
CancellationToken::new(),
new_default_client(),
);

tokio::time::pause();
Expand Down Expand Up @@ -366,6 +373,7 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
cancellation_token.clone(),
new_default_client(),
);

tokio::spawn(async move {
Expand Down
2 changes: 2 additions & 0 deletions data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::trace_exporter::{
INFO_ENDPOINT,
};
use arc_swap::ArcSwap;
use ddcommon::hyper_migration::new_default_client;
use ddcommon::{parse_uri, tag, Endpoint};
use dogstatsd_client::new;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -339,6 +340,7 @@ impl TraceExporterBuilder {
agent_payload_response_version: self
.agent_rates_payload_version_enabled
.then(AgentResponsePayloadVersion::new),
http_client: new_default_client(),
})
}

Expand Down
7 changes: 5 additions & 2 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use datadog_trace_utils::send_with_retry::{
};
use datadog_trace_utils::span::{Span, SpanText};
use datadog_trace_utils::trace_utils::TracerHeaderTags;
use ddcommon::MutexExt;
use ddcommon::{hyper_migration, Endpoint};
use ddcommon::{tag, tag::Tag};
use ddcommon::{HttpClient, MutexExt};
use ddtelemetry::worker::TelemetryWorker;
use dogstatsd_client::Client;
use http_body_util::BodyExt;
Expand Down Expand Up @@ -208,6 +208,7 @@ pub struct TraceExporter {
health_metrics_enabled: bool,
workers: Arc<Mutex<TraceExporterWorkers>>,
agent_payload_response_version: Option<AgentResponsePayloadVersion>,
http_client: HttpClient,
}

impl TraceExporter {
Expand Down Expand Up @@ -424,6 +425,7 @@ impl TraceExporter {
&agent_info,
&self.client_side_stats,
&self.workers,
self.http_client.clone(),
);
}
StatsComputationStatus::Enabled {
Expand Down Expand Up @@ -627,7 +629,8 @@ impl TraceExporter {
let payload_len = mp_payload.len();

// Send traces to the agent
let result = send_with_retry(endpoint, mp_payload, &headers, &strategy, None).await;
let result =
send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;

// Emit http.requests health metric based on number of attempts
let requests_count = match &result {
Expand Down
8 changes: 7 additions & 1 deletion data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::agent_info::schema::AgentInfo;
use crate::stats_exporter;
use arc_swap::ArcSwap;
use datadog_trace_stats::span_concentrator::SpanConcentrator;
use ddcommon::{Endpoint, MutexExt};
use ddcommon::{Endpoint, HttpClient, MutexExt};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -64,6 +64,7 @@ pub(crate) fn start_stats_computation(
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
span_kinds: Vec<String>,
peer_tags: Vec<String>,
client: HttpClient,
) -> anyhow::Result<()> {
if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() {
let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new(
Expand All @@ -80,6 +81,7 @@ pub(crate) fn start_stats_computation(
&cancellation_token,
workers,
client_side_stats,
client,
)?;
}
Ok(())
Expand All @@ -93,13 +95,15 @@ fn create_and_start_stats_worker(
cancellation_token: &CancellationToken,
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
client_side_stats: &ArcSwap<StatsComputationStatus>,
client: HttpClient,
) -> anyhow::Result<()> {
let stats_exporter = stats_exporter::StatsExporter::new(
bucket_size,
stats_concentrator.clone(),
ctx.metadata.clone(),
Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)),
cancellation_token.clone(),
client,
);
let mut stats_worker = crate::pausable_worker::PausableWorker::new(stats_exporter);

Expand Down Expand Up @@ -160,6 +164,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
agent_info: &Arc<AgentInfo>,
client_side_stats: &ArcSwap<StatsComputationStatus>,
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
client: HttpClient,
) {
if agent_info.info.client_drop_p0s.is_some_and(|v| v) {
// Client-side stats is supported by the agent
Expand All @@ -169,6 +174,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
workers,
get_span_kinds_for_stats(agent_info),
agent_info.info.peer_tags.clone().unwrap_or_default(),
client,
);
match status {
Ok(()) => info!("Client-side stats enabled"),
Expand Down
7 changes: 5 additions & 2 deletions datadog-sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use datadog_ipc::platform::NamedShmHandle;
use datadog_trace_utils::trace_utils;
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::trace_utils::SendDataResult;
use ddcommon::{Endpoint, MutexExt};
use ddcommon::hyper_migration::new_default_client;
use ddcommon::{Endpoint, HttpClient, MutexExt};
use futures::future::join_all;
use http_body_util::BodyExt;
use manual_future::{ManualFuture, ManualFutureCompleter};
Expand Down Expand Up @@ -95,6 +96,7 @@ pub(crate) struct TraceFlusher {
pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage
remote_config: Mutex<AgentRemoteConfigs>,
pub metrics: Mutex<TraceFlusherMetrics>,
client: HttpClient,
}
impl Default for TraceFlusher {
fn default() -> Self {
Expand All @@ -105,6 +107,7 @@ impl Default for TraceFlusher {
min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32),
remote_config: Mutex::new(Default::default()),
metrics: Mutex::new(Default::default()),
client: new_default_client(),
}
}
}
Expand Down Expand Up @@ -246,7 +249,7 @@ impl TraceFlusher {

async fn send_and_handle_trace(&self, send_data: SendData) {
let endpoint = send_data.get_target().clone();
let response = send_data.send().await;
let response = send_data.send(&self.client).await;
self.metrics.lock_or_panic().update(&response);
match response.last_result {
Ok(response) => {
Expand Down
8 changes: 1 addition & 7 deletions datadog-trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ tinybytes = { path = "../tinybytes", features = [
"serialization",
] }

# Proxy feature
hyper-http-proxy = { version = "1.1.0", default-features = false, features = [
"rustls-tls-webpki-roots",
], optional = true }

# Compression feature
flate2 = { version = "1.0", optional = true }
zstd = { version = "0.13.3", default-features = false, optional = true }
Expand All @@ -66,15 +61,14 @@ tempfile = "3.3.0"
[features]
default = ["https"]
https = ["ddcommon/https"]
mini_agent = ["proxy", "compression", "ddcommon/use_webpki_roots"]
mini_agent = ["compression", "ddcommon/use_webpki_roots"]
test-utils = [
"hyper/server",
"httpmock",
"cargo_metadata",
"cargo-platform",
"urlencoding",
]
proxy = ["hyper-http-proxy"]
compression = ["zstd", "flate2"]
# FIPS mode uses the FIPS-compliant cryptographic provider (Unix only)
fips = ["ddcommon/fips"]
Loading
Loading