Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ pub struct EnvConfig {
/// Default is `false`.
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub compute_trace_stats_on_extension: Option<bool>,
/// @env `DD_SPAN_DEDUP_TIMEOUT`
///
/// The timeout for the span deduplication service to check if a span key exists, in seconds.
/// For now, this is a temporary field added to debug the failure of `check_and_add()` in the span dedup service.
/// Do not use this field extensively in production.
#[serde(deserialize_with = "deserialize_optional_duration_from_seconds_ignore_zero")]
pub span_dedup_timeout: Option<Duration>,
/// @env `DD_API_KEY_SECRET_RELOAD_INTERVAL`
///
/// The interval at which the Datadog API key is reloaded, in seconds.
Expand Down Expand Up @@ -640,6 +647,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option_to_value!(config, env_config, capture_lambda_payload);
merge_option_to_value!(config, env_config, capture_lambda_payload_max_depth);
merge_option_to_value!(config, env_config, compute_trace_stats_on_extension);
merge_option!(config, env_config, span_dedup_timeout);
merge_option!(config, env_config, api_key_secret_reload_interval);
merge_option_to_value!(config, env_config, serverless_appsec_enabled);
merge_option!(config, env_config, appsec_rules);
Expand Down Expand Up @@ -835,6 +843,7 @@ mod tests {
jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD", "true");
jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD_MAX_DEPTH", "5");
jail.set_env("DD_COMPUTE_TRACE_STATS_ON_EXTENSION", "true");
jail.set_env("DD_SPAN_DEDUP_TIMEOUT", "5");
jail.set_env("DD_API_KEY_SECRET_RELOAD_INTERVAL", "10");
jail.set_env("DD_SERVERLESS_APPSEC_ENABLED", "true");
jail.set_env("DD_APPSEC_RULES", "/path/to/rules.json");
Expand Down Expand Up @@ -988,6 +997,7 @@ mod tests {
capture_lambda_payload: true,
capture_lambda_payload_max_depth: 5,
compute_trace_stats_on_extension: true,
span_dedup_timeout: Some(Duration::from_secs(5)),
api_key_secret_reload_interval: Some(Duration::from_secs(10)),
serverless_appsec_enabled: true,
appsec_rules: Some("/path/to/rules.json".to_string()),
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub struct Config {
pub capture_lambda_payload: bool,
pub capture_lambda_payload_max_depth: u32,
pub compute_trace_stats_on_extension: bool,
pub span_dedup_timeout: Option<Duration>,
pub api_key_secret_reload_interval: Option<Duration>,

pub serverless_appsec_enabled: bool,
Expand Down Expand Up @@ -451,6 +452,7 @@ impl Default for Config {
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
compute_trace_stats_on_extension: false,
span_dedup_timeout: None,
api_key_secret_reload_interval: None,

serverless_appsec_enabled: false,
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ api_security_sample_delay: 60 # Seconds
capture_lambda_payload: true,
capture_lambda_payload_max_depth: 5,
compute_trace_stats_on_extension: true,
span_dedup_timeout: None,
api_key_secret_reload_interval: None,

serverless_appsec_enabled: true,
Expand Down
52 changes: 35 additions & 17 deletions bottlecap/src/traces/span_dedup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use tokio::sync::{mpsc, oneshot};
use tracing::error;
use tokio::time::Duration;
use tracing::warn;

use crate::traces::span_dedup::{DedupKey, Deduper};

Expand All @@ -12,6 +13,8 @@ pub enum DedupError {
SendError(mpsc::error::SendError<DedupCommand>),
#[error("Failed to receive response from deduper: {0}")]
RecvError(oneshot::error::RecvError),
#[error("Timeout waiting for response from deduper")]
Timeout,
}

pub enum DedupCommand {
Expand All @@ -34,14 +37,29 @@ impl DedupHandle {
///
/// # Errors
///
/// Returns an error if the command cannot be sent to the deduper service
/// or if the response cannot be received.
pub async fn check_and_add(&self, key: DedupKey) -> Result<bool, DedupError> {
/// Returns an error if the command cannot be sent to the deduper service,
/// if the response cannot be received, or if `timeout` is `Some` and no response arrives
/// within that duration. When `timeout` is `None`, the call waits without a time limit.
pub async fn check_and_add(
&self,
key: DedupKey,
timeout: Option<Duration>,
) -> Result<bool, DedupError> {
let (response_tx, response_rx) = oneshot::channel();
self.tx
.send(DedupCommand::CheckAndAdd(key, response_tx))
.map_err(DedupError::SendError)?;
response_rx.await.map_err(DedupError::RecvError)

// Sometimes the dedup service fails to send a response for unknown reasons, so we
// add a timeout to avoid blocking the caller forever. We may remove the
// timeout if we can figure out and fix the root cause.
if let Some(timeout) = timeout {
tokio::time::timeout(timeout, response_rx)
.await
.map_err(|_| DedupError::Timeout)?
.map_err(DedupError::RecvError)
} else {
response_rx.await.map_err(DedupError::RecvError)
}
}
}

Expand Down Expand Up @@ -77,7 +95,7 @@ impl DedupService {
DedupCommand::CheckAndAdd(key, response_tx) => {
let was_added = self.deduper.check_and_add(key);
if let Err(e) = response_tx.send(was_added) {
error!("Failed to send check_and_add response: {e:?}");
warn!("Failed to send check_and_add response: {e:?}");
}
}
}
Expand Down Expand Up @@ -112,17 +130,17 @@ mod tests {
let key2 = DedupKey::new(100, 456);

// First call should return true (key was added)
assert!(handle.check_and_add(key1).await.unwrap());
assert!(handle.check_and_add(key1, None).await.unwrap());

// Second call should return false (key already exists)
assert!(!handle.check_and_add(key1).await.unwrap());
assert!(!handle.check_and_add(key1, None).await.unwrap());

// Different key should return true again
assert!(handle.check_and_add(key2).await.unwrap());
assert!(handle.check_and_add(key2, None).await.unwrap());

// Calling again on already-added keys should return false
assert!(!handle.check_and_add(key1).await.unwrap());
assert!(!handle.check_and_add(key2).await.unwrap());
assert!(!handle.check_and_add(key1, None).await.unwrap());
assert!(!handle.check_and_add(key2, None).await.unwrap());
}

#[tokio::test]
Expand All @@ -139,17 +157,17 @@ mod tests {
let key4 = DedupKey::new(4, 40);

// Add 3 keys
assert!(handle.check_and_add(key1).await.unwrap());
assert!(handle.check_and_add(key2).await.unwrap());
assert!(handle.check_and_add(key3).await.unwrap());
assert!(handle.check_and_add(key1, None).await.unwrap());
assert!(handle.check_and_add(key2, None).await.unwrap());
assert!(handle.check_and_add(key3, None).await.unwrap());

// Add a 4th key, should evict the oldest (key1)
assert!(handle.check_and_add(key4).await.unwrap());
assert!(handle.check_and_add(key4, None).await.unwrap());

// Now key1 should be addable again (was evicted)
assert!(handle.check_and_add(key1).await.unwrap());
assert!(handle.check_and_add(key1, None).await.unwrap());

// But key2 should now be evicted
assert!(handle.check_and_add(key2).await.unwrap());
assert!(handle.check_and_add(key2, None).await.unwrap());
}
}
7 changes: 4 additions & 3 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::{
};
use tokio_util::sync::CancellationToken;
use tower_http::limit::RequestBodyLimitLayer;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::traces::trace_processor::SendingTraceProcessor;
use crate::{
Expand Down Expand Up @@ -545,7 +545,8 @@ impl TraceAgent {
for mut span in original_chunk {
// Check for duplicates
let key = DedupKey::new(span.trace_id, span.span_id);
let should_keep = match deduper.check_and_add(key).await {
let should_keep = match deduper.check_and_add(key, config.span_dedup_timeout).await
{
Ok(should_keep) => {
if !should_keep {
debug!(
Expand All @@ -556,7 +557,7 @@ impl TraceAgent {
should_keep
}
Err(e) => {
error!("Failed to check span in deduper, keeping span: {e}");
warn!("Failed to check span in deduper, keeping span: {e}");
true
}
};
Expand Down
Loading