Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ async fn start_dogstatsd(
timeout: DOGSTATSD_TIMEOUT_DURATION,
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
// Not supported yet
ca_cert_path: None,
});
Some(metrics_flusher)
}
Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing = { version = "0.1.40", default-features = false }
regex = { version = "1.10.6", default-features = false }
zstd = { version = "0.13.3", default-features = false }
datadog-fips = { path = "../datadog-fips", default-features = false }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }

[dev-dependencies]
mockito = { version = "1.5.0", default-features = false }
Expand Down
53 changes: 50 additions & 3 deletions crates/dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use reqwest::{Client, Response};
use serde::{Serialize, Serializer};
use serde_json;
use std::error::Error;
use std::io::Write;
use std::fs::File;
use std::io::{BufReader, Write};
use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error};
Expand Down Expand Up @@ -148,11 +149,12 @@ impl DdApi {
api_key: String,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
compression_level: CompressionLevel,
) -> Self {
let client = build_client(https_proxy, timeout)
let client = build_client(https_proxy, ca_cert_path, timeout)
.inspect_err(|e| {
error!("Unable to create client {:?}", e);
})
Expand Down Expand Up @@ -290,14 +292,59 @@ pub enum RetryStrategy {
LinearBackoff(u64, u64), // attempts, delay
}

fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, Box<dyn Error>> {
fn build_client(
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
) -> Result<Client, Box<dyn Error>> {
let mut builder = create_reqwest_client_builder()?.timeout(timeout);

// Load custom TLS certificate if configured
if let Some(cert_path) = &ca_cert_path {
match load_custom_cert(cert_path) {
Ok(certs) => {
let cert_count = certs.len();
for cert in certs {
builder = builder.add_root_certificate(cert);
}
debug!(
"DOGSTATSD | Added {} root certificate(s) from {}",
cert_count, cert_path
);
}
Err(e) => {
error!(
"DOGSTATSD | Failed to load TLS certificate from {}: {}, continuing without custom cert",
cert_path, e
);
}
}
}

if let Some(proxy) = https_proxy {
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
}
Ok(builder.build()?)
}

fn load_custom_cert(cert_path: &str) -> Result<Vec<reqwest::Certificate>, Box<dyn Error>> {
let file = File::open(cert_path)?;
let mut reader = BufReader::new(file);

// Parse PEM certificates
let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;

if certs.is_empty() {
return Err("No certificates found in file".into());
}

// Convert all certificates found in the file
certs
.into_iter()
.map(|cert| reqwest::Certificate::from_der(cert.as_ref()).map_err(Into::into))
.collect()
}

#[derive(Debug, Serialize, Clone, Copy)]
/// A single point in time
pub(crate) struct Point {
Expand Down
7 changes: 7 additions & 0 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Flusher {
api_key_factory: Arc<ApiKeyFactory>,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
aggregator_handle: AggregatorHandle,
Expand All @@ -29,6 +30,7 @@ pub struct FlusherConfig {
pub aggregator_handle: AggregatorHandle,
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
pub https_proxy: Option<String>,
pub ca_cert_path: Option<String>,
pub timeout: Duration,
pub retry_strategy: RetryStrategy,
pub compression_level: CompressionLevel,
Expand All @@ -40,6 +42,7 @@ impl Flusher {
api_key_factory: Arc::clone(&config.api_key_factory),
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
https_proxy: config.https_proxy,
ca_cert_path: config.ca_cert_path,
timeout: config.timeout,
retry_strategy: config.retry_strategy,
aggregator_handle: config.aggregator_handle,
Expand All @@ -57,6 +60,7 @@ impl Flusher {
api_key.to_string(),
self.metrics_intake_url_prefix.clone(),
self.https_proxy.clone(),
self.ca_cert_path.clone(),
self.timeout,
self.retry_strategy.clone(),
self.compression_level,
Expand Down Expand Up @@ -283,6 +287,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -329,6 +334,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -378,6 +384,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down
4 changes: 4 additions & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn dogstatsd_server_ships_series() {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: std::time::Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(3),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -139,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down Expand Up @@ -195,6 +197,7 @@ async fn test_send_with_retry_linear_backoff_success() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down Expand Up @@ -250,6 +253,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down
Loading