Skip to content

Commit 3da2b9d

Browse files
authored
feat: add an Aggregator for Traces and Stats (#534)
* add `start_trace_agent` method to remove the boilerplate to start a trace agent, and only return the flushers, channel, and processor * rename methods in flushers from `manual_flush` to `flush` and `flush` to `send` * add `prost` crate * fix logs aggregator typo * add the `MAX_CONTENT_SIZE_BYTES` for traces; it's `3.2` MB * add `MessageAggregator` generic `prost::Message` aggregator which batches given a `max_content_size_bytes` * refactor `StatsFlusher` a flusher should only flush, not receive messages, we abstract that from here * use `StatsAggregator` accordingly now the `TraceAgent` spins up a task which receives the stats so that they get properly aggregated * add `TraceAggregator` also modify the existing aggregator to be a stats aggregator * use `TraceAggregator` in `TraceFlusher` also modified the naming import for `StatsFlusher` * modify `TraceAgent` it doesn't need the flushers to work, so removing that from it, and spawning own tasks for the data channels * remove generic for `StatsAggregator` it seems adding `prost::Message` import allows us to have the methods directly from the message we are treating with * add unit tests for `StatsAggregator` * add unit tests for `TraceAggregator`
1 parent eb349bc commit 3da2b9d

File tree

11 files changed

+500
-125
lines changed

11 files changed

+500
-125
lines changed

bottlecap/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ base64 = { version = "0.22", default-features = false }
3939
rmp-serde = { version = "1.3.0", default-features = false }
4040
rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
4141
rand = { version = "0.8", default-features = false }
42+
prost = { version = "0.11.6", default-features = false }
4243

4344
[dev-dependencies]
4445
figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 72 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ use bottlecap::{
3232
listener::TelemetryListener,
3333
},
3434
traces::{
35+
stats_aggregator::StatsAggregator,
3536
stats_flusher::{self, StatsFlusher},
36-
stats_processor, trace_agent,
37+
stats_processor, trace_agent, trace_aggregator,
3738
trace_flusher::{self, TraceFlusher},
3839
trace_processor,
3940
},
@@ -301,55 +302,16 @@ async fn extension_loop_active(
301302
};
302303
let mut metrics_flusher = MetricsFlusher::new(flusher_config);
303304

304-
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
305-
buffer: Arc::new(TokioMutex::new(Vec::new())),
306-
config: Arc::clone(config),
307-
});
308-
309305
// Lifecycle Invocation Processor
310306
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
311307
Arc::clone(&tags_provider),
312308
Arc::clone(config),
313309
aws_config,
314310
Arc::clone(&metrics_aggr),
315311
)));
316-
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
317-
obfuscation_config: Arc::new(
318-
obfuscation_config::ObfuscationConfig::new()
319-
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?,
320-
),
321-
resolved_api_key: resolved_api_key.clone(),
322-
});
323-
324-
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
325-
buffer: Arc::new(TokioMutex::new(Vec::new())),
326-
config: Arc::clone(config),
327-
resolved_api_key: resolved_api_key.clone(),
328-
});
329-
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
330-
331-
let trace_flusher_clone = trace_flusher.clone();
332-
let stats_flusher_clone = stats_flusher.clone();
333-
334-
let trace_agent = Box::new(
335-
trace_agent::TraceAgent::new(
336-
Arc::clone(config),
337-
trace_processor.clone(),
338-
trace_flusher_clone,
339-
stats_processor,
340-
stats_flusher_clone,
341-
Arc::clone(&tags_provider),
342-
)
343-
.await,
344-
);
345-
let trace_agent_tx = trace_agent.get_sender_copy();
346312

347-
tokio::spawn(async move {
348-
let res = trace_agent.start().await;
349-
if let Err(e) = res {
350-
error!("Error starting trace agent: {e:?}");
351-
}
352-
});
313+
let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) =
314+
start_trace_agent(config, resolved_api_key.clone(), &tags_provider);
353315

354316
let lifecycle_listener = LifecycleListener {
355317
invocation_processor: Arc::clone(&invocation_processor),
@@ -460,7 +422,7 @@ async fn extension_loop_active(
460422
config.clone(),
461423
tags_provider.clone(),
462424
trace_processor.clone(),
463-
trace_agent_tx.clone()
425+
trace_agent_channel.clone()
464426
).await;
465427
}
466428
drop(p);
@@ -475,8 +437,8 @@ async fn extension_loop_active(
475437
tokio::join!(
476438
logs_flusher.flush(),
477439
metrics_flusher.flush(),
478-
trace_flusher.manual_flush(),
479-
stats_flusher.manual_flush()
440+
trace_flusher.flush(),
441+
stats_flusher.flush()
480442
);
481443
}
482444

@@ -511,8 +473,8 @@ async fn extension_loop_active(
511473
tokio::join!(
512474
logs_flusher.flush(),
513475
metrics_flusher.flush(),
514-
trace_flusher.manual_flush(),
515-
stats_flusher.manual_flush()
476+
trace_flusher.flush(),
477+
stats_flusher.flush()
516478
);
517479
if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) {
518480
break;
@@ -527,8 +489,8 @@ async fn extension_loop_active(
527489
tokio::join!(
528490
logs_flusher.flush(),
529491
metrics_flusher.flush(),
530-
trace_flusher.manual_flush(),
531-
stats_flusher.manual_flush()
492+
trace_flusher.flush(),
493+
stats_flusher.flush()
532494
);
533495
return Ok(());
534496
}
@@ -573,6 +535,67 @@ fn start_logs_agent(
573535
(logs_agent_channel, logs_flusher)
574536
}
575537

538+
fn start_trace_agent(
539+
config: &Arc<Config>,
540+
resolved_api_key: String,
541+
tags_provider: &Arc<TagProvider>,
542+
) -> (
543+
Sender<datadog_trace_utils::send_data::SendData>,
544+
Arc<trace_flusher::ServerlessTraceFlusher>,
545+
Arc<trace_processor::ServerlessTraceProcessor>,
546+
Arc<stats_flusher::ServerlessStatsFlusher>,
547+
) {
548+
// Stats
549+
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
550+
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
551+
resolved_api_key.clone(),
552+
stats_aggregator.clone(),
553+
Arc::clone(config),
554+
));
555+
556+
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
557+
558+
// Traces
559+
let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
560+
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
561+
aggregator: trace_aggregator.clone(),
562+
config: Arc::clone(config),
563+
});
564+
565+
let obfuscation_config = obfuscation_config::ObfuscationConfig::new()
566+
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
567+
.expect("Failed to create obfuscation config for Trace Agent");
568+
569+
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
570+
obfuscation_config: Arc::new(obfuscation_config),
571+
resolved_api_key,
572+
});
573+
574+
let trace_agent = Box::new(trace_agent::TraceAgent::new(
575+
Arc::clone(config),
576+
trace_aggregator,
577+
trace_processor.clone(),
578+
stats_aggregator,
579+
stats_processor,
580+
Arc::clone(tags_provider),
581+
));
582+
let trace_agent_channel = trace_agent.get_sender_copy();
583+
584+
tokio::spawn(async move {
585+
let res = trace_agent.start().await;
586+
if let Err(e) = res {
587+
error!("Error starting trace agent: {e:?}");
588+
}
589+
});
590+
591+
(
592+
trace_agent_channel,
593+
trace_flusher,
594+
trace_processor,
595+
stats_flusher,
596+
)
597+
}
598+
576599
async fn start_dogstatsd(metrics_aggr: &Arc<Mutex<MetricsAggregator>>) -> CancellationToken {
577600
let dogstatsd_config = DogStatsDConfig {
578601
host: EXTENSION_HOST.to_string(),

bottlecap/src/logs/aggregator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Aggregator {
3636
max_batch_entries_size,
3737
max_content_size_bytes,
3838
max_log_size_bytes,
39-
buffer: Vec::with_capacity(max_batch_entries_size),
39+
buffer: Vec::with_capacity(max_content_size_bytes),
4040
}
4141
}
4242

bottlecap/src/traces/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
pub mod context;
55
pub mod propagation;
66
pub mod span_pointers;
7+
pub mod stats_aggregator;
78
pub mod stats_flusher;
89
pub mod stats_processor;
910
pub mod trace_agent;
11+
pub mod trace_aggregator;
1012
pub mod trace_flusher;
1113
pub mod trace_processor;
1214

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use datadog_trace_protobuf::pb::ClientStatsPayload;
2+
use prost::Message;
3+
use std::collections::VecDeque;
4+
5+
/// Maximum number of entries in a stat payload.
6+
///
7+
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L35-L41>
8+
// const MAX_BATCH_ENTRIES_SIZE: usize = 4000;
9+
10+
/// Approximate size an entry in a stat payload occupies
11+
///
12+
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L33-L35>
13+
// const MAX_ENTRY_SIZE_BYTES: usize = 375;
14+
15+
/// Maximum content size per payload in compressed bytes,
16+
///
17+
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L35-L41>
18+
const MAX_CONTENT_SIZE_BYTES: usize = 3 * 1024 * 1024; // ~3MB
19+
20+
#[allow(clippy::module_name_repetitions)]
21+
pub struct StatsAggregator {
22+
queue: VecDeque<ClientStatsPayload>,
23+
max_content_size_bytes: usize,
24+
buffer: Vec<ClientStatsPayload>,
25+
}
26+
27+
impl Default for StatsAggregator {
28+
fn default() -> Self {
29+
StatsAggregator {
30+
queue: VecDeque::new(),
31+
max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
32+
buffer: Vec::with_capacity(MAX_CONTENT_SIZE_BYTES),
33+
}
34+
}
35+
}
36+
37+
impl StatsAggregator {
38+
#[allow(dead_code)]
39+
#[allow(clippy::must_use_candidate)]
40+
pub fn new(max_content_size_bytes: usize) -> Self {
41+
StatsAggregator {
42+
queue: VecDeque::new(),
43+
max_content_size_bytes,
44+
buffer: Vec::with_capacity(max_content_size_bytes),
45+
}
46+
}
47+
48+
pub fn add(&mut self, payload: ClientStatsPayload) {
49+
self.queue.push_back(payload);
50+
}
51+
52+
pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
53+
let mut batch_size = 0;
54+
55+
// Fill the batch
56+
while batch_size < self.max_content_size_bytes {
57+
if let Some(payload) = self.queue.pop_front() {
58+
let payload_size = payload.encoded_len();
59+
60+
// Put stats back in the queue
61+
if batch_size + payload_size > self.max_content_size_bytes {
62+
self.queue.push_front(payload);
63+
break;
64+
}
65+
batch_size += payload_size;
66+
self.buffer.push(payload);
67+
} else {
68+
break;
69+
}
70+
}
71+
72+
std::mem::take(&mut self.buffer)
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
#[allow(clippy::unwrap_used)]
78+
mod tests {
79+
use super::*;
80+
81+
#[test]
82+
fn test_add() {
83+
let mut aggregator = StatsAggregator::default();
84+
let payload = ClientStatsPayload {
85+
hostname: "hostname".to_string(),
86+
env: "dev".to_string(),
87+
version: "version".to_string(),
88+
stats: vec![],
89+
lang: "rust".to_string(),
90+
tracer_version: "tracer.version".to_string(),
91+
runtime_id: "hash".to_string(),
92+
sequence: 0,
93+
agent_aggregation: "aggregation".to_string(),
94+
service: "service".to_string(),
95+
container_id: "container_id".to_string(),
96+
tags: vec![],
97+
git_commit_sha: "git_commit_sha".to_string(),
98+
image_tag: "image_tag".to_string(),
99+
};
100+
101+
aggregator.add(payload.clone());
102+
assert_eq!(aggregator.queue.len(), 1);
103+
assert_eq!(aggregator.queue[0], payload);
104+
}
105+
106+
#[test]
107+
fn test_get_batch() {
108+
let mut aggregator = StatsAggregator::default();
109+
let payload = ClientStatsPayload {
110+
hostname: "hostname".to_string(),
111+
env: "dev".to_string(),
112+
version: "version".to_string(),
113+
stats: vec![],
114+
lang: "rust".to_string(),
115+
tracer_version: "tracer.version".to_string(),
116+
runtime_id: "hash".to_string(),
117+
sequence: 0,
118+
agent_aggregation: "aggregation".to_string(),
119+
service: "service".to_string(),
120+
container_id: "container_id".to_string(),
121+
tags: vec![],
122+
git_commit_sha: "git_commit_sha".to_string(),
123+
image_tag: "image_tag".to_string(),
124+
};
125+
aggregator.add(payload.clone());
126+
assert_eq!(aggregator.queue.len(), 1);
127+
let batch = aggregator.get_batch();
128+
assert_eq!(batch, vec![payload]);
129+
}
130+
131+
#[test]
132+
fn test_get_batch_full_entries() {
133+
let mut aggregator = StatsAggregator::new(230);
134+
// Payload below is 115 bytes
135+
let payload = ClientStatsPayload {
136+
hostname: "hostname".to_string(),
137+
env: "dev".to_string(),
138+
version: "version".to_string(),
139+
stats: vec![],
140+
lang: "rust".to_string(),
141+
tracer_version: "tracer.version".to_string(),
142+
runtime_id: "hash".to_string(),
143+
sequence: 0,
144+
agent_aggregation: "aggregation".to_string(),
145+
service: "service".to_string(),
146+
container_id: "container_id".to_string(),
147+
tags: vec![],
148+
git_commit_sha: "git_commit_sha".to_string(),
149+
image_tag: "image_tag".to_string(),
150+
};
151+
152+
// Add 3 payloads
153+
aggregator.add(payload.clone());
154+
aggregator.add(payload.clone());
155+
aggregator.add(payload.clone());
156+
157+
// The batch should only contain the first 2 payloads
158+
let first_batch = aggregator.get_batch();
159+
assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
160+
assert_eq!(aggregator.queue.len(), 1);
161+
162+
// The second batch should only contain the last payload
163+
let second_batch = aggregator.get_batch();
164+
assert_eq!(second_batch, vec![payload]);
165+
assert_eq!(aggregator.queue.len(), 0);
166+
}
167+
}

0 commit comments

Comments
 (0)