Skip to content

Commit 9abd5a1

Browse files
SVLS-8260 Add stats flusher to continuous mode (#977)
<!-- dd-meta {"pullId":"00000000-0000-0000-0000-000000000000","source":"chat","resourceId":"6ee9cec1-93a4-4470-b19d-975efc169e19","workflowId":"fcc5dbb2-0259-40cb-ab3b-9d03adbcb0ef","codeChangeId":"fcc5dbb2-0259-40cb-ab3b-9d03adbcb0ef","sourceType":"chat"} -->

PR by Bits. [View Dev Agent Session](https://app.datadoghq.com/code/6ee9cec1-93a4-4470-b19d-975efc169e19)

You can ask for changes by mentioning @DataDog in a comment. Feedback (especially on what could be better) is welcome in [#code-gen-feedback](https://dd.enterprise.slack.com/archives/C07JA5N2D25)!

---

## Overview

- Align the continuous flush path so that it also flushes stats.
- Track stats flush handles to prevent overlapping flushes and to redrive cleanly.
- Add a unit test for stats handle tracking.

Co-authored-by: datadog-official[bot] <214633350+datadog-official[bot]@users.noreply.github.com>
Co-authored-by: tianning.li <[email protected]>
1 parent 3407f02 commit 9abd5a1

File tree

1 file changed

+44
-45
lines changed

1 file changed

+44
-45
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ struct PendingFlushHandles {
109109
log_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
110110
metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
111111
proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
112+
stats_flush_handles: Vec<JoinHandle<()>>,
112113
}
113114

114115
struct MetricsRetryBatch {
@@ -124,6 +125,7 @@ impl PendingFlushHandles {
124125
log_flush_handles: Vec::new(),
125126
metric_flush_handles: Vec::new(),
126127
proxy_flush_handles: Vec::new(),
128+
stats_flush_handles: Vec::new(),
127129
}
128130
}
129131

@@ -132,8 +134,9 @@ impl PendingFlushHandles {
132134
let log_pending = self.log_flush_handles.iter().any(|h| !h.is_finished());
133135
let metric_pending = self.metric_flush_handles.iter().any(|h| !h.is_finished());
134136
let proxy_pending = self.proxy_flush_handles.iter().any(|h| !h.is_finished());
137+
let stats_pending = self.stats_flush_handles.iter().any(|h| !h.is_finished());
135138

136-
trace_pending || log_pending || metric_pending || proxy_pending
139+
trace_pending || log_pending || metric_pending || proxy_pending || stats_pending
137140
}
138141

139142
#[allow(clippy::too_many_lines)]
@@ -190,8 +193,10 @@ impl PendingFlushHandles {
190193
self.metric_flush_handles.push(handle);
191194
}
192195

193-
// Stats flush (fire and forget, not tracked)
194-
let () = stats_flusher.flush(false).await;
196+
// Spawn stats flush
197+
let sf = Arc::clone(stats_flusher);
198+
self.stats_flush_handles
199+
.push(tokio::spawn(async move { sf.flush(false).await }));
195200

196201
// Spawn proxy flush
197202
let pf = proxy_flusher.clone();
@@ -211,6 +216,13 @@ impl PendingFlushHandles {
211216
let mut joinset = tokio::task::JoinSet::new();
212217
let mut flush_error = false;
213218

219+
for handle in self.stats_flush_handles.drain(..) {
220+
if let Err(e) = handle.await {
221+
error!("PENDING_FLUSH_HANDLES | stats flush error {e:?}");
222+
flush_error = true;
223+
}
224+
}
225+
214226
for handle in self.trace_flush_handles.drain(..) {
215227
match handle.await {
216228
Ok(retry) => {
@@ -918,50 +930,16 @@ async fn extension_loop_active(
918930
//Periodic flush scenario, flush at top of invocation
919931
FlushDecision::Continuous => {
920932
if !pending_flush_handles.has_pending_handles() {
921-
let lf = logs_flusher.clone();
922933
pending_flush_handles
923-
.log_flush_handles
924-
.push(tokio::spawn(async move { lf.flush(None).await }));
925-
let tf = trace_flusher.clone();
926-
pending_flush_handles.trace_flush_handles.push(tokio::spawn(
927-
async move { tf.flush(None).await.unwrap_or_default() },
928-
));
929-
let (metrics_flushers_copy, series, sketches) = {
930-
let locked_metrics = metrics_flushers.lock().await;
931-
let flush_response = metrics_aggregator_handle
932-
.clone()
933-
.flush()
934-
.await
935-
.expect("can't flush metrics handle");
936-
(
937-
locked_metrics.clone(),
938-
flush_response.series,
939-
flush_response.distributions,
934+
.spawn_non_blocking_flushes(
935+
&logs_flusher,
936+
&trace_flusher,
937+
&metrics_flushers,
938+
&stats_flusher,
939+
&proxy_flusher,
940+
&metrics_aggregator_handle,
940941
)
941-
};
942-
for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate()
943-
{
944-
let series_clone = series.clone();
945-
let sketches_clone = sketches.clone();
946-
let handle = tokio::spawn(async move {
947-
let (retry_series, retry_sketches) = flusher
948-
.flush_metrics(series_clone.clone(), sketches_clone.clone())
949-
.await
950-
.unwrap_or_default();
951-
MetricsRetryBatch {
952-
flusher_id: idx,
953-
series: retry_series,
954-
sketches: retry_sketches,
955-
}
956-
});
957-
pending_flush_handles.metric_flush_handles.push(handle);
958-
}
959-
960-
let pf = proxy_flusher.clone();
961-
pending_flush_handles.proxy_flush_handles.push(tokio::spawn(
962-
async move { pf.flush(None).await.unwrap_or_default() },
963-
));
964-
942+
.await;
965943
race_flush_interval.reset();
966944
}
967945
}
@@ -1749,3 +1727,24 @@ fn start_api_runtime_proxy(
17491727
)
17501728
.ok()
17511729
}
1730+
1731+
#[cfg(test)]
1732+
mod pending_flush_handles_tests {
1733+
use super::*;
1734+
use tokio::time::{Duration, sleep};
1735+
1736+
#[tokio::test]
1737+
async fn stats_handle_is_tracked_until_completion() {
1738+
let mut pending = PendingFlushHandles::new();
1739+
let handle = tokio::spawn(async {
1740+
sleep(Duration::from_millis(5)).await;
1741+
});
1742+
pending.stats_flush_handles.push(handle);
1743+
1744+
assert!(pending.has_pending_handles());
1745+
1746+
sleep(Duration::from_millis(10)).await;
1747+
1748+
assert!(!pending.has_pending_handles());
1749+
}
1750+
}

0 commit comments

Comments
 (0)