diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index af68a7a39..0bbc199ef 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -417,6 +417,13 @@ pub struct EnvConfig { /// Default is `false`. #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub compute_trace_stats_on_extension: Option, + /// @env `DD_SPAN_DEDUP_TIMEOUT` + /// + /// The timeout for the span deduplication service to check if a span key exists, in seconds. + /// For now, this is a temporary field added to debug the failure of `check_and_add()` in span dedup service. + /// Do not use this field extensively in production. + #[serde(deserialize_with = "deserialize_optional_duration_from_seconds_ignore_zero")] + pub span_dedup_timeout: Option, /// @env `DD_API_KEY_SECRET_RELOAD_INTERVAL` /// /// The interval at which the Datadog API key is reloaded, in seconds. @@ -640,6 +647,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_option_to_value!(config, env_config, capture_lambda_payload); merge_option_to_value!(config, env_config, capture_lambda_payload_max_depth); merge_option_to_value!(config, env_config, compute_trace_stats_on_extension); + merge_option!(config, env_config, span_dedup_timeout); merge_option!(config, env_config, api_key_secret_reload_interval); merge_option_to_value!(config, env_config, serverless_appsec_enabled); merge_option!(config, env_config, appsec_rules); @@ -835,6 +843,7 @@ mod tests { jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD", "true"); jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD_MAX_DEPTH", "5"); jail.set_env("DD_COMPUTE_TRACE_STATS_ON_EXTENSION", "true"); + jail.set_env("DD_SPAN_DEDUP_TIMEOUT", "5"); jail.set_env("DD_API_KEY_SECRET_RELOAD_INTERVAL", "10"); jail.set_env("DD_SERVERLESS_APPSEC_ENABLED", "true"); jail.set_env("DD_APPSEC_RULES", "/path/to/rules.json"); @@ -988,6 +997,7 @@ mod tests { capture_lambda_payload: true, capture_lambda_payload_max_depth: 5, 
compute_trace_stats_on_extension: true, + span_dedup_timeout: Some(Duration::from_secs(5)), api_key_secret_reload_interval: Some(Duration::from_secs(10)), serverless_appsec_enabled: true, appsec_rules: Some("/path/to/rules.json".to_string()), diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 997e25807..48bc3c305 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -346,6 +346,7 @@ pub struct Config { pub capture_lambda_payload: bool, pub capture_lambda_payload_max_depth: u32, pub compute_trace_stats_on_extension: bool, + pub span_dedup_timeout: Option, pub api_key_secret_reload_interval: Option, pub serverless_appsec_enabled: bool, @@ -451,6 +452,7 @@ impl Default for Config { capture_lambda_payload: false, capture_lambda_payload_max_depth: 10, compute_trace_stats_on_extension: false, + span_dedup_timeout: None, api_key_secret_reload_interval: None, serverless_appsec_enabled: false, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index 8075ef361..dbac692b2 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -995,6 +995,7 @@ api_security_sample_delay: 60 # Seconds capture_lambda_payload: true, capture_lambda_payload_max_depth: 5, compute_trace_stats_on_extension: true, + span_dedup_timeout: None, api_key_secret_reload_interval: None, serverless_appsec_enabled: true, diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index 2cfb26d4d..75ca941ff 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use tokio::sync::{mpsc, oneshot}; -use tracing::error; +use tokio::time::Duration; +use tracing::warn; use crate::traces::span_dedup::{DedupKey, Deduper}; @@ -12,6 +13,8 @@ pub enum DedupError { SendError(mpsc::error::SendError), #[error("Failed to receive response from deduper: {0}")] 
RecvError(oneshot::error::RecvError), + #[error("Timeout waiting for response from deduper")] + Timeout, } pub enum DedupCommand { @@ -34,14 +37,29 @@ impl DedupHandle { /// /// # Errors /// - /// Returns an error if the command cannot be sent to the deduper service - /// or if the response cannot be received. - pub async fn check_and_add(&self, key: DedupKey) -> Result { + /// Returns an error if the command cannot be sent to the deduper service, + /// if the response cannot be received, or if `timeout` is `Some` and elapses first. + pub async fn check_and_add( + &self, + key: DedupKey, + timeout: Option, + ) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(DedupCommand::CheckAndAdd(key, response_tx)) .map_err(DedupError::SendError)?; - response_rx.await.map_err(DedupError::RecvError) + + // Sometimes the dedup service fails to send a response for unknown reasons, so we + // add a timeout to avoid blocking the caller forever. We may remove the + // timeout if we can figure out and fix the root cause. + if let Some(timeout) = timeout { + tokio::time::timeout(timeout, response_rx) + .await + .map_err(|_| DedupError::Timeout)? 
+ .map_err(DedupError::RecvError) + } else { + response_rx.await.map_err(DedupError::RecvError) + } } } @@ -77,7 +95,7 @@ impl DedupService { DedupCommand::CheckAndAdd(key, response_tx) => { let was_added = self.deduper.check_and_add(key); if let Err(e) = response_tx.send(was_added) { - error!("Failed to send check_and_add response: {e:?}"); + warn!("Failed to send check_and_add response: {e:?}"); } } } @@ -112,17 +130,17 @@ mod tests { let key2 = DedupKey::new(100, 456); // First call should return true (key was added) - assert!(handle.check_and_add(key1).await.unwrap()); + assert!(handle.check_and_add(key1, None).await.unwrap()); // Second call should return false (key already exists) - assert!(!handle.check_and_add(key1).await.unwrap()); + assert!(!handle.check_and_add(key1, None).await.unwrap()); // Different key should return true again - assert!(handle.check_and_add(key2).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); // Calling again on already-added keys should return false - assert!(!handle.check_and_add(key1).await.unwrap()); - assert!(!handle.check_and_add(key2).await.unwrap()); + assert!(!handle.check_and_add(key1, None).await.unwrap()); + assert!(!handle.check_and_add(key2, None).await.unwrap()); } #[tokio::test] @@ -139,17 +157,17 @@ mod tests { let key4 = DedupKey::new(4, 40); // Add 3 keys - assert!(handle.check_and_add(key1).await.unwrap()); - assert!(handle.check_and_add(key2).await.unwrap()); - assert!(handle.check_and_add(key3).await.unwrap()); + assert!(handle.check_and_add(key1, None).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); + assert!(handle.check_and_add(key3, None).await.unwrap()); // Add a 4th key, should evict the oldest (key1) - assert!(handle.check_and_add(key4).await.unwrap()); + assert!(handle.check_and_add(key4, None).await.unwrap()); // Now key1 should be addable again (was evicted) - assert!(handle.check_and_add(key1).await.unwrap()); + 
assert!(handle.check_and_add(key1, None).await.unwrap()); // But key2 should now be evicted - assert!(handle.check_and_add(key2).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 3ef29c783..fd72ec251 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -19,7 +19,7 @@ use tokio::sync::{ }; use tokio_util::sync::CancellationToken; use tower_http::limit::RequestBodyLimitLayer; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ @@ -545,7 +545,8 @@ impl TraceAgent { for mut span in original_chunk { // Check for duplicates let key = DedupKey::new(span.trace_id, span.span_id); - let should_keep = match deduper.check_and_add(key).await { + let should_keep = match deduper.check_and_add(key, config.span_dedup_timeout).await + { Ok(should_keep) => { if !should_keep { debug!( @@ -556,7 +557,7 @@ impl TraceAgent { should_keep } Err(e) => { - error!("Failed to check span in deduper, keeping span: {e}"); + warn!("Failed to check span in deduper, keeping span: {e}"); true } };