Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ sha2 = { version = "0.10", default-features = false }
hex = { version = "0.4", default-features = false, features = ["std"] }
base64 = { version = "0.22", default-features = false }
rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }
rustls-pki-types = { version = "1.0", default-features = false }
hyper-rustls = { version = "0.27.7", default-features = false }
rand = { version = "0.8", default-features = false }
prost = { version = "0.13", default-features = false }
zstd = { version = "0.13.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions bottlecap/LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ rustc-hash,https://github.com/rust-lang/rustc-hash,Apache-2.0 OR MIT,The Rust Pr
rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman <dev@sunfishcode.online>, Jakub Konka <kubkon@jakubkonka.com>"
rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors
rustls-native-certs,https://github.com/rustls/rustls-native-certs,Apache-2.0 OR ISC OR MIT,The rustls-native-certs Authors
rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,The rustls-pemfile Authors
rustls-pki-types,https://github.com/rustls/pki-types,MIT OR Apache-2.0,The rustls-pki-types Authors
rustls-webpki,https://github.com/rustls/webpki,ISC,The rustls-webpki Authors
ryu,https://github.com/dtolnay/ryu,Apache-2.0 OR BSL-1.0,David Tolnay <dtolnay@gmail.com>
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ pub struct EnvConfig {
/// The transport type to use for sending logs. Possible values are "auto" or "http1".
#[serde(deserialize_with = "deserialize_optional_string")]
pub http_protocol: Option<String>,
/// @env `DD_SSL_CA_CERT`
/// The SSL CA certificate path to use for the Datadog Agent.
/// Example: `/opt/ca-cert.pem`
#[serde(deserialize_with = "deserialize_optional_string")]
pub ssl_ca_cert: Option<String>,

// Metrics
/// @env `DD_DD_URL`
Expand Down Expand Up @@ -466,6 +471,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option!(config, env_config, proxy_https);
merge_vec!(config, env_config, proxy_no_proxy);
merge_option!(config, env_config, http_protocol);
merge_option!(config, env_config, ssl_ca_cert);

// Endpoints
merge_string!(config, env_config, dd_url);
Expand Down Expand Up @@ -695,6 +701,7 @@ mod tests {
jail.set_env("DD_PROXY_HTTPS", "https://proxy.example.com");
jail.set_env("DD_PROXY_NO_PROXY", "localhost,127.0.0.1");
jail.set_env("DD_HTTP_PROTOCOL", "http1");
jail.set_env("DD_SSL_CA_CERT", "/opt/ca-cert.pem");

// Metrics
jail.set_env("DD_DD_URL", "https://metrics.datadoghq.com");
Expand Down Expand Up @@ -850,6 +857,7 @@ mod tests {
proxy_https: Some("https://proxy.example.com".to_string()),
proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()],
http_protocol: Some("http1".to_string()),
ssl_ca_cert: Some("/opt/ca-cert.pem".to_string()),
dd_url: "https://metrics.datadoghq.com".to_string(),
url: "https://app.datadoghq.com".to_string(),
additional_endpoints: HashMap::from([
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub struct Config {
pub proxy_https: Option<String>,
pub proxy_no_proxy: Vec<String>,
pub http_protocol: Option<String>,
pub ssl_ca_cert: Option<String>,

// Endpoints
pub dd_url: String,
Expand Down Expand Up @@ -366,6 +367,7 @@ impl Default for Config {
proxy_https: None,
proxy_no_proxy: vec![],
http_protocol: None,
ssl_ca_cert: None,

// Endpoints
dd_url: String::default(),
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct YamlConfig {
pub dd_url: Option<String>,
#[serde(deserialize_with = "deserialize_optional_string")]
pub http_protocol: Option<String>,
#[serde(deserialize_with = "deserialize_optional_string")]
pub ssl_ca_cert: Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the DD_SSL_CA_CERT environment variable comes from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.merge(Env::prefixed("DD_"));

It's handled by this code, which puts the value of env var DD_SSL_CA_CERT to the field ssl_ca_cert. Is this what you are asking?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'm asking where does DD_SSL_CA_CERT name convention comes from, does this come from the Datadog Agent config? Does it come from the docs? Did you came up with it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I came up with it. Upon further search, I found tls_ca_cert here:
https://github.com/DataDog/integrations-core/blob/master/http_check/datadog_checks/http_check/data/conf.yaml.example#L477
so I'll rename the env var to DD_SSL_CA_CERT. Does this sound good to you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, although I'd confirm with the Agent team!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do!

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Env var name LGTM from the perspective of agent-configuration team!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Changing to DD_TLS_CERT_FILE to be consistent with:
https://github.com/DataDog/datadog-agent/blob/0638dfce1e1f3a9ae336334d4df01cb2a5e35120/pkg/config/setup/config.go#L1410

The same config option has different names in different places. This PR just picks one of them.


// Endpoints
#[serde(deserialize_with = "deserialize_additional_endpoints")]
Expand Down Expand Up @@ -417,6 +419,7 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
merge_option!(config, proxy_https, yaml_config.proxy, https);
merge_option_to_value!(config, proxy_no_proxy, yaml_config.proxy, no_proxy);
merge_option!(config, yaml_config, http_protocol);
merge_option!(config, yaml_config, ssl_ca_cert);

// Endpoints
merge_hashmap!(config, yaml_config, additional_endpoints);
Expand Down Expand Up @@ -747,6 +750,7 @@ proxy:
no_proxy: ["localhost", "127.0.0.1"]
dd_url: "https://metrics.datadoghq.com"
http_protocol: "http1"
ssl_ca_cert: "/opt/ca-cert.pem"

# Endpoints
additional_endpoints:
Expand Down Expand Up @@ -882,6 +886,7 @@ api_security_sample_delay: 60 # Seconds
proxy_https: Some("https://proxy.example.com".to_string()),
proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()],
http_protocol: Some("http1".to_string()),
ssl_ca_cert: Some("/opt/ca-cert.pem".to_string()),
dd_url: "https://metrics.datadoghq.com".to_string(),
url: String::new(), // doesnt exist in yaml
additional_endpoints: HashMap::from([
Expand Down
7 changes: 4 additions & 3 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ impl StatsFlusher for ServerlessStatsFlusher {

let start = std::time::Instant::now();

let Ok(http_client) =
ServerlessTraceFlusher::get_http_client(self.config.proxy_https.as_ref())
else {
let Ok(http_client) = ServerlessTraceFlusher::get_http_client(
self.config.proxy_https.as_ref(),
self.config.ssl_ca_cert.as_ref(),
) else {
error!("STATS_FLUSHER | Failed to create HTTP client");
return;
};
Expand Down
98 changes: 86 additions & 12 deletions bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
use async_trait::async_trait;
use dogstatsd::api_key::ApiKeyFactory;
use hyper_http_proxy;
use hyper_rustls::HttpsConnectorBuilder;
use libdd_common::{Endpoint, GenericHttpClient, hyper_migration};
use libdd_trace_utils::{
config_utils::trace_intake_url_prefixed,
send_data::SendDataBuilder,
trace_utils::{self, SendData},
};
use rustls::RootCertStore;
use rustls_pki_types::CertificateDer;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use tokio::task::JoinSet;
use tracing::{debug, error};

Expand All @@ -35,6 +41,7 @@ pub trait TraceFlusher {
traces: Vec<SendData>,
endpoint: Option<&Endpoint>,
proxy_https: &Option<String>,
ssl_ca_cert: &Option<String>,
) -> Option<Vec<SendData>>;

/// Flushes traces by getting every available batch on the aggregator.
Expand Down Expand Up @@ -104,7 +111,13 @@ impl TraceFlusher for ServerlessTraceFlusher {
"TRACES | Retrying to send {} previously failed batches",
traces.len()
);
let retry_result = Self::send(traces, None, &self.config.proxy_https).await;
let retry_result = Self::send(
traces,
None,
&self.config.proxy_https,
&self.config.ssl_ca_cert,
)
.await;
if retry_result.is_some() {
// Still failed, return to retry later
return retry_result;
Expand All @@ -131,13 +144,17 @@ impl TraceFlusher for ServerlessTraceFlusher {

let traces_clone = traces.clone();
let proxy_https = self.config.proxy_https.clone();
batch_tasks.spawn(async move { Self::send(traces_clone, None, &proxy_https).await });
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
batch_tasks.spawn(async move {
Self::send(traces_clone, None, &proxy_https, &ssl_ca_cert).await
});

for endpoint in self.additional_endpoints.clone() {
let traces_clone = traces.clone();
let proxy_https = self.config.proxy_https.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
batch_tasks.spawn(async move {
Self::send(traces_clone, Some(&endpoint), &proxy_https).await
Self::send(traces_clone, Some(&endpoint), &proxy_https, &ssl_ca_cert).await
});
}
}
Expand All @@ -158,6 +175,7 @@ impl TraceFlusher for ServerlessTraceFlusher {
traces: Vec<SendData>,
endpoint: Option<&Endpoint>,
proxy_https: &Option<String>,
ssl_ca_cert: &Option<String>,
) -> Option<Vec<SendData>> {
if traces.is_empty() {
return None;
Expand All @@ -167,7 +185,9 @@ impl TraceFlusher for ServerlessTraceFlusher {
tokio::task::yield_now().await;
debug!("TRACES | Flushing {} traces", coalesced_traces.len());

let Ok(http_client) = ServerlessTraceFlusher::get_http_client(proxy_https.as_ref()) else {
let Ok(http_client) =
ServerlessTraceFlusher::get_http_client(proxy_https.as_ref(), ssl_ca_cert.as_ref())
else {
error!("TRACES | Failed to create HTTP client");
return None;
};
Expand All @@ -192,25 +212,79 @@ impl TraceFlusher for ServerlessTraceFlusher {
}
}

// Initialize the crypto provider needed for setting custom root certificates
fn ensure_crypto_provider_initialized() {
static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
#[cfg(unix)]
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to install default CryptoProvider");
});

let () = &*INIT_CRYPTO_PROVIDER;
}

impl ServerlessTraceFlusher {
pub fn get_http_client(
proxy_https: Option<&String>,
ssl_ca_cert: Option<&String>,
) -> Result<
GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>,
Box<dyn Error>,
> {
// Create the base connector with optional custom TLS config
let connector = if let Some(ca_cert_path) = ssl_ca_cert {
// Ensure crypto provider is initialized before creating TLS config
ensure_crypto_provider_initialized();

// Load the custom certificate
let cert_file = File::open(ca_cert_path)?;
let mut reader = BufReader::new(cert_file);
let certs: Vec<CertificateDer> =
rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;

// Create a root certificate store and add custom certs
let mut root_store = RootCertStore::empty();
for cert in certs {
root_store.add(cert)?;
}

// Build the TLS config with custom root certificates
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

// Build the HTTPS connector with custom config
let https_connector = HttpsConnectorBuilder::new()
.with_tls_config(tls_config)
.https_or_http()
.enable_http1()
.build();

debug!(
"TRACES | GET_HTTP_CLIENT | Added root certificate from {}",
ca_cert_path
);

// Construct the Connector::Https variant directly
libdd_common::connector::Connector::Https(https_connector)
} else {
// Use default connector
libdd_common::connector::Connector::default()
};

if let Some(proxy) = proxy_https {
let proxy =
hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(
libdd_common::connector::Connector::default(),
proxy,
)?;
Ok(hyper_migration::client_builder().build(proxy_connector))
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
let client = hyper_migration::client_builder().build(proxy_connector);
debug!(
"TRACES | GET_HTTP_CLIENT | Proxy connector created with proxy: {:?}",
proxy_https
);
Ok(client)
} else {
let proxy_connector = hyper_http_proxy::ProxyConnector::new(
libdd_common::connector::Connector::default(),
)?;
let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
Ok(hyper_migration::client_builder().build(proxy_connector))
}
}
Expand Down
Loading