Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ async fn start_dogstatsd(
timeout: DOGSTATSD_TIMEOUT_DURATION,
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
// Not supported yet
ca_cert_path: None,
});
Some(metrics_flusher)
}
Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing = { version = "0.1.40", default-features = false }
regex = { version = "1.10.6", default-features = false }
zstd = { version = "0.13.3", default-features = false }
datadog-fips = { path = "../datadog-fips", default-features = false }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }

[dev-dependencies]
mockito = { version = "1.5.0", default-features = false }
Expand Down
53 changes: 50 additions & 3 deletions crates/dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use reqwest::{Client, Response};
use serde::{Serialize, Serializer};
use serde_json;
use std::error::Error;
use std::io::Write;
use std::fs::File;
use std::io::{BufReader, Write};
use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error};
Expand Down Expand Up @@ -148,11 +149,12 @@ impl DdApi {
api_key: String,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
compression_level: CompressionLevel,
) -> Self {
let client = build_client(https_proxy, timeout)
let client = build_client(https_proxy, ca_cert_path, timeout)
.inspect_err(|e| {
error!("Unable to create client {:?}", e);
})
Expand Down Expand Up @@ -290,14 +292,59 @@ pub enum RetryStrategy {
LinearBackoff(u64, u64), // attempts, delay
}

fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, Box<dyn Error>> {
fn build_client(
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
) -> Result<Client, Box<dyn Error>> {
let mut builder = create_reqwest_client_builder()?.timeout(timeout);

// Load custom TLS certificate if configured
if let Some(cert_path) = &ca_cert_path {
match load_custom_cert(cert_path) {
Ok(certs) => {
let cert_count = certs.len();
for cert in certs {
builder = builder.add_root_certificate(cert);
}
debug!(
"HTTP | Added {} root certificate(s) from {}",
cert_count, cert_path
);
}
Err(e) => {
error!(
"Failed to load TLS certificate from {}: {}, continuing without custom cert",
cert_path, e
);
}
}
}

if let Some(proxy) = https_proxy {
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
}
Ok(builder.build()?)
}

fn load_custom_cert(cert_path: &str) -> Result<Vec<reqwest::Certificate>, Box<dyn Error>> {
let file = File::open(cert_path)?;
let mut reader = BufReader::new(file);

// Parse PEM certificates
let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;

if certs.is_empty() {
return Err("No certificates found in file".into());
}

// Convert all certificates found in the file
certs
.into_iter()
.map(|cert| reqwest::Certificate::from_der(cert.as_ref()).map_err(Into::into))
.collect()
}

#[derive(Debug, Serialize, Clone, Copy)]
/// A single point in time
pub(crate) struct Point {
Expand Down
7 changes: 7 additions & 0 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Flusher {
api_key_factory: Arc<ApiKeyFactory>,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
ca_cert_path: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
aggregator_handle: AggregatorHandle,
Expand All @@ -29,6 +30,7 @@ pub struct FlusherConfig {
pub aggregator_handle: AggregatorHandle,
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
pub https_proxy: Option<String>,
pub ca_cert_path: Option<String>,
pub timeout: Duration,
pub retry_strategy: RetryStrategy,
pub compression_level: CompressionLevel,
Expand All @@ -40,6 +42,7 @@ impl Flusher {
api_key_factory: Arc::clone(&config.api_key_factory),
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
https_proxy: config.https_proxy,
ca_cert_path: config.ca_cert_path,
timeout: config.timeout,
retry_strategy: config.retry_strategy,
aggregator_handle: config.aggregator_handle,
Expand All @@ -57,6 +60,7 @@ impl Flusher {
api_key.to_string(),
self.metrics_intake_url_prefix.clone(),
self.https_proxy.clone(),
self.ca_cert_path.clone(),
self.timeout,
self.retry_strategy.clone(),
self.compression_level,
Expand Down Expand Up @@ -283,6 +287,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -329,6 +334,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -378,6 +384,7 @@ mod tests {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
Expand Down
4 changes: 4 additions & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn dogstatsd_server_ships_series() {
)
.expect("failed to create URL"),
https_proxy: None,
ca_cert_path: None,
timeout: std::time::Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(3),
compression_level: CompressionLevel::try_from(6)
Expand Down Expand Up @@ -139,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down Expand Up @@ -195,6 +197,7 @@ async fn test_send_with_retry_linear_backoff_success() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down Expand Up @@ -250,6 +253,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
)
.expect("failed to create URL"),
None,
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
Expand Down
Loading