Skip to content

Commit 070ae12

Browse files
committed
make agent stats computation conditional
1 parent 4b45863 commit 070ae12

File tree

2 files changed

+57
-44
lines changed

2 files changed

+57
-44
lines changed

crates/datadog-serverless-compat/src/main.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,20 @@ pub async fn main() {
128128
Arc::clone(&config),
129129
));
130130

131-
// Initialize stats concentrator service and generator
132-
let (stats_concentrator_service, stats_concentrator_handle) =
133-
stats_concentrator_service::StatsConcentratorService::new(config.clone());
134-
tokio::spawn(stats_concentrator_service.run());
135-
let stats_generator = Arc::new(stats_generator::StatsGenerator::new(
136-
stats_concentrator_handle.clone(),
137-
));
131+
// Initialize stats concentrator service and generator conditionally
132+
let (stats_concentrator_handle, stats_generator) = if dd_stats_computation_enabled {
133+
info!("Stats computation enabled");
134+
let (stats_concentrator_service, stats_concentrator_handle) =
135+
stats_concentrator_service::StatsConcentratorService::new(config.clone());
136+
tokio::spawn(stats_concentrator_service.run());
137+
let stats_generator = Arc::new(stats_generator::StatsGenerator::new(
138+
stats_concentrator_handle.clone(),
139+
));
140+
(Some(stats_concentrator_handle), Some(stats_generator))
141+
} else {
142+
info!("Stats computation disabled");
143+
(None, None)
144+
};
138145

139146
let mini_agent = Box::new(mini_agent::MiniAgent {
140147
config: Arc::clone(&config),

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub struct MiniAgent {
3535
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
3636
pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
3737
pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>,
38-
pub stats_concentrator: stats_concentrator_service::StatsConcentratorHandle,
39-
pub stats_generator: Arc<stats_generator::StatsGenerator>,
38+
pub stats_concentrator: Option<stats_concentrator_service::StatsConcentratorHandle>,
39+
pub stats_generator: Option<Arc<stats_generator::StatsGenerator>>,
4040
}
4141

4242
impl MiniAgent {
@@ -65,24 +65,29 @@ impl MiniAgent {
6565
let (trace_tx_internal, trace_rx): (Sender<SendData>, Receiver<SendData>) =
6666
mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);
6767

68-
// Create an intercepting channel that generates stats from traces
69-
let (trace_tx, mut trace_rx_intercept): (Sender<SendData>, Receiver<SendData>) =
70-
mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);
68+
// Conditionally create an intercepting channel that generates stats from traces
69+
let trace_tx = if let Some(stats_generator) = self.stats_generator.clone() {
70+
let (trace_tx, mut trace_rx_intercept): (Sender<SendData>, Receiver<SendData>) =
71+
mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);
7172

72-
// Intercept traces to generate stats, then forward to trace flusher
73-
let stats_generator = self.stats_generator.clone();
74-
tokio::spawn(async move {
75-
while let Some(send_data) = trace_rx_intercept.recv().await {
76-
// Generate stats from the trace payload
77-
if let Err(e) = stats_generator.send(send_data.get_payloads()) {
78-
error!("Failed to generate stats from traces: {e}");
79-
}
80-
// Forward to trace flusher
81-
if let Err(e) = trace_tx_internal.send(send_data).await {
82-
error!("Failed to forward traces to flusher: {e}");
73+
// Intercept traces to generate stats, then forward to trace flusher
74+
tokio::spawn(async move {
75+
while let Some(send_data) = trace_rx_intercept.recv().await {
76+
// Generate stats from the trace payload
77+
if let Err(e) = stats_generator.send(send_data.get_payloads()) {
78+
error!("Failed to generate stats from traces: {e}");
79+
}
80+
// Forward to trace flusher
81+
if let Err(e) = trace_tx_internal.send(send_data).await {
82+
error!("Failed to forward traces to flusher: {e}");
83+
}
8384
}
84-
}
85-
});
85+
});
86+
trace_tx
87+
} else {
88+
// If stats generation is disabled, use the internal channel directly
89+
trace_tx_internal.clone()
90+
};
8691

8792
// start our trace flusher. receives trace payloads and handles buffering + deciding when to
8893
// flush to backend.
@@ -124,28 +129,29 @@ impl MiniAgent {
124129
.await;
125130
});
126131

127-
// Start periodic stats flush task
128-
let stats_concentrator = self.stats_concentrator.clone();
129-
let flush_interval = self.config.stats_flush_interval;
130-
tokio::spawn(async move {
131-
loop {
132-
tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await;
133-
match stats_concentrator.flush(false).await {
134-
Ok(Some(stats_payload)) => {
135-
debug!("Flushed generated stats from concentrator");
136-
if let Err(e) = stats_tx_generated.send(stats_payload).await {
137-
error!("Failed to send flushed stats: {e}");
132+
// Start periodic stats flush task only if stats computation is enabled
133+
if let Some(stats_concentrator) = self.stats_concentrator.clone() {
134+
let flush_interval = self.config.stats_flush_interval;
135+
tokio::spawn(async move {
136+
loop {
137+
tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await;
138+
match stats_concentrator.flush(false).await {
139+
Ok(Some(stats_payload)) => {
140+
debug!("Flushed generated stats from concentrator");
141+
if let Err(e) = stats_tx_generated.send(stats_payload).await {
142+
error!("Failed to send flushed stats: {e}");
143+
}
144+
}
145+
Ok(None) => {
146+
debug!("No generated stats to flush");
147+
}
148+
Err(e) => {
149+
error!("Error flushing generated stats: {e}");
138150
}
139-
}
140-
Ok(None) => {
141-
debug!("No generated stats to flush");
142-
}
143-
Err(e) => {
144-
error!("Error flushing generated stats: {e}");
145151
}
146152
}
147-
}
148-
});
153+
});
154+
}
149155

150156
// setup our hyper http server, where the endpoint_handler handles incoming requests
151157
let trace_processor = self.trace_processor.clone();

0 commit comments

Comments
 (0)