Skip to content

Commit 36d16a2

Browse files
authored
attempt to reduce flakiness of trace exporter shutdown tests (#1128)
1 parent 4eb2b86 commit 36d16a2

File tree

1 file changed

+46
-6
lines changed
  • data-pipeline/src/trace_exporter

1 file changed

+46
-6
lines changed

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,6 +1399,25 @@ impl TraceExporter {
13991399
fn get_agent_url(&self) -> Uri {
14001400
self.output_format.add_path(&self.endpoint.url)
14011401
}
1402+
1403+
#[cfg(test)]
1404+
/// Test only function to check if the stats computation is active and the worker is running
1405+
pub fn is_stats_worker_active(&self) -> bool {
1406+
if !matches!(
1407+
**self.client_side_stats.load(),
1408+
StatsComputationStatus::Enabled { .. }
1409+
) {
1410+
return false;
1411+
}
1412+
1413+
if let Ok(workers) = self.workers.try_lock() {
1414+
if let Some(stats_worker) = &workers.stats {
1415+
return matches!(stats_worker, PausableWorker::Running { .. });
1416+
}
1417+
}
1418+
1419+
false
1420+
}
14021421
}
14031422

14041423
#[derive(Debug, Default, Clone)]
@@ -1498,7 +1517,7 @@ mod tests {
14981517
then.status(200).body("");
14991518
});
15001519

1501-
let mock_info = server.mock(|when, then| {
1520+
let _mock_info = server.mock(|when, then| {
15021521
when.method(GET).path("/info");
15031522
then.status(200)
15041523
.header("content-type", "application/json")
@@ -1528,7 +1547,7 @@ mod tests {
15281547
let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
15291548

15301549
// Wait for the info fetcher to get the config
1531-
while mock_info.hits() == 0 {
1550+
while agent_info::get_agent_info().is_none() {
15321551
exporter
15331552
.runtime
15341553
.lock()
@@ -1544,10 +1563,20 @@ mod tests {
15441563
// Error received because server is returning an empty body.
15451564
assert!(result.is_err());
15461565

1566+
// Wait for the stats worker to be active before shutting down to avoid potential flaky
1567+
// tests on CI where we shutdown before the stats worker had time to start
1568+
let start_time = std::time::Instant::now();
1569+
while !exporter.is_stats_worker_active() {
1570+
if start_time.elapsed() > Duration::from_secs(10) {
1571+
panic!("Timeout waiting for stats worker to become active");
1572+
}
1573+
std::thread::sleep(Duration::from_millis(10));
1574+
}
1575+
15471576
exporter.shutdown(None).unwrap();
15481577

15491578
// Wait for the mock server to process the stats
1550-
for _ in 0..500 {
1579+
for _ in 0..1000 {
15511580
if mock_traces.hits() > 0 && mock_stats.hits() > 0 {
15521581
break;
15531582
} else {
@@ -1585,7 +1614,7 @@ mod tests {
15851614
then.delay(Duration::from_secs(10)).status(200).body("");
15861615
});
15871616

1588-
let mock_info = server.mock(|when, then| {
1617+
let _mock_info = server.mock(|when, then| {
15891618
when.method(GET).path("/info");
15901619
then.status(200)
15911620
.header("content-type", "application/json")
@@ -1618,8 +1647,9 @@ mod tests {
16181647

16191648
let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
16201649

1621-
// Wait for the info fetcher to get the config
1622-
while mock_info.hits() == 0 {
1650+
// Wait for agent_info to be present so that sending a trace will trigger the stats worker
1651+
// to start
1652+
while agent_info::get_agent_info().is_none() {
16231653
exporter
16241654
.runtime
16251655
.lock()
@@ -1633,6 +1663,16 @@ mod tests {
16331663

16341664
exporter.send(data.as_ref(), 1).unwrap();
16351665

1666+
// Wait for the stats worker to be active before shutting down to avoid potential flaky
1667+
// tests on CI where we shutdown before the stats worker had time to start
1668+
let start_time = std::time::Instant::now();
1669+
while !exporter.is_stats_worker_active() {
1670+
if start_time.elapsed() > Duration::from_secs(10) {
1671+
panic!("Timeout waiting for stats worker to become active");
1672+
}
1673+
std::thread::sleep(Duration::from_millis(10));
1674+
}
1675+
16361676
exporter
16371677
.shutdown(Some(Duration::from_millis(5)))
16381678
.unwrap_err(); // The shutdown should timeout

0 commit comments

Comments
 (0)