diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 9c58b93e6..1d5c699ee 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -433,6 +433,7 @@ dependencies = [
  "log",
  "nix",
  "proptest",
+ "prost 0.11.9",
  "protobuf",
  "rand",
  "regex",
diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml
index 467f1435b..b50462757 100644
--- a/bottlecap/Cargo.toml
+++ b/bottlecap/Cargo.toml
@@ -39,6 +39,7 @@ base64 = { version = "0.22", default-features = false }
 rmp-serde = { version = "1.3.0", default-features = false }
 rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
 rand = { version = "0.8", default-features = false }
+prost = { version = "0.11.6", default-features = false }
 
 [dev-dependencies]
 figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }
diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index a34879556..4b765288d 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -32,8 +32,9 @@ use bottlecap::{
         listener::TelemetryListener,
     },
     traces::{
+        stats_aggregator::StatsAggregator,
         stats_flusher::{self, StatsFlusher},
-        stats_processor, trace_agent,
+        stats_processor, trace_agent, trace_aggregator,
         trace_flusher::{self, TraceFlusher},
         trace_processor,
     },
@@ -297,11 +298,6 @@ async fn extension_loop_active(
     };
     let mut metrics_flusher = MetricsFlusher::new(flusher_config);
 
-    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
-        buffer: Arc::new(TokioMutex::new(Vec::new())),
-        config: Arc::clone(config),
-    });
-
     // Lifecycle Invocation Processor
     let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
         Arc::clone(&tags_provider),
@@ -309,43 +305,9 @@ async fn extension_loop_active(
         aws_config,
         Arc::clone(&metrics_aggr),
     )));
-    let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
-        obfuscation_config: Arc::new(
-            obfuscation_config::ObfuscationConfig::new()
-                .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?,
-        ),
-        resolved_api_key: resolved_api_key.clone(),
-    });
-
-    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
-        buffer: Arc::new(TokioMutex::new(Vec::new())),
-        config: Arc::clone(config),
-        resolved_api_key: resolved_api_key.clone(),
-    });
-    let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
-
-    let trace_flusher_clone = trace_flusher.clone();
-    let stats_flusher_clone = stats_flusher.clone();
-
-    let trace_agent = Box::new(
-        trace_agent::TraceAgent::new(
-            Arc::clone(config),
-            trace_processor.clone(),
-            trace_flusher_clone,
-            stats_processor,
-            stats_flusher_clone,
-            Arc::clone(&tags_provider),
-        )
-        .await,
-    );
-    let trace_agent_tx = trace_agent.get_sender_copy();
-    tokio::spawn(async move {
-        let res = trace_agent.start().await;
-        if let Err(e) = res {
-            error!("Error starting trace agent: {e:?}");
-        }
-    });
+    let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) =
+        start_trace_agent(config, resolved_api_key.clone(), &tags_provider);
 
     let lifecycle_listener = LifecycleListener {
         invocation_processor: Arc::clone(&invocation_processor),
@@ -456,7 +418,7 @@ async fn extension_loop_active(
                                 config.clone(),
                                 tags_provider.clone(),
                                 trace_processor.clone(),
-                                trace_agent_tx.clone()
+                                trace_agent_channel.clone()
                             ).await;
                         }
                         drop(p);
@@ -471,8 +433,8 @@ async fn extension_loop_active(
                 tokio::join!(
                     logs_flusher.flush(),
                     metrics_flusher.flush(),
-                    trace_flusher.manual_flush(),
-                    stats_flusher.manual_flush()
+                    trace_flusher.flush(),
+                    stats_flusher.flush()
                 );
             }
@@ -507,8 +469,8 @@ async fn extension_loop_active(
                     tokio::join!(
                         logs_flusher.flush(),
                         metrics_flusher.flush(),
-                        trace_flusher.manual_flush(),
-                        stats_flusher.manual_flush()
+                        trace_flusher.flush(),
+                        stats_flusher.flush()
                     );
                     if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) {
                         break;
@@ -523,8 +485,8 @@ async fn extension_loop_active(
                 tokio::join!(
                     logs_flusher.flush(),
                     metrics_flusher.flush(),
-                    trace_flusher.manual_flush(),
-                    stats_flusher.manual_flush()
+                    trace_flusher.flush(),
+                    stats_flusher.flush()
                 );
                 return Ok(());
             }
@@ -569,6 +531,67 @@ fn start_logs_agent(
     (logs_agent_channel, logs_flusher)
 }
 
+fn start_trace_agent(
+    config: &Arc<Config>,
+    resolved_api_key: String,
+    tags_provider: &Arc<provider::Provider>,
+) -> (
+    Sender<SendData>,
+    Arc<trace_flusher::ServerlessTraceFlusher>,
+    Arc<trace_processor::ServerlessTraceProcessor>,
+    Arc<stats_flusher::ServerlessStatsFlusher>,
+) {
+    // Stats
+    let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
+    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
+        resolved_api_key.clone(),
+        stats_aggregator.clone(),
+        Arc::clone(config),
+    ));
+
+    let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
+
+    // Traces
+    let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
+    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
+        aggregator: trace_aggregator.clone(),
+        config: Arc::clone(config),
+    });
+
+    let obfuscation_config = obfuscation_config::ObfuscationConfig::new()
+        .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
+        .expect("Failed to create obfuscation config for Trace Agent");
+
+    let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
+        obfuscation_config: Arc::new(obfuscation_config),
+        resolved_api_key,
+    });
+
+    let trace_agent = Box::new(trace_agent::TraceAgent::new(
+        Arc::clone(config),
+        trace_aggregator,
+        trace_processor.clone(),
+        stats_aggregator,
+        stats_processor,
+        Arc::clone(tags_provider),
+    ));
+    let trace_agent_channel = trace_agent.get_sender_copy();
+
+    tokio::spawn(async move {
+        let res = trace_agent.start().await;
+        if let Err(e) = res {
+            error!("Error starting trace agent: {e:?}");
+        }
+    });
+
+    (
+        trace_agent_channel,
+        trace_flusher,
+        trace_processor,
+        stats_flusher,
+    )
+}
+
 async fn start_dogstatsd(metrics_aggr: &Arc<Mutex<MetricsAggregator>>) -> CancellationToken {
     let dogstatsd_config = DogStatsDConfig {
         host: EXTENSION_HOST.to_string(),
diff --git a/bottlecap/src/logs/aggregator.rs b/bottlecap/src/logs/aggregator.rs
index 0b05cc0e4..86522dba9 100644
--- a/bottlecap/src/logs/aggregator.rs
+++ b/bottlecap/src/logs/aggregator.rs
@@ -36,7 +36,7 @@ impl Aggregator {
             max_batch_entries_size,
             max_content_size_bytes,
             max_log_size_bytes,
-            buffer: Vec::with_capacity(max_batch_entries_size),
+            buffer: Vec::with_capacity(max_content_size_bytes),
         }
     }
diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs
index 96b2ee9ab..a44e862a6 100644
--- a/bottlecap/src/traces/mod.rs
+++ b/bottlecap/src/traces/mod.rs
@@ -4,9 +4,11 @@ pub mod context;
 pub mod propagation;
 pub mod span_pointers;
+pub mod stats_aggregator;
 pub mod stats_flusher;
 pub mod stats_processor;
 pub mod trace_agent;
+pub mod trace_aggregator;
 pub mod trace_flusher;
 pub mod trace_processor;
diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs
new file mode 100644
index 000000000..d2a3f0597
--- /dev/null
+++ b/bottlecap/src/traces/stats_aggregator.rs
@@ -0,0 +1,167 @@
+use datadog_trace_protobuf::pb::ClientStatsPayload;
+use prost::Message;
+use std::collections::VecDeque;
+
+/// Maximum number of entries in a stat payload.
+///
+///
+// const MAX_BATCH_ENTRIES_SIZE: usize = 4000;
+
+/// Approximate size an entry in a stat payload occupies
+///
+///
+// const MAX_ENTRY_SIZE_BYTES: usize = 375;
+
+/// Maximum content size per payload in compressed bytes,
+///
+///
+const MAX_CONTENT_SIZE_BYTES: usize = 3 * 1024 * 1024; // ~3MB
+
+#[allow(clippy::module_name_repetitions)]
+pub struct StatsAggregator {
+    queue: VecDeque<ClientStatsPayload>,
+    max_content_size_bytes: usize,
+    buffer: Vec<ClientStatsPayload>,
+}
+
+impl Default for StatsAggregator {
+    fn default() -> Self {
+        StatsAggregator {
+            queue: VecDeque::new(),
+            max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
+            buffer: Vec::with_capacity(MAX_CONTENT_SIZE_BYTES),
+        }
+    }
+}
+
+impl StatsAggregator {
+    #[allow(dead_code)]
+    #[allow(clippy::must_use_candidate)]
+    pub fn new(max_content_size_bytes: usize) -> Self {
+        StatsAggregator {
+            queue: VecDeque::new(),
+            max_content_size_bytes,
+            buffer: Vec::with_capacity(max_content_size_bytes),
+        }
+    }
+
+    pub fn add(&mut self, payload: ClientStatsPayload) {
+        self.queue.push_back(payload);
+    }
+
+    pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
+        let mut batch_size = 0;
+
+        // Fill the batch
+        while batch_size < self.max_content_size_bytes {
+            if let Some(payload) = self.queue.pop_front() {
+                let payload_size = payload.encoded_len();
+
+                // Put the payload back in the queue if it would overflow the batch
+                if batch_size + payload_size > self.max_content_size_bytes {
+                    self.queue.push_front(payload);
+                    break;
+                }
+                batch_size += payload_size;
+                self.buffer.push(payload);
+            } else {
+                break;
+            }
+        }
+
+        std::mem::take(&mut self.buffer)
+    }
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_add() {
+        let mut aggregator = StatsAggregator::default();
+        let payload = ClientStatsPayload {
+            hostname: "hostname".to_string(),
+            env: "dev".to_string(),
+            version: "version".to_string(),
+            stats: vec![],
+            lang: "rust".to_string(),
+            tracer_version: "tracer.version".to_string(),
+            runtime_id: "hash".to_string(),
+            sequence: 0,
+            agent_aggregation: "aggregation".to_string(),
+            service: "service".to_string(),
+            container_id: "container_id".to_string(),
+            tags: vec![],
+            git_commit_sha: "git_commit_sha".to_string(),
+            image_tag: "image_tag".to_string(),
+        };
+
+        aggregator.add(payload.clone());
+        assert_eq!(aggregator.queue.len(), 1);
+        assert_eq!(aggregator.queue[0], payload);
+    }
+
+    #[test]
+    fn test_get_batch() {
+        let mut aggregator = StatsAggregator::default();
+        let payload = ClientStatsPayload {
+            hostname: "hostname".to_string(),
+            env: "dev".to_string(),
+            version: "version".to_string(),
+            stats: vec![],
+            lang: "rust".to_string(),
+            tracer_version: "tracer.version".to_string(),
+            runtime_id: "hash".to_string(),
+            sequence: 0,
+            agent_aggregation: "aggregation".to_string(),
+            service: "service".to_string(),
+            container_id: "container_id".to_string(),
+            tags: vec![],
+            git_commit_sha: "git_commit_sha".to_string(),
+            image_tag: "image_tag".to_string(),
+        };
+        aggregator.add(payload.clone());
+        assert_eq!(aggregator.queue.len(), 1);
+        let batch = aggregator.get_batch();
+        assert_eq!(batch, vec![payload]);
+    }
+
+    #[test]
+    fn test_get_batch_full_entries() {
+        let mut aggregator = StatsAggregator::new(230);
+        // Payload below is 115 bytes
+        let payload = ClientStatsPayload {
+            hostname: "hostname".to_string(),
+            env: "dev".to_string(),
+            version: "version".to_string(),
+            stats: vec![],
+            lang: "rust".to_string(),
"rust".to_string(), + tracer_version: "tracer.version".to_string(), + runtime_id: "hash".to_string(), + sequence: 0, + agent_aggregation: "aggregation".to_string(), + service: "service".to_string(), + container_id: "container_id".to_string(), + tags: vec![], + git_commit_sha: "git_commit_sha".to_string(), + image_tag: "image_tag".to_string(), + }; + + // Add 3 payloads + aggregator.add(payload.clone()); + aggregator.add(payload.clone()); + aggregator.add(payload.clone()); + + // The batch should only contain the first 2 payloads + let first_batch = aggregator.get_batch(); + assert_eq!(first_batch, vec![payload.clone(), payload.clone()]); + assert_eq!(aggregator.queue.len(), 1); + + // The second batch should only contain the last log + let second_batch = aggregator.get_batch(); + assert_eq!(second_batch, vec![payload]); + assert_eq!(aggregator.queue.len(), 0); + } +} diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 3acb28f91..8e2334aa2 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -4,55 +4,73 @@ use async_trait::async_trait; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::{mpsc::Receiver, Mutex}; +use tokio::sync::Mutex; use tracing::{debug, error}; use crate::config; +use crate::traces::stats_aggregator::StatsAggregator; use datadog_trace_protobuf::pb; -use datadog_trace_utils::config_utils::trace_stats_url; -use datadog_trace_utils::stats_utils; +use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils}; use ddcommon::Endpoint; #[async_trait] pub trait StatsFlusher { - /// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver, - /// implementing flushing logic that calls flush_stats. - async fn start_stats_flusher(&self, mut rx: Receiver); + fn new( + api_key: String, + aggregator: Arc>, + config: Arc, + ) -> Self + where + Self: Sized; /// Flushes stats to the Datadog trace stats intake. 
-    async fn flush_stats(&self, traces: Vec<pb::ClientStatsPayload>);
+    async fn send(&self, traces: Vec<pb::ClientStatsPayload>);
 
-    async fn manual_flush(&self);
+    async fn flush(&self);
 }
 
 #[allow(clippy::module_name_repetitions)]
 #[derive(Clone)]
 pub struct ServerlessStatsFlusher {
-    pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
-    pub config: Arc<config::Config>,
-    pub resolved_api_key: String,
+    // pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
+    aggregator: Arc<Mutex<StatsAggregator>>,
+    config: Arc<config::Config>,
+    endpoint: Endpoint,
 }
 
 #[async_trait]
 impl StatsFlusher for ServerlessStatsFlusher {
-    async fn start_stats_flusher(&self, mut rx: Receiver<pb::ClientStatsPayload>) {
-        let buffer_producer = self.buffer.clone();
+    fn new(
+        api_key: String,
+        aggregator: Arc<Mutex<StatsAggregator>>,
+        config: Arc<config::Config>,
+    ) -> Self {
+        let stats_url = trace_stats_url(&config.site);
 
-        tokio::spawn(async move {
-            while let Some(stats_payload) = rx.recv().await {
-                let mut buffer = buffer_producer.lock().await;
-                buffer.push(stats_payload);
-            }
-        });
+        let endpoint = Endpoint {
+            url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
+            api_key: Some(api_key.clone().into()),
+            timeout_ms: Endpoint::DEFAULT_TIMEOUT,
+            test_token: None,
+        };
+
+        ServerlessStatsFlusher {
+            aggregator,
+            config,
+            endpoint,
+        }
     }
 
-    async fn manual_flush(&self) {
-        let mut buffer = self.buffer.lock().await;
-        if !buffer.is_empty() {
-            self.flush_stats(buffer.to_vec()).await;
-            buffer.clear();
+    async fn flush(&self) {
+        let mut guard = self.aggregator.lock().await;
+
+        let mut stats = guard.get_batch();
+        while !stats.is_empty() {
+            self.send(stats).await;
+
+            stats = guard.get_batch();
         }
     }
 
-    async fn flush_stats(&self, stats: Vec<pb::ClientStatsPayload>) {
+    async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
         if stats.is_empty() {
             return;
         }
@@ -70,18 +88,9 @@ impl StatsFlusher for ServerlessStatsFlusher {
             }
         };
 
-        let stats_url = trace_stats_url(&self.config.site);
-
-        let endpoint = Endpoint {
-            url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
-            api_key: Some(self.resolved_api_key.clone().into()),
-            timeout_ms: Endpoint::DEFAULT_TIMEOUT,
-            test_token: None,
-        };
-
         match stats_utils::send_stats_payload(
             serialized_stats_payload,
-            &endpoint,
+            &self.endpoint,
             &self.config.api_key,
         )
         .await
diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs
index 1446e7a5e..fbf132acb 100644
--- a/bottlecap/src/traces/trace_agent.rs
+++ b/bottlecap/src/traces/trace_agent.rs
@@ -9,11 +9,12 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Instant;
 use tokio::sync::mpsc::{self, Receiver, Sender};
+use tokio::sync::Mutex;
 use tracing::{debug, error};
 
 use crate::config;
 use crate::tags::provider;
-use crate::traces::{stats_flusher, stats_processor, trace_flusher, trace_processor};
+use crate::traces::{stats_aggregator, stats_processor, trace_aggregator, trace_processor};
 use datadog_trace_mini_agent::http_utils::{
     self, log_and_create_http_response, log_and_create_traces_success_http_response,
 };
@@ -32,9 +33,8 @@ pub const MAX_CONTENT_LENGTH: usize = 10 * 1024 * 1024;
 pub struct TraceAgent {
     pub config: Arc<config::Config>,
     pub trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
-    pub trace_flusher: Arc<dyn trace_flusher::TraceFlusher + Send + Sync>,
+    pub stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
     pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
-    pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
     pub tags_provider: Arc<provider::Provider>,
     tx: Sender<SendData>,
 }
@@ -47,31 +47,36 @@ impl TraceAgent {
     #[must_use]
-    pub async fn new(
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
         config: Arc<config::Config>,
+        trace_aggregator: Arc<Mutex<trace_aggregator::TraceAggregator>>,
         trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
-        trace_flusher: Arc<dyn trace_flusher::TraceFlusher + Send + Sync>,
+        stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
         stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
-        stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
         tags_provider: Arc<provider::Provider>,
     ) -> TraceAgent {
         // setup a channel to send processed traces to our flusher. tx is passed through each
         // endpoint_handler to the trace processor, which uses it to send de-serialized
         // processed trace payloads to our trace flusher.
-        let (trace_tx, trace_rx): (Sender<SendData>, Receiver<SendData>) =
+        let (trace_tx, mut trace_rx): (Sender<SendData>, Receiver<SendData>) =
             mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);
 
         // start our trace flusher. receives trace payloads and handles buffering + deciding when to
         // flush to backend.
-        let trace_flusher = trace_flusher.clone();
-        trace_flusher.start_trace_flusher(trace_rx).await;
+
+        tokio::spawn(async move {
+            while let Some(tracer_payload) = trace_rx.recv().await {
+                let mut aggregator = trace_aggregator.lock().await;
+                aggregator.add(tracer_payload);
+            }
+        });
 
         TraceAgent {
             config,
             trace_processor,
-            trace_flusher,
+            stats_aggregator,
             stats_processor,
-            stats_flusher,
             tags_provider,
             tx: trace_tx,
         }
@@ -82,17 +87,18 @@ impl TraceAgent {
         let trace_tx = self.tx.clone();
 
         // channels to send processed stats to our stats flusher.
-        let (stats_tx, stats_rx): (
+        let (stats_tx, mut stats_rx): (
             Sender<pb::ClientStatsPayload>,
             Receiver<pb::ClientStatsPayload>,
         ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE);
 
-        // start our stats flusher.
-        let stats_flusher = self.stats_flusher.clone();
-        // let stats_config = self.config.clone();
+        // Receive stats payload and send it to the aggregator
+        let stats_aggregator = self.stats_aggregator.clone();
         tokio::spawn(async move {
-            let stats_flusher = stats_flusher.clone();
-            stats_flusher.start_stats_flusher(stats_rx).await;
+            while let Some(stats_payload) = stats_rx.recv().await {
+                let mut aggregator = stats_aggregator.lock().await;
+                aggregator.add(stats_payload);
+            }
         });
 
         // setup our hyper http server, where the endpoint_handler handles incoming requests
diff --git a/bottlecap/src/traces/trace_aggregator.rs b/bottlecap/src/traces/trace_aggregator.rs
new file mode 100644
index 000000000..beac589fb
--- /dev/null
+++ b/bottlecap/src/traces/trace_aggregator.rs
@@ -0,0 +1,169 @@
+use datadog_trace_utils::send_data::SendData;
+use std::collections::VecDeque;
+
+/// Maximum uncompressed content size per payload, in bytes,
+/// that the Datadog Trace API accepts. The value is 3.2 MB.
+///
+///
+pub const MAX_CONTENT_SIZE_BYTES: usize = (32 * 1_024 * 1_024) / 10;
+
+#[allow(clippy::module_name_repetitions)]
+pub struct TraceAggregator {
+    queue: VecDeque<SendData>,
+    max_content_size_bytes: usize,
+    buffer: Vec<SendData>,
+}
+
+impl Default for TraceAggregator {
+    fn default() -> Self {
+        TraceAggregator {
+            queue: VecDeque::new(),
+            max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
+            buffer: Vec::with_capacity(MAX_CONTENT_SIZE_BYTES),
+        }
+    }
+}
+
+impl TraceAggregator {
+    #[allow(dead_code)]
+    #[allow(clippy::must_use_candidate)]
+    pub fn new(max_content_size_bytes: usize) -> Self {
+        TraceAggregator {
+            queue: VecDeque::new(),
+            max_content_size_bytes,
+            buffer: Vec::with_capacity(max_content_size_bytes),
+        }
+    }
+
+    pub fn add(&mut self, p: SendData) {
+        self.queue.push_back(p);
+    }
+
+    pub fn get_batch(&mut self) -> Vec<SendData> {
+        let mut batch_size = 0;
+
+        // Fill the batch
+        while batch_size < self.max_content_size_bytes {
+            if let Some(payload) = self.queue.pop_front() {
+                // TODO(duncanista): revisit if this is bigger than limit
+                let payload_size = payload.len();
+
+                // Put the payload back in the queue if it would overflow the batch
+                if batch_size + payload_size > self.max_content_size_bytes {
+                    self.queue.push_front(payload);
+                    break;
+                }
+                batch_size += payload_size;
+                self.buffer.push(payload);
+            } else {
+                break;
+            }
+        }
+
+        std::mem::take(&mut self.buffer)
+    }
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+mod tests {
+    use datadog_trace_utils::{
+        trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
+    };
+    use ddcommon::Endpoint;
+
+    use super::*;
+
+    #[test]
+    fn test_add() {
+        let mut aggregator = TraceAggregator::default();
+        let tracer_header_tags = TracerHeaderTags {
+            lang: "lang",
+            lang_version: "lang_version",
+            lang_interpreter: "lang_interpreter",
+            lang_vendor: "lang_vendor",
+            tracer_version: "tracer_version",
+            container_id: "container_id",
+            client_computed_top_level: true,
+            client_computed_stats: true,
+            dropped_p0_traces: 0,
+            dropped_p0_spans: 0,
+        };
+        let payload = SendData::new(
+            1,
+            TracerPayloadCollection::V07(Vec::new()),
+            tracer_header_tags,
+            &Endpoint::from_slice("localhost"),
+        );
+
+        aggregator.add(payload.clone());
+        assert_eq!(aggregator.queue.len(), 1);
+        assert_eq!(aggregator.queue[0].is_empty(), payload.is_empty());
+    }
+
+    #[test]
+    fn test_get_batch() {
+        let mut aggregator = TraceAggregator::default();
+        let tracer_header_tags = TracerHeaderTags {
+            lang: "lang",
+            lang_version: "lang_version",
+            lang_interpreter: "lang_interpreter",
+            lang_vendor: "lang_vendor",
+            tracer_version: "tracer_version",
+            container_id: "container_id",
+            client_computed_top_level: true,
+            client_computed_stats: true,
+            dropped_p0_traces: 0,
+            dropped_p0_spans: 0,
+        };
+        let payload = SendData::new(
+            1,
+            TracerPayloadCollection::V07(Vec::new()),
+            tracer_header_tags,
+            &Endpoint::from_slice("localhost"),
+        );
+
+        aggregator.add(payload.clone());
+        assert_eq!(aggregator.queue.len(), 1);
+        let batch = aggregator.get_batch();
+        assert_eq!(batch.len(), 1);
+    }
+
+    #[test]
+    fn test_get_batch_full_entries() {
+        let mut aggregator = TraceAggregator::new(2);
+        let tracer_header_tags = TracerHeaderTags {
+            lang: "lang",
+            lang_version: "lang_version",
+            lang_interpreter: "lang_interpreter",
+            lang_vendor: "lang_vendor",
+            tracer_version: "tracer_version",
+            container_id: "container_id",
+            client_computed_top_level: true,
+            client_computed_stats: true,
+            dropped_p0_traces: 0,
+            dropped_p0_spans: 0,
+        };
+        let payload = SendData::new(
+            1,
+            TracerPayloadCollection::V07(Vec::new()),
+            tracer_header_tags,
+            &Endpoint::from_slice("localhost"),
+        );
+
+        // Add 3 payloads
+        aggregator.add(payload.clone());
+        aggregator.add(payload.clone());
+        aggregator.add(payload.clone());
+
+        // The batch should only contain the first 2 payloads
+        let first_batch = aggregator.get_batch();
+        assert_eq!(first_batch.len(), 2);
+        assert_eq!(aggregator.queue.len(), 1);
+
+        // The second batch should only contain the last payload
+        let second_batch = aggregator.get_batch();
+        assert_eq!(second_batch.len(), 1);
+        assert_eq!(aggregator.queue.len(), 0);
+    }
+}
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 92916f1be..be137df64 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -3,52 +3,50 @@
 use async_trait::async_trait;
 use std::sync::Arc;
-use tokio::sync::{mpsc::Receiver, Mutex};
+use tokio::sync::Mutex;
 use tracing::{debug, error};
 
 use datadog_trace_utils::trace_utils::{self, SendData};
 
 use crate::config::Config;
+use crate::traces::trace_aggregator::TraceAggregator;
 
 #[async_trait]
 pub trait TraceFlusher {
-    /// Starts a trace flusher that listens for trace payloads sent to the tokio mpsc Receiver,
-    /// implementing flushing logic that calls flush_traces.
-    async fn start_trace_flusher(&self, mut rx: Receiver<SendData>);
+    fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self
+    where
+        Self: Sized;
 
     /// Flushes traces to the Datadog trace intake.
-    async fn flush_traces(&self, traces: Vec<SendData>);
+    async fn send(&self, traces: Vec<SendData>);
 
-    async fn manual_flush(&self);
+    async fn flush(&self);
 }
 
 #[derive(Clone)]
 #[allow(clippy::module_name_repetitions)]
 pub struct ServerlessTraceFlusher {
-    pub buffer: Arc<Mutex<Vec<SendData>>>,
+    pub aggregator: Arc<Mutex<TraceAggregator>>,
     pub config: Arc<Config>,
 }
 
 #[async_trait]
 impl TraceFlusher for ServerlessTraceFlusher {
-    async fn start_trace_flusher(&self, mut rx: Receiver<SendData>) {
-        let buffer_producer = self.buffer.clone();
-        tokio::spawn(async move {
-            while let Some(tracer_payload) = rx.recv().await {
-                let mut buffer = buffer_producer.lock().await;
-                buffer.push(tracer_payload);
-            }
-        });
+    fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self {
+        ServerlessTraceFlusher { aggregator, config }
     }
 
-    async fn manual_flush(&self) {
-        let mut buffer = self.buffer.lock().await;
-        if !buffer.is_empty() {
-            self.flush_traces(buffer.to_vec()).await;
-            buffer.clear();
+    async fn flush(&self) {
+        let mut guard = self.aggregator.lock().await;
+
+        let mut traces = guard.get_batch();
+        while !traces.is_empty() {
+            self.send(traces).await;
+
+            traces = guard.get_batch();
         }
     }
 
-    async fn flush_traces(&self, traces: Vec<SendData>) {
+    async fn send(&self, traces: Vec<SendData>) {
         if traces.is_empty() {
             return;
         }
diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs
index bcb5e5030..9a8d64f5a 100644
--- a/bottlecap/src/traces/trace_processor.rs
+++ b/bottlecap/src/traces/trace_processor.rs
@@ -22,8 +22,7 @@ use crate::traces::{
 };
 use datadog_trace_obfuscation::obfuscate::obfuscate_span;
 use datadog_trace_protobuf::pb::Span;
-use datadog_trace_utils::trace_utils::SendData;
-use datadog_trace_utils::trace_utils::{self};
+use datadog_trace_utils::trace_utils::{self, SendData};
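
Both new aggregators share the same size-bounded batching pattern: payloads land in a FIFO `VecDeque`, `get_batch()` drains from the front until the next payload would exceed the byte budget (pushing that payload back for the following batch), and the reworked `flush()` methods call `get_batch()` in a loop until the queue is empty. Below is a minimal, self-contained sketch of that pattern; `Payload`, `Aggregator`, and the 10-byte budget are illustrative stand-ins, not types from this diff:

```rust
use std::collections::VecDeque;

/// Stand-in for the real payload types (`SendData`, `ClientStatsPayload`);
/// only a byte size is needed to drive the batching logic.
struct Payload {
    bytes: Vec<u8>,
}

impl Payload {
    fn size(&self) -> usize {
        self.bytes.len()
    }
}

/// Size-bounded FIFO, mirroring `get_batch` in both new aggregators.
struct Aggregator {
    queue: VecDeque<Payload>,
    max_batch_bytes: usize,
}

impl Aggregator {
    fn new(max_batch_bytes: usize) -> Self {
        Aggregator {
            queue: VecDeque::new(),
            max_batch_bytes,
        }
    }

    fn add(&mut self, p: Payload) {
        self.queue.push_back(p);
    }

    /// Pop payloads until the next one would exceed the byte budget,
    /// then push that one back so the next batch starts with it.
    fn get_batch(&mut self) -> Vec<Payload> {
        let mut batch = Vec::new();
        let mut batch_size = 0;
        while let Some(p) = self.queue.pop_front() {
            if batch_size + p.size() > self.max_batch_bytes {
                self.queue.push_front(p);
                break;
            }
            batch_size += p.size();
            batch.push(p);
        }
        batch
    }
}

fn main() {
    let mut agg = Aggregator::new(10);
    for _ in 0..3 {
        agg.add(Payload { bytes: vec![0u8; 4] });
    }

    // Two 4-byte payloads fit the 10-byte budget; the third spills over.
    assert_eq!(agg.get_batch().len(), 2);

    // Drain-until-empty loop, as the reworked `flush()` implementations do
    // (each non-empty batch would be handed to `send(...)`).
    let mut batch = agg.get_batch();
    while !batch.is_empty() {
        println!("flushing batch of {} payload(s)", batch.len());
        batch = agg.get_batch();
    }
}
```

One edge case, which the diff itself flags with a TODO in `TraceAggregator::get_batch`: a single payload larger than the budget is pushed back at the head of the queue and can never fit in any batch, so `flush()` returns without draining it.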