diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs
index 603e4e9ea..85a71b14b 100644
--- a/bottlecap/src/config/mod.rs
+++ b/bottlecap/src/config/mod.rs
@@ -104,6 +104,7 @@ pub struct Config {
     pub logs_config_compression_level: i32,
     pub serverless_flush_strategy: FlushStrategy,
     pub enhanced_metrics: bool,
+    // flush timeout in seconds
     pub flush_timeout: u64, //TODO go agent adds jitter too
     pub https_proxy: Option<String>,
     pub capture_lambda_payload: bool,
diff --git a/bottlecap/src/http_client.rs b/bottlecap/src/http_client.rs
index d653fe227..84d5b4ac1 100644
--- a/bottlecap/src/http_client.rs
+++ b/bottlecap/src/http_client.rs
@@ -3,18 +3,16 @@ use core::time::Duration;
 use std::sync::Arc;
 use tracing::error;
 
+#[must_use]
 pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
-    match build_client(config) {
-        Ok(client) => client,
-        Err(e) => {
-            error!(
-                "Unable to parse proxy configuration: {}, no proxy will be used",
-                e
-            );
-            //TODO this fallback doesn't respect the flush timeout
-            reqwest::Client::new()
-        }
-    }
+    build_client(config).unwrap_or_else(|e| {
+        error!(
+            "Unable to parse proxy configuration: {}, no proxy will be used",
+            e
+        );
+        //TODO this fallback doesn't respect the flush timeout
+        reqwest::Client::new()
+    })
 }
 
 fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, reqwest::Error> {
diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs
index c7717ef8b..460175e9f 100644
--- a/bottlecap/src/logs/flusher.rs
+++ b/bottlecap/src/logs/flusher.rs
@@ -1,6 +1,7 @@
 use crate::config;
 use crate::http_client;
 use crate::logs::aggregator::Aggregator;
+use std::time::Instant;
 use std::{
     error::Error,
     io::Write,
@@ -85,13 +86,13 @@ impl Flusher {
         compression_enabled: bool,
         compression_level: i32,
     ) {
-        let url = format!("{fqdn}/api/v2/logs");
         if !data.is_empty() {
+            let url = format!("{fqdn}/api/v2/logs");
+            let start = Instant::now();
             let body = if compression_enabled {
                 let result = (|| -> Result<Vec<u8>, Box<dyn Error>> {
                     let mut encoder = Encoder::new(Vec::new(), compression_level)
                         .map_err(|e| Box::new(e) as Box<dyn Error>)?;
-
                     encoder
                         .write_all(&data)
                         .map_err(|e| Box::new(e) as Box<dyn Error>)?;
@@ -120,14 +121,24 @@ impl Flusher {
             };
             let resp: Result<reqwest::Response, reqwest::Error> = req.body(body).send().await;
+            let elapsed = start.elapsed();
+
             match resp {
                 Ok(resp) => {
                     if resp.status() != 202 {
-                        debug!("Failed to send logs to datadog: {}", resp.status());
+                        debug!(
+                            "Failed to send logs to datadog after {}ms: {}",
+                            elapsed.as_millis(),
+                            resp.status()
+                        );
                     }
                 }
                 Err(e) => {
-                    error!("Failed to send logs to datadog: {}", e);
+                    error!(
+                        "Failed to send logs to datadog after {}ms: {}",
+                        elapsed.as_millis(),
+                        e
+                    );
                 }
             }
         }
     }
diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs
index d2e91b695..4f21174bf 100644
--- a/bottlecap/src/tags/lambda/tags.rs
+++ b/bottlecap/src/tags/lambda/tags.rs
@@ -1,5 +1,5 @@
 use crate::config;
-use std::collections::hash_map;
+use std::collections::HashMap;
 use std::env::consts::ARCH;
 use std::fs;
 use std::sync::Arc;
@@ -66,7 +66,7 @@ const RESOURCE_KEY: &str = "resource";
 
 #[derive(Debug, Clone)]
 pub struct Lambda {
-    tags_map: hash_map::HashMap<String, String>,
+    tags_map: HashMap<String, String>,
 }
 
 fn arch_to_platform<'a>() -> &'a str {
@@ -77,10 +77,10 @@ fn arch_to_platform<'a>() -> &'a str {
 }
 
 fn tags_from_env(
-    mut tags_map: hash_map::HashMap<String, String>,
+    mut tags_map: HashMap<String, String>,
     config: Arc<config::Config>,
-    metadata: &hash_map::HashMap<String, String>,
-) -> hash_map::HashMap<String, String> {
+    metadata: &HashMap<String, String>,
+) -> HashMap<String, String> {
     if metadata.contains_key(FUNCTION_ARN_KEY) {
         let parts = metadata[FUNCTION_ARN_KEY].split(':').collect::<Vec<&str>>();
         if parts.len() > 6 {
@@ -222,10 +222,10 @@ impl Lambda {
     #[must_use]
     pub fn new_from_config(
         config: Arc<config::Config>,
-        metadata: &hash_map::HashMap<String, String>,
+        metadata: &HashMap<String, String>,
     ) -> Self {
         Lambda {
-            tags_map: tags_from_env(hash_map::HashMap::new(), config, metadata),
+            tags_map: tags_from_env(HashMap::new(), config, metadata),
         }
     }
 
@@ -248,19 +248,19 @@ impl Lambda {
     }
 
     #[must_use]
-    pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
+    pub fn get_tags_map(&self) -> &HashMap<String, String> {
         &self.tags_map
     }
 
     #[must_use]
-    pub fn get_function_tags_map(&self) -> hash_map::HashMap<String, String> {
+    pub fn get_function_tags_map(&self) -> HashMap<String, String> {
         let tags = self
             .tags_map
             .iter()
             .map(|(k, v)| format!("{k}:{v}"))
             .collect::<Vec<String>>()
             .join(",");
-        hash_map::HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
+        HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
     }
 }
 
@@ -270,13 +270,14 @@ mod tests {
     use super::*;
     use crate::config::Config;
     use serial_test::serial;
+    use std::collections::HashMap;
     use std::fs::File;
     use std::io::Write;
     use std::path::Path;
 
     #[test]
     fn test_new_from_config() {
-        let metadata = hash_map::HashMap::new();
+        let metadata = HashMap::new();
         let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata);
         assert_eq!(tags.tags_map.len(), 3);
         assert_eq!(
@@ -297,7 +298,7 @@ mod tests {
 
     #[test]
     fn test_new_with_function_arn_metadata() {
-        let mut metadata = hash_map::HashMap::new();
+        let mut metadata = HashMap::new();
         metadata.insert(
             FUNCTION_ARN_KEY.to_string(),
             "arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
@@ -317,7 +318,7 @@ mod tests {
     #[test]
     #[serial] //run test serially since it sets and unsets env vars
     fn test_with_lambda_env_vars() {
-        let mut metadata = hash_map::HashMap::new();
+        let mut metadata = HashMap::new();
         metadata.insert(
             FUNCTION_ARN_KEY.to_string(),
             "arn:aws:lambda:us-west-2:123456789012:function:My-function".to_string(),
@@ -387,7 +388,7 @@ mod tests {
 
     #[test]
     fn test_get_function_tags_map() {
-        let mut metadata = hash_map::HashMap::new();
+        let mut metadata = HashMap::new();
         metadata.insert(
             FUNCTION_ARN_KEY.to_string(),
             "arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
@@ -402,7 +403,7 @@ mod tests {
         let tags = Lambda::new_from_config(config, &metadata);
         let function_tags = tags.get_function_tags_map();
         assert_eq!(function_tags.len(), 1);
-        let fn_tags_map: hash_map::HashMap<String, String> = function_tags
+        let fn_tags_map: HashMap<String, String> = function_tags
             .get(FUNCTION_TAGS_KEY)
             .unwrap()
             .split(',')
diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index 8e2334aa2..ce8d978c5 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -5,13 +5,13 @@ use async_trait::async_trait;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::Mutex;
-use tracing::{debug, error};
 
 use crate::config;
 use crate::traces::stats_aggregator::StatsAggregator;
 use datadog_trace_protobuf::pb;
 use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils};
 use ddcommon::Endpoint;
+use tracing::{debug, error};
 
 #[async_trait]
 pub trait StatsFlusher {
@@ -49,7 +49,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
         let endpoint = Endpoint {
             url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
             api_key: Some(api_key.clone().into()),
-            timeout_ms: Endpoint::DEFAULT_TIMEOUT,
+            timeout_ms: config.flush_timeout * 1_000,
             test_token: None,
         };
 
@@ -60,16 +60,6 @@ impl StatsFlusher for ServerlessStatsFlusher {
         }
     }
 
-    async fn flush(&self) {
-        let mut guard = self.aggregator.lock().await;
-
-        let mut stats = guard.get_batch();
-        while !stats.is_empty() {
-            self.send(stats).await;
-
-            stats = guard.get_batch();
-        }
-    }
     async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
         if stats.is_empty() {
             return;
@@ -88,17 +78,37 @@ impl StatsFlusher for ServerlessStatsFlusher {
             }
         };
 
-        match stats_utils::send_stats_payload(
+        let stats_url = trace_stats_url(&self.config.site);
+
+        let start = std::time::Instant::now();
+
+        let resp = stats_utils::send_stats_payload(
             serialized_stats_payload,
             &self.endpoint,
             &self.config.api_key,
         )
-        .await
-        {
+        .await;
+        let elapsed = start.elapsed();
+        debug!(
+            "Stats request to {} took {}ms",
+            stats_url,
+            elapsed.as_millis()
+        );
+        match resp {
             Ok(()) => debug!("Successfully flushed stats"),
             Err(e) => {
                 error!("Error sending stats: {e:?}");
             }
+        };
+    }
+    async fn flush(&self) {
+        let mut guard = self.aggregator.lock().await;
+
+        let mut stats = guard.get_batch();
+        while !stats.is_empty() {
+            self.send(stats).await;
+
+            stats = guard.get_batch();
         }
     }
 }
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index be137df64..c89834b09 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -50,6 +50,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
         if traces.is_empty() {
             return;
         }
+
+        let start = std::time::Instant::now();
         debug!("Flushing {} traces", traces.len());
 
         for traces in trace_utils::coalesce_send_data(traces) {
@@ -65,5 +67,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
                 }
             }
         }
+        debug!("Flushing traces took {}ms", start.elapsed().as_millis());
     }
 }
diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs
index 9a8d64f5a..f526f1a77 100644
--- a/bottlecap/src/traces/trace_processor.rs
+++ b/bottlecap/src/traces/trace_processor.rs
@@ -1,11 +1,22 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
+use crate::config;
 use crate::tags::provider;
 use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer};
+use crate::traces::{
+    AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
+    DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
+    LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
+};
+use datadog_trace_obfuscation::obfuscate::obfuscate_span;
 use datadog_trace_obfuscation::obfuscation_config;
 use datadog_trace_protobuf::pb;
+use datadog_trace_protobuf::pb::Span;
 use datadog_trace_utils::config_utils::trace_intake_url;
+use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy};
+use datadog_trace_utils::trace_utils::SendData;
+use datadog_trace_utils::trace_utils::{self};
 use datadog_trace_utils::tracer_header_tags;
 use datadog_trace_utils::tracer_payload::{
     TraceChunkProcessor, TraceCollection::V07, TracerPayloadCollection,
@@ -14,16 +25,6 @@ use ddcommon::Endpoint;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use crate::config;
-use crate::traces::{
-    AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
-    DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
-    LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
-};
-use datadog_trace_obfuscation::obfuscate::obfuscate_span;
-use datadog_trace_protobuf::pb::Span;
-use datadog_trace_utils::trace_utils::{self, SendData};
-
 #[derive(Clone)]
 #[allow(clippy::module_name_repetitions)]
 pub struct ServerlessTraceProcessor {
@@ -161,11 +162,18 @@ impl TraceProcessor for ServerlessTraceProcessor {
         let endpoint = Endpoint {
             url: hyper::Uri::from_str(&intake_url).expect("can't parse trace intake URL, exiting"),
             api_key: Some(self.resolved_api_key.clone().into()),
-            timeout_ms: Endpoint::DEFAULT_TIMEOUT,
+            timeout_ms: config.flush_timeout * 1_000,
             test_token: None,
         };
 
-        SendData::new(body_size, payload, header_tags, &endpoint)
+        let mut send_data = SendData::new(body_size, payload, header_tags, &endpoint);
+        send_data.set_retry_strategy(RetryStrategy::new(
+            1,
+            100,
+            RetryBackoffType::Exponential,
+            None,
+        ));
+        send_data
     }
 }
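
Note on the timeout wiring: `flush_timeout` is configured in seconds, so the trace and stats endpoints above multiply it by 1_000 to populate `Endpoint::timeout_ms`. The TODO left in `http_client.rs` is that the `reqwest::Client::new()` fallback (used when proxy parsing fails) does not apply the timeout. The sketch below shows one way `build_client` could honor `flush_timeout` through `reqwest::ClientBuilder::timeout`; the actual body of `build_client` is not part of this diff, so its structure here is an assumption.

```rust
use core::time::Duration;
use std::sync::Arc;

use crate::config;

// Hypothetical sketch only: assumes `config::Config` exposes `flush_timeout: u64`
// (seconds) and `https_proxy: Option<String>`, matching the fields shown in the
// config/mod.rs hunk above.
fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, reqwest::Error> {
    // Apply the configured flush timeout to every request issued by this client.
    let mut builder =
        reqwest::Client::builder().timeout(Duration::from_secs(config.flush_timeout));

    // An unparsable proxy URL surfaces as Err(..), which get_client() logs before
    // falling back to the default reqwest::Client::new().
    if let Some(proxy_url) = &config.https_proxy {
        builder = builder.proxy(reqwest::Proxy::all(proxy_url.as_str())?);
    }

    builder.build()
}
```

If the fallback path in `get_client` were routed through a plain builder with the same `.timeout(...)` call instead of `reqwest::Client::new()`, the remaining TODO could be retired without touching the proxy error handling.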