Skip to content

Commit 18c8773

Browse files
authored
APMSP-1969 fix dropped trace payloads for the sidecar (#1047)
- The MAX_PAYLOAD_SIZE used in trace_utils::coalesce_send_data() was 50mb. The agent drops payloads greater than > 25mb and returns a 413. So it was potentially combining payloads that would result in an error and drop. - In the sidecar's trace_flusher payloads were being dropped if the queue's size exceeded the min drop size. The correct behavior is to check if the payload being enqueued exceeds min drop size and log an error. - When the sidecar's trace_flusher was dropping a payload that was too large it was still adding that payload's size to the queue size. This could have lead to subsequent payloads being dropped due to an incorrectly lar ge queue size until a time based flush was done. - A bug was discovered in the test helper function poll_for_mock_hits() where we were incorrectly returning true always when expected hits was 0.
1 parent aa24498 commit 18c8773

File tree

3 files changed

+33
-14
lines changed

3 files changed

+33
-14
lines changed

datadog-sidecar/src/service/tracing/trace_flusher.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use tracing::{debug, error, info};
2525

2626
const DEFAULT_FLUSH_INTERVAL_MS: u64 = 5_000;
2727
const DEFAULT_MIN_FORCE_FLUSH_SIZE_BYTES: u32 = 1_000_000;
28-
const DEFAULT_MIN_FORCE_DROP_SIZE_BYTES: u32 = 10_000_000;
2928

3029
/// `TraceFlusherStats` holds stats of the trace flusher like the count of allocated shared memory
3130
/// for agent config, agent config writers, last used entries in agent configs, and the size of send
@@ -103,7 +102,7 @@ impl Default for TraceFlusher {
103102
inner: Mutex::new(TraceFlusherData::default()),
104103
interval_ms: AtomicU64::new(DEFAULT_FLUSH_INTERVAL_MS),
105104
min_force_flush_size_bytes: AtomicU32::new(DEFAULT_MIN_FORCE_FLUSH_SIZE_BYTES),
106-
min_force_drop_size_bytes: AtomicU32::new(DEFAULT_MIN_FORCE_DROP_SIZE_BYTES),
105+
min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32),
107106
remote_config: Mutex::new(Default::default()),
108107
metrics: Mutex::new(Default::default()),
109108
}
@@ -120,20 +119,24 @@ impl TraceFlusher {
120119
let mut flush_data = self.inner.lock_or_panic();
121120
let flush_data = flush_data.deref_mut();
122121

123-
flush_data.traces.send_data_size += data.len();
124-
125-
if flush_data.traces.send_data_size
126-
> self.min_force_drop_size_bytes.load(Ordering::Relaxed) as usize
127-
{
122+
if data.len() > self.min_force_drop_size_bytes.load(Ordering::Relaxed) as usize {
123+
error!(
124+
"Error sending trace. Individual trace size of {}B exceeds {}B limit",
125+
data.len(),
126+
self.min_force_drop_size_bytes.load(Ordering::Relaxed) as usize
127+
);
128128
return;
129129
}
130130

131+
flush_data.traces.send_data_size += data.len();
131132
flush_data.traces.send_data.push(data);
133+
132134
if flush_data.flusher.is_none() {
133135
let (force_flush, completer) = ManualFuture::new();
134136
flush_data.flusher = Some(self.clone().start_trace_flusher(force_flush));
135137
flush_data.traces.force_flush = Some(completer);
136138
}
139+
137140
if flush_data.traces.send_data_size
138141
> self.min_force_flush_size_bytes.load(Ordering::Relaxed) as usize
139142
{
@@ -321,7 +324,11 @@ mod tests {
321324
// observe that a request to the trace agent is not made. Then enqueue a third trace exceeding
322325
// the min force flush size, and observe that a request to the trace agent is made.
323326
async fn test_min_flush_size() {
324-
let trace_flusher = Arc::new(TraceFlusher::default());
327+
// Set the interval high enough that it can't cause a false positive
328+
let trace_flusher = Arc::new(TraceFlusher {
329+
interval_ms: AtomicU64::new(20_000),
330+
..TraceFlusher::default()
331+
});
325332

326333
let server = MockServer::start();
327334

@@ -399,10 +406,12 @@ mod tests {
399406

400407
#[cfg_attr(miri, ignore)]
401408
#[tokio::test]
402-
async fn test_flush_drop_size() {
409+
// Test scenario: Enqueue a trace with a size greater than the minimum force drop size, and
410+
// observe that it is not sent.
411+
async fn test_drop_size_no_flush() {
403412
// Set the interval high enough that it can't cause a false positive
404413
let trace_flusher = Arc::new(TraceFlusher {
405-
interval_ms: AtomicU64::new(10_000),
414+
interval_ms: AtomicU64::new(20_000),
406415
..TraceFlusher::default()
407416
});
408417
let server = MockServer::start();
@@ -413,6 +422,7 @@ mod tests {
413422
.body(r#"{"status":"ok"}"#);
414423
})
415424
.await;
425+
416426
let size = trace_flusher
417427
.min_force_drop_size_bytes
418428
.load(Ordering::Relaxed) as usize

datadog-trace-utils/src/test_utils/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,14 +409,21 @@ pub async fn poll_for_mock_hit(
409409
expected_hits: usize,
410410
delete_after_hits: bool,
411411
) -> bool {
412-
let mut mock_hit = mock.hits_async().await == expected_hits;
412+
let mut mock_hit = false;
413413

414414
let mut mock_observations_remaining = poll_attempts;
415415

416416
while !mock_hit {
417417
sleep(Duration::from_millis(sleep_interval_ms)).await;
418-
mock_hit = mock.hits_async().await == expected_hits;
419418
mock_observations_remaining -= 1;
419+
mock_hit = if expected_hits > 0 {
420+
mock.hits_async().await == expected_hits
421+
} else {
422+
// If we are polling for 0 hits, we need to ensure we do all observations, otherwise
423+
// this will always be true
424+
mock.hits_async().await == 0 && mock_observations_remaining == 0
425+
};
426+
420427
if mock_observations_remaining == 0 || mock_hit {
421428
if delete_after_hits {
422429
mock.delete();

datadog-trace-utils/src/trace_utils.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ use std::cmp::Ordering;
2323
use std::collections::{HashMap, HashSet};
2424
use std::env;
2525

26+
/// The maximum payload size for a single request that can be sent to the trace agent. Payloads
27+
/// larger than this size will be dropped and the agent will return a 413 error if
28+
/// `datadog-send-real-http-status` is set.
29+
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
2630
/// Span metric the mini agent must set for the backend to recognize top level span
2731
const TOP_LEVEL_KEY: &str = "_top_level";
2832
/// Span metric the tracer sets to denote a top level span
2933
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
3034
const MEASURED_KEY: &str = "_dd.measured";
3135
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
32-
33-
const MAX_PAYLOAD_SIZE: usize = 50 * 1024 * 1024;
3436
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
3537
const SPAN_ELEMENT_COUNT: usize = 12;
3638

0 commit comments

Comments
 (0)