Skip to content

Commit 18b49ba

Browse files
authored
[SVLS-7945] feat: Support TLS certificate for dogstatsd flusher (#61)
* [SVLS-7945] feat: Support TLS certificate for dogstatsd flusher * Update datadog-serverless-compat * Update Cargo.toml * Update log message
1 parent fd8b7a9 commit 18b49ba

File tree

6 files changed

+65
-3
lines changed

6 files changed

+65
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datadog-serverless-compat/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ async fn start_dogstatsd(
224224
timeout: DOGSTATSD_TIMEOUT_DURATION,
225225
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
226226
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
227+
// Not supported yet
228+
ca_cert_path: None,
227229
});
228230
Some(metrics_flusher)
229231
}

crates/dogstatsd/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ tracing = { version = "0.1.40", default-features = false }
2525
regex = { version = "1.10.6", default-features = false }
2626
zstd = { version = "0.13.3", default-features = false }
2727
datadog-fips = { path = "../datadog-fips", default-features = false }
28+
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }
2829

2930
[dev-dependencies]
3031
mockito = { version = "1.5.0", default-features = false }

crates/dogstatsd/src/datadog.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use reqwest::{Client, Response};
1414
use serde::{Serialize, Serializer};
1515
use serde_json;
1616
use std::error::Error;
17-
use std::io::Write;
17+
use std::fs::File;
18+
use std::io::{BufReader, Write};
1819
use std::sync::OnceLock;
1920
use std::time::Duration;
2021
use tracing::{debug, error};
@@ -148,11 +149,12 @@ impl DdApi {
148149
api_key: String,
149150
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
150151
https_proxy: Option<String>,
152+
ca_cert_path: Option<String>,
151153
timeout: Duration,
152154
retry_strategy: RetryStrategy,
153155
compression_level: CompressionLevel,
154156
) -> Self {
155-
let client = build_client(https_proxy, timeout)
157+
let client = build_client(https_proxy, ca_cert_path, timeout)
156158
.inspect_err(|e| {
157159
error!("Unable to create client {:?}", e);
158160
})
@@ -290,14 +292,59 @@ pub enum RetryStrategy {
290292
LinearBackoff(u64, u64), // attempts, delay
291293
}
292294

293-
fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, Box<dyn Error>> {
295+
fn build_client(
296+
https_proxy: Option<String>,
297+
ca_cert_path: Option<String>,
298+
timeout: Duration,
299+
) -> Result<Client, Box<dyn Error>> {
294300
let mut builder = create_reqwest_client_builder()?.timeout(timeout);
301+
302+
// Load custom TLS certificate if configured
303+
if let Some(cert_path) = &ca_cert_path {
304+
match load_custom_cert(cert_path) {
305+
Ok(certs) => {
306+
let cert_count = certs.len();
307+
for cert in certs {
308+
builder = builder.add_root_certificate(cert);
309+
}
310+
debug!(
311+
"DOGSTATSD | Added {} root certificate(s) from {}",
312+
cert_count, cert_path
313+
);
314+
}
315+
Err(e) => {
316+
error!(
317+
"DOGSTATSD | Failed to load TLS certificate from {}: {}, continuing without custom cert",
318+
cert_path, e
319+
);
320+
}
321+
}
322+
}
323+
295324
if let Some(proxy) = https_proxy {
296325
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
297326
}
298327
Ok(builder.build()?)
299328
}
300329

330+
fn load_custom_cert(cert_path: &str) -> Result<Vec<reqwest::Certificate>, Box<dyn Error>> {
331+
let file = File::open(cert_path)?;
332+
let mut reader = BufReader::new(file);
333+
334+
// Parse PEM certificates
335+
let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
336+
337+
if certs.is_empty() {
338+
return Err("No certificates found in file".into());
339+
}
340+
341+
// Convert all certificates found in the file
342+
certs
343+
.into_iter()
344+
.map(|cert| reqwest::Certificate::from_der(cert.as_ref()).map_err(Into::into))
345+
.collect()
346+
}
347+
301348
#[derive(Debug, Serialize, Clone, Copy)]
302349
/// A single point in time
303350
pub(crate) struct Point {

crates/dogstatsd/src/flusher.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct Flusher {
1717
api_key_factory: Arc<ApiKeyFactory>,
1818
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
1919
https_proxy: Option<String>,
20+
ca_cert_path: Option<String>,
2021
timeout: Duration,
2122
retry_strategy: RetryStrategy,
2223
aggregator_handle: AggregatorHandle,
@@ -29,6 +30,7 @@ pub struct FlusherConfig {
2930
pub aggregator_handle: AggregatorHandle,
3031
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
3132
pub https_proxy: Option<String>,
33+
pub ca_cert_path: Option<String>,
3234
pub timeout: Duration,
3335
pub retry_strategy: RetryStrategy,
3436
pub compression_level: CompressionLevel,
@@ -40,6 +42,7 @@ impl Flusher {
4042
api_key_factory: Arc::clone(&config.api_key_factory),
4143
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
4244
https_proxy: config.https_proxy,
45+
ca_cert_path: config.ca_cert_path,
4346
timeout: config.timeout,
4447
retry_strategy: config.retry_strategy,
4548
aggregator_handle: config.aggregator_handle,
@@ -57,6 +60,7 @@ impl Flusher {
5760
api_key.to_string(),
5861
self.metrics_intake_url_prefix.clone(),
5962
self.https_proxy.clone(),
63+
self.ca_cert_path.clone(),
6064
self.timeout,
6165
self.retry_strategy.clone(),
6266
self.compression_level,
@@ -283,6 +287,7 @@ mod tests {
283287
)
284288
.expect("failed to create URL"),
285289
https_proxy: None,
290+
ca_cert_path: None,
286291
timeout: Duration::from_secs(5),
287292
retry_strategy: RetryStrategy::Immediate(1),
288293
compression_level: CompressionLevel::try_from(6)
@@ -329,6 +334,7 @@ mod tests {
329334
)
330335
.expect("failed to create URL"),
331336
https_proxy: None,
337+
ca_cert_path: None,
332338
timeout: Duration::from_secs(5),
333339
retry_strategy: RetryStrategy::Immediate(1),
334340
compression_level: CompressionLevel::try_from(6)
@@ -378,6 +384,7 @@ mod tests {
378384
)
379385
.expect("failed to create URL"),
380386
https_proxy: None,
387+
ca_cert_path: None,
381388
timeout: Duration::from_secs(5),
382389
retry_strategy: RetryStrategy::Immediate(1),
383390
compression_level: CompressionLevel::try_from(6)

crates/dogstatsd/tests/integration_test.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async fn dogstatsd_server_ships_series() {
5959
)
6060
.expect("failed to create URL"),
6161
https_proxy: None,
62+
ca_cert_path: None,
6263
timeout: std::time::Duration::from_secs(5),
6364
retry_strategy: RetryStrategy::Immediate(3),
6465
compression_level: CompressionLevel::try_from(6)
@@ -139,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() {
139140
)
140141
.expect("failed to create URL"),
141142
None,
143+
None,
142144
Duration::from_secs(1),
143145
retry_strategy.clone(),
144146
6,
@@ -195,6 +197,7 @@ async fn test_send_with_retry_linear_backoff_success() {
195197
)
196198
.expect("failed to create URL"),
197199
None,
200+
None,
198201
Duration::from_secs(1),
199202
retry_strategy.clone(),
200203
6,
@@ -250,6 +253,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
250253
)
251254
.expect("failed to create URL"),
252255
None,
256+
None,
253257
Duration::from_secs(1),
254258
retry_strategy.clone(),
255259
6,

0 commit comments

Comments
 (0)