Skip to content

Commit 032e32a

Browse files
committed
Add regression tests and standalone repro for BatchSpanProcessor Tokio deadlock
1 parent 5250df2 commit 032e32a

File tree

5 files changed

+158
-1
lines changed

5 files changed

+158
-1
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ members = [
55
"opentelemetry-*/examples/*",
66
"opentelemetry-otlp/tests/*",
77
"examples/*",
8-
"stress",
8+
"stress", "batch-span-processor-hang-repro",
99
]
1010
resolver = "2"
1111
# Avoid applying patch to force use of workspace members for this
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "batch-span-processor-hang-repro"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
opentelemetry = { workspace = true, features = ["trace"] }
8+
opentelemetry_sdk = { workspace = true, features = ["trace", "experimental_trace_batch_span_processor_with_async_runtime", "rt-tokio"] }
9+
tokio = { workspace = true, features = ["rt", "macros"] }
10+
futures-util = { workspace = true, features = ["std"] }
11+
12+
[lints]
13+
workspace = true
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Batch Span Processor Tokio deadlock reproduction
2+
3+
This crate demonstrates how the asynchronous BatchSpanProcessor from
4+
`opentelemetry-sdk` deadlocks when `force_flush` is invoked from a Tokio
5+
current-thread runtime.
6+
7+
## Steps
8+
9+
```bash
10+
# run the example; it will print the first line and then hang
11+
cargo run -p batch-span-processor-hang-repro
12+
```
13+
14+
The program configures `BatchSpanProcessor::builder(..., runtime::Tokio)` and
15+
then calls `SdkTracerProvider::force_flush()` while running inside a
16+
`#[tokio::main(flavor = "current_thread")]` context. The processor calls
17+
`futures_executor::block_on` internally, so the same Tokio thread that is blocked
18+
is also responsible for driving the background worker, resulting in a deadlock.
19+
20+
To see the hang, you can run with a timeout:
21+
22+
```bash
23+
# exits with status 124 because the process never completes
24+
timeout 5 cargo run -p batch-span-processor-hang-repro
25+
```
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use futures_util::future;
2+
use opentelemetry::trace::{Span, Tracer, TracerProvider};
3+
use opentelemetry_sdk::error::OTelSdkResult;
4+
use opentelemetry_sdk::runtime;
5+
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
6+
use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, SpanExporter};
7+
8+
#[derive(Debug)]
9+
struct ReadyExporter;
10+
11+
impl SpanExporter for ReadyExporter {
12+
fn export(
13+
&self,
14+
_batch: Vec<SpanData>,
15+
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
16+
future::ready(Ok(()))
17+
}
18+
}
19+
20+
#[tokio::main(flavor = "current_thread")]
21+
async fn main() {
22+
// Build a provider that uses the async-runtime batch span processor with the Tokio runtime.
23+
let batch_processor = BatchSpanProcessor::builder(ReadyExporter, runtime::Tokio).build();
24+
25+
let provider = SdkTracerProvider::builder()
26+
.with_span_processor(batch_processor)
27+
.build();
28+
29+
// End a span so the processor has some work queued.
30+
provider.tracer("repro").start("blocking-call").end();
31+
32+
println!("Calling force_flush... this blocks forever");
33+
34+
// This never returns because force_flush uses futures_executor::block_on,
35+
// which cannot make progress while we're on Tokio's current-thread scheduler.
36+
provider.force_flush().expect("force_flush should succeed");
37+
38+
println!("force_flush returned");
39+
}

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,4 +703,84 @@ mod tests {
703703
"exports overlapped even though max_concurrent_exports was 1"
704704
);
705705
}
706+
707+
#[test]
708+
#[cfg(feature = "rt-tokio")]
709+
fn batch_span_processor_force_flush_deadlocks_helper() {
710+
if std::env::var_os("OTEL_RUN_BSP_DEADLOCK_HELPER").is_none() {
711+
// Running under the normal test harness: skip so the suite does not hang.
712+
return;
713+
}
714+
715+
run_force_flush_deadlock();
716+
}
717+
718+
#[test]
719+
#[cfg(feature = "rt-tokio")]
720+
fn batch_span_processor_force_flush_deadlocks_current_thread_runtime() {
721+
use std::{process::Command, thread, time::Duration};
722+
723+
let mut child = Command::new(
724+
std::env::current_exe().expect("failed to locate current test binary"),
725+
)
726+
.arg("--exact")
727+
.arg(
728+
"trace::span_processor_with_async_runtime::tests::batch_span_processor_force_flush_deadlocks_helper",
729+
)
730+
.env("OTEL_RUN_BSP_DEADLOCK_HELPER", "1")
731+
.spawn()
732+
.expect("failed to spawn helper test");
733+
734+
// Give the helper a short window to reach the blocking call.
735+
thread::sleep(Duration::from_millis(200));
736+
737+
let still_running = child
738+
.try_wait()
739+
.expect("failed to query helper status")
740+
.is_none();
741+
742+
// Always terminate the helper before making assertions to avoid leaking child processes.
743+
let _ = child.kill();
744+
let _ = child.wait();
745+
746+
assert!(
747+
still_running,
748+
"helper process exited unexpectedly; force_flush no longer blocks on the current-thread runtime"
749+
);
750+
}
751+
752+
#[cfg(feature = "rt-tokio")]
753+
fn run_force_flush_deadlock() -> ! {
754+
use crate::error::OTelSdkResult;
755+
use crate::testing::trace::new_test_export_span_data;
756+
use crate::trace::{SpanData, SpanExporter};
757+
758+
#[derive(Debug)]
759+
struct ReadyExporter;
760+
761+
impl SpanExporter for ReadyExporter {
762+
async fn export(&self, _batch: Vec<SpanData>) -> OTelSdkResult {
763+
Ok(())
764+
}
765+
}
766+
767+
let runtime = tokio::runtime::Builder::new_current_thread()
768+
.enable_all()
769+
.build()
770+
.expect("failed to build current-thread runtime");
771+
772+
runtime.block_on(async {
773+
// Building the processor inside the runtime ensures the worker task is spawned via
774+
// `tokio::spawn`, matching application usage.
775+
let processor = BatchSpanProcessor::builder(ReadyExporter, runtime::Tokio).build();
776+
777+
processor.on_end(new_test_export_span_data());
778+
779+
// Calling force_flush from within the same current-thread runtime blocks forever.
780+
let _ = processor.force_flush();
781+
panic!("force_flush unexpectedly returned");
782+
});
783+
784+
unreachable!("the helper should never return once force_flush blocks");
785+
}
706786
}

0 commit comments

Comments
 (0)