Skip to content

Commit 5aefff2

Browse files
committed
Spawn dedicated worker for BatchSpanProcessor on Tokio current-thread
1 parent 032e32a commit 5aefff2

File tree

1 file changed

+87
-29
lines changed

1 file changed

+87
-29
lines changed

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 87 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,54 @@ impl<E: SpanExporter + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E,
366366
}
367367

368368
impl<R: RuntimeChannel> BatchSpanProcessor<R> {
369+
/// Reports whether the batch worker should run on a dedicated thread.
///
/// Returns `true` only when `T` is [`crate::runtime::Tokio`] and the calling
/// thread is currently inside a Tokio runtime whose flavor is
/// `CurrentThread`. Any other runtime type, or a call made where no Tokio
/// handle can be obtained, yields `false`.
#[cfg(feature = "rt-tokio")]
fn should_spawn_tokio_current_thread_worker<T: RuntimeChannel>() -> bool {
    use std::any::TypeId;
    use tokio::runtime::{Handle, RuntimeFlavor};

    // Check the runtime type first so other runtimes never query the Tokio handle.
    let is_tokio_runtime = TypeId::of::<T>() == TypeId::of::<crate::runtime::Tokio>();
    if !is_tokio_runtime {
        return false;
    }

    // An error from `try_current` is treated the same as "not current-thread".
    Handle::try_current()
        .map(|handle| matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread))
        .unwrap_or(false)
}
383+
384+
#[cfg(not(feature = "rt-tokio"))]
385+
fn should_spawn_tokio_current_thread_worker<T: RuntimeChannel>() -> bool {
386+
let _ = std::any::TypeId::of::<T>();
387+
false
388+
}
389+
390+
/// Runs `worker` to completion on a newly spawned, named OS thread.
///
/// The thread builds its own Tokio current-thread runtime (with all drivers
/// enabled) and blocks on `worker` inside it. The join handle is dropped, so
/// the thread is detached from the caller.
///
/// # Panics
///
/// Panics in the caller if the OS thread cannot be spawned. The spawned
/// thread itself panics if its Tokio runtime cannot be built.
#[cfg(feature = "rt-tokio")]
fn spawn_tokio_current_thread_worker<F>(worker: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let thread_name = String::from("OpenTelemetry.Traces.BatchProcessor.TokioCurrentThread");

    // Body of the dedicated thread: own runtime, then drive the worker future.
    let run_worker = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
        runtime.block_on(worker);
    };

    std::thread::Builder::new()
        .name(thread_name)
        .spawn(run_worker)
        .expect("failed to spawn dedicated Tokio current thread for OpenTelemetry batch processing");
}
406+
407+
/// Placeholder compiled when the `rt-tokio` feature is disabled.
///
/// # Panics
///
/// Always panics via `unreachable!`; the worker future is never polled.
#[cfg(not(feature = "rt-tokio"))]
fn spawn_tokio_current_thread_worker<F>(_worker: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    // Reaching this body means a caller requested a Tokio worker without the feature.
    unreachable!(
        "tokio current-thread worker spawn should not be called without rt-tokio feature"
    )
}
416+
369417
pub(crate) fn new<E>(exporter: E, config: BatchConfig, runtime: R) -> Self
370418
where
371419
E: SpanExporter + Send + Sync + 'static,
@@ -376,8 +424,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
376424
let max_queue_size = config.max_queue_size;
377425

378426
let inner_runtime = runtime.clone();
379-
// Spawn worker process via user-defined spawn function.
380-
runtime.spawn(async move {
427+
let worker = async move {
381428
// Timer will take a reference to the current runtime, so it's important we do this within the
382429
// runtime.spawn()
383430
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
@@ -395,7 +442,21 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
395442
};
396443

397444
processor.run(messages).await
398-
});
445+
};
446+
447+
#[cfg(feature = "rt-tokio")]
448+
{
449+
if Self::should_spawn_tokio_current_thread_worker::<R>() {
450+
Self::spawn_tokio_current_thread_worker(worker);
451+
} else {
452+
runtime.spawn(worker);
453+
}
454+
}
455+
456+
#[cfg(not(feature = "rt-tokio"))]
457+
{
458+
runtime.spawn(worker);
459+
}
399460

400461
// Return batch processor with link to worker
401462
BatchSpanProcessor {
@@ -706,51 +767,52 @@ mod tests {
706767

707768
#[test]
708769
#[cfg(feature = "rt-tokio")]
709-
fn batch_span_processor_force_flush_deadlocks_helper() {
770+
fn batch_span_processor_force_flush_current_thread_helper() {
710771
if std::env::var_os("OTEL_RUN_BSP_DEADLOCK_HELPER").is_none() {
711772
// Running under the normal test harness: skip so the suite does not hang.
712773
return;
713774
}
714775

715-
run_force_flush_deadlock();
776+
run_force_flush_current_thread();
716777
}
717778

718779
#[test]
719780
#[cfg(feature = "rt-tokio")]
720-
fn batch_span_processor_force_flush_deadlocks_current_thread_runtime() {
781+
fn batch_span_processor_force_flush_current_thread_runtime() {
721782
use std::{process::Command, thread, time::Duration};
722783

723784
let mut child = Command::new(
724785
std::env::current_exe().expect("failed to locate current test binary"),
725786
)
726787
.arg("--exact")
727788
.arg(
728-
"trace::span_processor_with_async_runtime::tests::batch_span_processor_force_flush_deadlocks_helper",
789+
"trace::span_processor_with_async_runtime::tests::batch_span_processor_force_flush_current_thread_helper",
729790
)
730791
.env("OTEL_RUN_BSP_DEADLOCK_HELPER", "1")
731792
.spawn()
732793
.expect("failed to spawn helper test");
733794

734-
// Give the helper a short window to reach the blocking call.
735-
thread::sleep(Duration::from_millis(200));
736-
737-
let still_running = child
738-
.try_wait()
739-
.expect("failed to query helper status")
740-
.is_none();
795+
let start = std::time::Instant::now();
796+
loop {
797+
if let Some(status) = child.try_wait().expect("failed to query helper status") {
798+
assert!(
799+
status.success(),
800+
"helper process exited with failure status"
801+
);
802+
break;
803+
}
741804

742-
// Always terminate the helper before making assertions to avoid leaking child processes.
743-
let _ = child.kill();
744-
let _ = child.wait();
805+
if start.elapsed() > Duration::from_secs(2) {
806+
let _ = child.kill();
807+
panic!("force_flush still hangs on Tokio current-thread runtime");
808+
}
745809

746-
assert!(
747-
still_running,
748-
"helper process exited unexpectedly; force_flush no longer blocks on the current-thread runtime"
749-
);
810+
thread::sleep(Duration::from_millis(50));
811+
}
750812
}
751813

752814
#[cfg(feature = "rt-tokio")]
753-
fn run_force_flush_deadlock() -> ! {
815+
fn run_force_flush_current_thread() {
754816
use crate::error::OTelSdkResult;
755817
use crate::testing::trace::new_test_export_span_data;
756818
use crate::trace::{SpanData, SpanExporter};
@@ -770,17 +832,13 @@ mod tests {
770832
.expect("failed to build current-thread runtime");
771833

772834
runtime.block_on(async {
773-
// Building the processor inside the runtime ensures the worker task is spawned via
774-
// `tokio::spawn`, matching application usage.
775835
let processor = BatchSpanProcessor::builder(ReadyExporter, runtime::Tokio).build();
776836

777837
processor.on_end(new_test_export_span_data());
778838

779-
// Calling force_flush from within the same current-thread runtime blocks forever.
780-
let _ = processor.force_flush();
781-
panic!("force_flush unexpectedly returned");
839+
processor
840+
.force_flush()
841+
.expect("force_flush should complete on Tokio current-thread runtime");
782842
});
783-
784-
unreachable!("the helper should never return once force_flush blocks");
785843
}
786844
}

0 commit comments

Comments
 (0)