Commit 58110c0
feat: [Trace Stats] Add skeleton of concentrator (#842)

## This PR

- Add the skeleton of `StatsConcentrator`, with no implementation yet.
- Add `StatsConcentratorHandle` and `StatsConcentratorService`, which send and process stats requests (`add()` and `get_stats()`) through a queue, so no mutex is needed and lock contention is avoided (a minimal sketch of this pattern follows below). Thanks @duncanista for the suggestion and @astuyve for the example code: DataDog/serverless-components#32

## Next steps

- Implement `StatsConcentrator`, which aggregates stats data into buckets and returns it in batches.
- Add more fields to `AggregationKey` and `Stats`.
- Move the processing of stats to after "obfuscation", as suggested by the APM team. This will involve many code changes, so I'll make it a separate PR, mainly moving code from this draft PR: #827

## Architecture

<img width="1296" height="674" alt="image" src="https://github.com/user-attachments/assets/2d4cb925-6cfc-4581-8ed6-6bd87cf0d87a" />

Jira: https://datadoghq.atlassian.net/browse/SVLS-7593
1 parent 53cba3c commit 58110c0
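
As a rough illustration of the queue-based pattern the description refers to (simplified names, not the actual types — those live in `stats_concentrator_service.rs` below): a single task owns the state and serves both writes and reads from one channel, so callers never take a lock.

```rust
// Illustrative sketch only; the real types are in stats_concentrator_service.rs.
use tokio::sync::{mpsc, oneshot};

enum Command {
    Add(u64),                            // fire-and-forget write
    GetStats(oneshot::Sender<Vec<u64>>), // read, answered over a oneshot channel
}

async fn run_service(mut rx: mpsc::UnboundedReceiver<Command>) {
    // State is owned by this one task, so no Mutex and no lock contention.
    let mut state: Vec<u64> = Vec::new();
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Add(v) => state.push(v),
            Command::GetStats(reply) => {
                // Hand back the accumulated stats and reset.
                let _ = reply.send(std::mem::take(&mut state));
            }
        }
    }
}
```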

8 files changed: +243 −32 lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 16 additions & 2 deletions

```diff
@@ -57,6 +57,7 @@ use bottlecap::{
     proxy_aggregator,
     proxy_flusher::Flusher as ProxyFlusher,
     stats_aggregator::StatsAggregator,
+    stats_concentrator_service::StatsConcentratorService,
     stats_flusher::{self, StatsFlusher},
     stats_processor, trace_agent,
     trace_aggregator::{self, SendDataBuilderInfo},
@@ -522,6 +523,7 @@ async fn extension_loop_active(
             &proxy_flusher,
             &mut race_flush_interval,
             &metrics_aggr_handle.clone(),
+            false,
         )
         .await;
     }
@@ -537,6 +539,7 @@ async fn extension_loop_active(
             &proxy_flusher,
             &mut race_flush_interval,
             &metrics_aggr_handle.clone(),
+            false,
         )
         .await;
         let next_response =
@@ -606,6 +609,7 @@ async fn extension_loop_active(
             &proxy_flusher,
             &mut race_flush_interval,
             &metrics_aggr_handle,
+            false, // force_flush_trace_stats
         )
         .await;
     }
@@ -639,6 +643,7 @@ async fn extension_loop_active(
             &proxy_flusher,
             &mut race_flush_interval,
             &metrics_aggr_handle,
+            false, // force_flush_trace_stats
        )
        .await;
    }
@@ -697,13 +702,15 @@ async fn extension_loop_active(
                 &proxy_flusher,
                 &mut race_flush_interval,
                 &metrics_aggr_handle,
+                true, // force_flush_trace_stats
             )
             .await;
             return Ok(());
         }
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn blocking_flush_all(
     logs_flusher: &LogsFlusher,
     metrics_flushers: &mut [MetricsFlusher],
@@ -712,6 +719,7 @@ async fn blocking_flush_all(
     proxy_flusher: &ProxyFlusher,
     race_flush_interval: &mut tokio::time::Interval,
     metrics_aggr_handle: &MetricsAggregatorHandle,
+    force_flush_trace_stats: bool,
 ) {
     let flush_response = metrics_aggr_handle
         .flush()
@@ -731,7 +739,7 @@ async fn blocking_flush_all(
         logs_flusher.flush(None),
         futures::future::join_all(metrics_futures),
         trace_flusher.flush(None),
-        stats_flusher.flush(),
+        stats_flusher.flush(force_flush_trace_stats),
         proxy_flusher.flush(None),
     );
     race_flush_interval.reset();
@@ -981,7 +989,12 @@ fn start_trace_agent(
     tokio_util::sync::CancellationToken,
 ) {
     // Stats
-    let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
+    let (stats_concentrator_service, stats_concentrator_handle) =
+        StatsConcentratorService::new(Arc::clone(config));
+    tokio::spawn(stats_concentrator_service.run());
+    let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
+        StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
+    ));
     let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
         api_key_factory.clone(),
         stats_aggregator.clone(),
@@ -1029,6 +1042,7 @@ fn start_trace_agent(
         invocation_processor,
         appsec_processor,
         Arc::clone(tags_provider),
+        stats_concentrator_handle,
     );
     let trace_agent_channel = trace_agent.get_sender_copy();
     let shutdown_token = trace_agent.shutdown_token();
```

bottlecap/src/traces/mod.rs

Lines changed: 3 additions & 0 deletions

```diff
@@ -7,12 +7,15 @@ pub mod proxy_aggregator;
 pub mod proxy_flusher;
 pub mod span_pointers;
 pub mod stats_aggregator;
+pub mod stats_concentrator;
+pub mod stats_concentrator_service;
 pub mod stats_flusher;
 pub mod stats_processor;
 pub mod trace_agent;
 pub mod trace_aggregator;
 pub mod trace_flusher;
 pub mod trace_processor;
+pub mod trace_stats_processor;
 
 // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set.
 const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001";
```

bottlecap/src/traces/stats_aggregator.rs

Lines changed: 40 additions & 22 deletions

```diff
@@ -1,5 +1,7 @@
+use crate::traces::stats_concentrator_service::StatsConcentratorHandle;
 use datadog_trace_protobuf::pb::ClientStatsPayload;
 use std::collections::VecDeque;
+use tracing::error;
 
 #[allow(clippy::empty_line_after_doc_comments)]
 /// Maximum number of entries in a stat payload.
@@ -22,37 +24,44 @@ pub struct StatsAggregator {
     queue: VecDeque<ClientStatsPayload>,
     max_content_size_bytes: usize,
     buffer: Vec<ClientStatsPayload>,
-}
-
-impl Default for StatsAggregator {
-    fn default() -> Self {
-        StatsAggregator {
-            queue: VecDeque::new(),
-            max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
-            buffer: Vec::new(),
-        }
-    }
+    concentrator: StatsConcentratorHandle,
 }
 
 /// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog.
 impl StatsAggregator {
     #[allow(dead_code)]
     #[allow(clippy::must_use_candidate)]
-    pub fn new(max_content_size_bytes: usize) -> Self {
+    fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self {
         StatsAggregator {
             queue: VecDeque::new(),
             max_content_size_bytes,
             buffer: Vec::new(),
+            concentrator,
         }
     }
 
+    #[must_use]
+    pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self {
+        Self::new(MAX_CONTENT_SIZE_BYTES, concentrator)
+    }
+
     /// Takes in an individual trace stats payload.
     pub fn add(&mut self, payload: ClientStatsPayload) {
         self.queue.push_back(payload);
     }
 
     /// Returns a batch of trace stats payloads, subject to the max content size.
-    pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
+    pub async fn get_batch(&mut self, force_flush: bool) -> Vec<ClientStatsPayload> {
+        // Pull stats data from concentrator
+        match self.concentrator.flush(force_flush).await {
+            Ok(stats) => {
+                self.queue.extend(stats);
+            }
+            Err(e) => {
+                error!("Error getting stats from the stats concentrator: {e:?}");
+            }
+        }
+
         let mut batch_size = 0;
 
         // Fill the batch
@@ -80,10 +89,15 @@ impl StatsAggregator {
 #[allow(clippy::unwrap_used)]
 mod tests {
     use super::*;
+    use crate::config::Config;
+    use crate::traces::stats_concentrator_service::StatsConcentratorService;
+    use std::sync::Arc;
 
     #[test]
     fn test_add() {
-        let mut aggregator = StatsAggregator::default();
+        let config = Arc::new(Config::default());
+        let (_, concentrator) = StatsConcentratorService::new(config);
+        let mut aggregator = StatsAggregator::new_with_concentrator(concentrator);
         let payload = ClientStatsPayload {
             hostname: "hostname".to_string(),
             env: "dev".to_string(),
@@ -106,9 +120,11 @@ mod tests {
         assert_eq!(aggregator.queue[0], payload);
     }
 
-    #[test]
-    fn test_get_batch() {
-        let mut aggregator = StatsAggregator::default();
+    #[tokio::test]
+    async fn test_get_batch() {
+        let config = Arc::new(Config::default());
+        let (_, concentrator) = StatsConcentratorService::new(config);
+        let mut aggregator = StatsAggregator::new_with_concentrator(concentrator);
         let payload = ClientStatsPayload {
             hostname: "hostname".to_string(),
             env: "dev".to_string(),
@@ -127,13 +143,15 @@ mod tests {
         };
         aggregator.add(payload.clone());
         assert_eq!(aggregator.queue.len(), 1);
-        let batch = aggregator.get_batch();
+        let batch = aggregator.get_batch(false).await;
         assert_eq!(batch, vec![payload]);
     }
 
-    #[test]
-    fn test_get_batch_full_entries() {
-        let mut aggregator = StatsAggregator::new(640);
+    #[tokio::test]
+    async fn test_get_batch_full_entries() {
+        let config = Arc::new(Config::default());
+        let (_, concentrator) = StatsConcentratorService::new(config);
+        let mut aggregator = StatsAggregator::new(640, concentrator);
         // Payload below is 115 bytes
         let payload = ClientStatsPayload {
             hostname: "hostname".to_string(),
@@ -158,12 +176,12 @@ mod tests {
         aggregator.add(payload.clone());
 
         // The batch should only contain the first 2 payloads
-        let first_batch = aggregator.get_batch();
+        let first_batch = aggregator.get_batch(false).await;
         assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
         assert_eq!(aggregator.queue.len(), 1);
 
         // The second batch should only contain the last log
-        let second_batch = aggregator.get_batch();
+        let second_batch = aggregator.get_batch(false).await;
         assert_eq!(second_batch, vec![payload]);
         assert_eq!(aggregator.queue.len(), 0);
     }
```
bottlecap/src/traces/stats_concentrator.rs

Lines changed: 38 additions & 0 deletions

```diff
@@ -0,0 +1,38 @@
+use crate::config::Config;
+use datadog_trace_protobuf::pb;
+use std::sync::Arc;
+
+// Event sent to the stats concentrator
+#[derive(Clone, Copy)]
+pub struct StatsEvent {
+    pub time: u64,
+    pub aggregation_key: AggregationKey,
+    pub stats: Stats,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
+pub struct AggregationKey {}
+
+#[derive(Clone, Debug, Default, Copy)]
+pub struct Stats {}
+
+pub struct StatsConcentrator {
+    _config: Arc<Config>,
+}
+
+// Aggregates stats into buckets, which are then pulled by the stats aggregator.
+impl StatsConcentrator {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> Self {
+        Self { _config: config }
+    }
+
+    pub fn add(&mut self, _stats_event: StatsEvent) {}
+
+    // force_flush: If true, flush all stats. If false, flush stats except for the few latest
+    // buckets, which may still be getting data.
+    #[must_use]
+    pub fn flush(&mut self, _force_flush: bool) -> Vec<pb::ClientStatsPayload> {
+        vec![]
+    }
+}
```
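
The skeleton leaves `flush` empty, but the comment on it pins down the intended semantics of `force_flush`. A hypothetical sketch of what time-bucketing with held-back "hot" buckets could look like — the bucket width, the hit-count stand-in, and all names here are assumptions, not part of this PR:

```rust
// Hypothetical sketch of the bucket semantics described above; not part of this PR.
use std::collections::BTreeMap;

const BUCKET_WIDTH_SECS: u64 = 10; // assumed bucket width
const HOT_BUCKETS: u64 = 2; // newest buckets that may still be receiving data

struct TimeBuckets {
    // Bucket start time -> aggregated hit count (stand-in for real stats).
    buckets: BTreeMap<u64, u64>,
}

impl TimeBuckets {
    fn add(&mut self, time: u64) {
        let bucket_start = time - time % BUCKET_WIDTH_SECS;
        *self.buckets.entry(bucket_start).or_insert(0) += 1;
    }

    /// force_flush = true drains everything (e.g. on shutdown); false holds
    /// back the newest buckets, mirroring the comment on flush() above.
    fn flush(&mut self, force_flush: bool, now: u64) -> Vec<(u64, u64)> {
        let cutoff = if force_flush {
            u64::MAX
        } else {
            now.saturating_sub(HOT_BUCKETS * BUCKET_WIDTH_SECS)
        };
        // split_off keeps buckets >= cutoff; everything older gets flushed.
        let still_hot = self.buckets.split_off(&cutoff);
        let flushed = std::mem::replace(&mut self.buckets, still_hot);
        flushed.into_iter().collect()
    }
}
```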
bottlecap/src/traces/stats_concentrator_service.rs

Lines changed: 79 additions & 0 deletions

```diff
@@ -0,0 +1,79 @@
+use tokio::sync::{mpsc, oneshot};
+
+use crate::config::Config;
+use crate::traces::stats_concentrator::StatsConcentrator;
+use crate::traces::stats_concentrator::StatsEvent;
+use datadog_trace_protobuf::pb;
+use std::sync::Arc;
+use tracing::error;
+
+#[derive(Debug, thiserror::Error)]
+pub enum StatsError {
+    #[error("Failed to send command to concentrator: {0}")]
+    SendError(mpsc::error::SendError<ConcentratorCommand>),
+    #[error("Failed to receive response from concentrator: {0}")]
+    RecvError(oneshot::error::RecvError),
+}
+
+pub enum ConcentratorCommand {
+    Add(StatsEvent),
+    Flush(bool, oneshot::Sender<Vec<pb::ClientStatsPayload>>),
+}
+
+#[derive(Clone)]
+pub struct StatsConcentratorHandle {
+    tx: mpsc::UnboundedSender<ConcentratorCommand>,
+}
+
+impl StatsConcentratorHandle {
+    pub fn add(
+        &self,
+        stats_event: StatsEvent,
+    ) -> Result<(), mpsc::error::SendError<ConcentratorCommand>> {
+        self.tx.send(ConcentratorCommand::Add(stats_event))
+    }
+
+    pub async fn flush(
+        &self,
+        force_flush: bool,
+    ) -> Result<Vec<pb::ClientStatsPayload>, StatsError> {
+        let (response_tx, response_rx) = oneshot::channel();
+        self.tx
+            .send(ConcentratorCommand::Flush(force_flush, response_tx))
+            .map_err(StatsError::SendError)?;
+        let stats = response_rx.await.map_err(StatsError::RecvError)?;
+        Ok(stats)
+    }
+}
+
+pub struct StatsConcentratorService {
+    concentrator: StatsConcentrator,
+    rx: mpsc::UnboundedReceiver<ConcentratorCommand>,
+}
+
+// A service that handles add() and flush() requests in the same queue,
+// to avoid using mutex, which may cause lock contention.
+impl StatsConcentratorService {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let handle = StatsConcentratorHandle { tx };
+        let concentrator = StatsConcentrator::new(config);
+        let service: StatsConcentratorService = Self { concentrator, rx };
+        (service, handle)
+    }
+
+    pub async fn run(mut self) {
+        while let Some(command) = self.rx.recv().await {
+            match command {
+                ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event),
+                ConcentratorCommand::Flush(force_flush, response_tx) => {
+                    let stats = self.concentrator.flush(force_flush);
+                    if let Err(e) = response_tx.send(stats) {
+                        error!("Failed to return trace stats: {e:?}");
+                    }
+                }
+            }
+        }
+    }
+}
```
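
A minimal usage sketch of the pair defined above, assuming a Tokio runtime, the default `Config`, and the `bottlecap::` import paths (the flush result is empty for now, since `StatsConcentrator::flush` is still a stub):

```rust
use std::sync::Arc;

use bottlecap::config::Config;
use bottlecap::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent};
use bottlecap::traces::stats_concentrator_service::{StatsConcentratorService, StatsError};

async fn example() -> Result<(), StatsError> {
    let (service, handle) = StatsConcentratorService::new(Arc::new(Config::default()));
    // The service task is the sole owner of the concentrator state.
    tokio::spawn(service.run());

    // Producers clone the handle; add() is a non-blocking channel send.
    handle
        .add(StatsEvent {
            time: 0,
            aggregation_key: AggregationKey {},
            stats: Stats::default(),
        })
        .expect("concentrator service stopped");

    // flush() round-trips through the queue and awaits the oneshot reply.
    let payloads = handle.flush(true).await?;
    assert!(payloads.is_empty()); // the skeleton returns no payloads yet
    Ok(())
}
```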

bottlecap/src/traces/stats_flusher.rs

Lines changed: 5 additions & 4 deletions

```diff
@@ -28,7 +28,7 @@ pub trait StatsFlusher {
     /// Flushes stats to the Datadog trace stats intake.
     async fn send(&self, traces: Vec<pb::ClientStatsPayload>);
 
-    async fn flush(&self);
+    async fn flush(&self, force_flush: bool);
 }
 
 #[allow(clippy::module_name_repetitions)]
@@ -116,14 +116,15 @@ impl StatsFlusher for ServerlessStatsFlusher {
             }
         };
     }
-    async fn flush(&self) {
+
+    async fn flush(&self, force_flush: bool) {
         let mut guard = self.aggregator.lock().await;
 
-        let mut stats = guard.get_batch();
+        let mut stats = guard.get_batch(force_flush).await;
         while !stats.is_empty() {
             self.send(stats).await;
 
-            stats = guard.get_batch();
+            stats = guard.get_batch(force_flush).await;
         }
     }
 }
```
