Skip to content

Commit 0631070

Browse files
authored
fix(async-processor): concurrent exports actually serialised (#3028)
1 parent 8925f06 commit 0631070

File tree

3 files changed

+165
-38
lines changed

3 files changed

+165
-38
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- TODO: Placeholder for Span processor related things
66
- *Fix* SpanProcessor::on_start is no longer called on non recording spans
7+
- **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#2959](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially.
78

89
## 0.30.0
910

opentelemetry-sdk/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true }
2222
serde_json = { workspace = true, optional = true }
2323
thiserror = { workspace = true }
2424
url = { workspace = true, optional = true }
25-
tokio = { workspace = true, features = ["rt", "time"], optional = true }
25+
tokio = { workspace = true, default-features = false, optional = true }
2626
tokio-stream = { workspace = true, optional = true }
2727
http = { workspace = true, optional = true }
2828

@@ -47,15 +47,15 @@ spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"
4747
metrics = ["opentelemetry/metrics"]
4848
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
4949
experimental_async_runtime = []
50-
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
51-
rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
50+
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
51+
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
5252
internal-logs = ["opentelemetry/internal-logs"]
5353
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
5454
spec_unstable_metrics_views = ["metrics"]
5555
experimental_metrics_custom_reader = ["metrics"]
5656
experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
5757
experimental_logs_concurrent_log_processor = ["logs"]
58-
experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
58+
experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
5959
experimental_metrics_disable_name_validation = ["metrics"]
6060

6161
[[bench]]

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 160 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@ use crate::trace::Span;
66
use crate::trace::SpanProcessor;
77
use crate::trace::{SpanData, SpanExporter};
88
use futures_channel::oneshot;
9-
use futures_util::pin_mut;
109
use futures_util::{
1110
future::{self, BoxFuture, Either},
12-
select,
11+
pin_mut, select,
1312
stream::{self, FusedStream, FuturesUnordered},
1413
StreamExt as _,
1514
};
1615
use opentelemetry::Context;
1716
use opentelemetry::{otel_debug, otel_error, otel_warn};
1817
use std::fmt;
19-
use std::sync::atomic::{AtomicUsize, Ordering};
20-
use std::sync::Arc;
18+
use std::sync::{
19+
atomic::{AtomicUsize, Ordering},
20+
Arc,
21+
};
2122
use std::time::Duration;
23+
use tokio::sync::RwLock;
2224

2325
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2426
/// them at a preconfigured interval.
@@ -188,13 +190,22 @@ struct BatchSpanProcessorInternal<E, R> {
188190
spans: Vec<SpanData>,
189191
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
190192
runtime: R,
191-
exporter: E,
192193
config: BatchConfig,
194+
// TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`)
195+
// for all methods. This would allow us to remove the `RwLock` and just use `Arc<E>`,
196+
// similar to how `crate::logs::LogExporter` is implemented.
197+
exporter: Arc<RwLock<E>>,
193198
}
194199

195-
impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
200+
impl<E: SpanExporter + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
196201
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
197-
let export_result = self.export().await;
202+
let export_result = Self::export(
203+
self.spans.split_off(0),
204+
self.exporter.clone(),
205+
self.runtime.clone(),
206+
self.config.max_export_timeout,
207+
)
208+
.await;
198209
let task = Box::pin(async move {
199210
if let Some(channel) = res_channel {
200211
// If a response channel is provided, attempt to send the export result through it.
@@ -243,9 +254,15 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
243254
self.export_tasks.next().await;
244255
}
245256

246-
let export_result = self.export().await;
257+
let batch = self.spans.split_off(0);
258+
let exporter = self.exporter.clone();
259+
let runtime = self.runtime.clone();
260+
let max_export_timeout = self.config.max_export_timeout;
261+
247262
let task = async move {
248-
if let Err(err) = export_result {
263+
if let Err(err) =
264+
Self::export(batch, exporter, runtime, max_export_timeout).await
265+
{
249266
otel_error!(
250267
name: "BatchSpanProcessor.Export.Error",
251268
reason = format!("{}", err)
@@ -254,6 +271,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
254271

255272
Ok(())
256273
};
274+
257275
// Special case when not using concurrent exports
258276
if self.config.max_concurrent_exports == 1 {
259277
let _ = task.await;
@@ -288,34 +306,39 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288306
// Stream has terminated or processor is shutdown, return to finish execution.
289307
BatchMessage::Shutdown(ch) => {
290308
self.flush(Some(ch)).await;
291-
let _ = self.exporter.shutdown();
309+
let _ = self.exporter.write().await.shutdown();
292310
return false;
293311
}
294312
// propagate the resource
295313
BatchMessage::SetResource(resource) => {
296-
self.exporter.set_resource(&resource);
314+
self.exporter.write().await.set_resource(&resource);
297315
}
298316
}
299317
true
300318
}
301319

302-
async fn export(&mut self) -> OTelSdkResult {
320+
async fn export(
321+
batch: Vec<SpanData>,
322+
exporter: Arc<RwLock<E>>,
323+
runtime: R,
324+
max_export_timeout: Duration,
325+
) -> OTelSdkResult {
303326
// Batch size check for flush / shutdown. Those methods may be called
304327
// when there's no work to do.
305-
if self.spans.is_empty() {
328+
if batch.is_empty() {
306329
return Ok(());
307330
}
308331

309-
let export = self.exporter.export(self.spans.split_off(0));
310-
let timeout = self.runtime.delay(self.config.max_export_timeout);
311-
let time_out = self.config.max_export_timeout;
332+
let exporter_guard = exporter.read().await;
333+
let export = exporter_guard.export(batch);
334+
let timeout = runtime.delay(max_export_timeout);
312335

313336
pin_mut!(export);
314337
pin_mut!(timeout);
315338

316339
match future::select(export, timeout).await {
317340
Either::Left((export_res, _)) => export_res,
318-
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
341+
Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
319342
}
320343
}
321344

@@ -368,7 +391,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
368391
export_tasks: FuturesUnordered::new(),
369392
runtime: timeout_runtime,
370393
config,
371-
exporter,
394+
exporter: Arc::new(RwLock::new(exporter)),
372395
};
373396

374397
processor.run(messages).await
@@ -435,6 +458,8 @@ mod tests {
435458
use crate::trace::{SpanData, SpanExporter};
436459
use futures_util::Future;
437460
use std::fmt::Debug;
461+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
462+
use std::sync::Arc;
438463
use std::time::Duration;
439464

440465
struct BlockingExporter<D> {
@@ -463,6 +488,39 @@ mod tests {
463488
}
464489
}
465490

491+
/// Exporter that records whether two exports overlap in time.
492+
struct TrackingExporter {
493+
/// Artificial delay to keep each export alive for a while.
494+
delay: Duration,
495+
/// Current number of in-flight exports.
496+
active: Arc<AtomicUsize>,
497+
/// Set to true the first time we see overlap.
498+
concurrent_seen: Arc<AtomicBool>,
499+
}
500+
501+
impl Debug for TrackingExporter {
502+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
503+
f.write_str("tracking exporter")
504+
}
505+
}
506+
507+
impl SpanExporter for TrackingExporter {
508+
async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
509+
// Increment in-flight counter and note any overlap.
510+
let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
511+
if inflight > 1 {
512+
self.concurrent_seen.store(true, Ordering::SeqCst);
513+
}
514+
515+
// Keep the export "busy" for a bit.
516+
tokio::time::sleep(self.delay).await;
517+
518+
// Decrement counter.
519+
self.active.fetch_sub(1, Ordering::SeqCst);
520+
Ok(())
521+
}
522+
}
523+
466524
#[test]
467525
fn test_build_batch_span_processor_builder() {
468526
let mut env_vars = vec![
@@ -532,8 +590,8 @@ mod tests {
532590
);
533591
}
534592

535-
// If the time_out is true, then the result suppose to ended with timeout.
536-
// otherwise the exporter should be able to export within time out duration.
593+
// If `time_out` is `true`, then the export should fail with a timeout.
594+
// Else, the exporter should be able to export within the timeout duration.
537595
async fn timeout_test_tokio(time_out: bool) {
538596
let config = BatchConfig {
539597
max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
@@ -557,24 +615,92 @@ mod tests {
557615
assert!(shutdown_res.is_ok());
558616
}
559617

560-
#[test]
561-
fn test_timeout_tokio_timeout() {
618+
#[tokio::test(flavor = "multi_thread")]
619+
async fn test_timeout_tokio_timeout() {
562620
// If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
563621
// If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
564622
// Either way, the test should be finished within 5s.
565-
let runtime = tokio::runtime::Builder::new_multi_thread()
566-
.enable_all()
567-
.build()
568-
.unwrap();
569-
runtime.block_on(timeout_test_tokio(true));
623+
timeout_test_tokio(true).await;
570624
}
571625

572-
#[test]
573-
fn test_timeout_tokio_not_timeout() {
574-
let runtime = tokio::runtime::Builder::new_multi_thread()
575-
.enable_all()
576-
.build()
577-
.unwrap();
578-
runtime.block_on(timeout_test_tokio(false));
626+
#[tokio::test(flavor = "multi_thread")]
627+
async fn test_timeout_tokio_not_timeout() {
628+
timeout_test_tokio(false).await;
629+
}
630+
631+
#[tokio::test(flavor = "multi_thread")]
632+
async fn test_concurrent_exports_expected() {
633+
// Shared state for the exporter.
634+
let active = Arc::new(AtomicUsize::new(0));
635+
let concurrent_seen = Arc::new(AtomicBool::new(false));
636+
637+
let exporter = TrackingExporter {
638+
delay: Duration::from_millis(50),
639+
active: active.clone(),
640+
concurrent_seen: concurrent_seen.clone(),
641+
};
642+
643+
// Intentionally tiny batch-size so every span forces an export.
644+
let config = BatchConfig {
645+
max_export_batch_size: 1,
646+
max_queue_size: 16,
647+
scheduled_delay: Duration::from_secs(3600), // effectively disabled
648+
max_export_timeout: Duration::from_secs(5),
649+
max_concurrent_exports: 2, // what we want to verify
650+
};
651+
652+
// Spawn the processor.
653+
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
654+
655+
// Finish three spans in rapid succession.
656+
processor.on_end(new_test_export_span_data());
657+
processor.on_end(new_test_export_span_data());
658+
processor.on_end(new_test_export_span_data());
659+
660+
// Wait until everything has been exported.
661+
processor.force_flush().expect("force flush failed");
662+
processor.shutdown().expect("shutdown failed");
663+
664+
// Expect at least one period with >1 export in flight.
665+
assert!(
666+
concurrent_seen.load(Ordering::SeqCst),
667+
"exports never overlapped, processor is still serialising them"
668+
);
669+
}
670+
671+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
672+
async fn test_exports_serial_when_max_concurrent_exports_1() {
673+
let active = Arc::new(AtomicUsize::new(0));
674+
let concurrent_seen = Arc::new(AtomicBool::new(false));
675+
676+
let exporter = TrackingExporter {
677+
delay: Duration::from_millis(50),
678+
active: active.clone(),
679+
concurrent_seen: concurrent_seen.clone(),
680+
};
681+
682+
let config = BatchConfig {
683+
max_export_batch_size: 1,
684+
max_queue_size: 16,
685+
scheduled_delay: Duration::from_secs(3600),
686+
max_export_timeout: Duration::from_secs(5),
687+
max_concurrent_exports: 1, // what we want to verify
688+
};
689+
690+
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
691+
692+
// Finish several spans quickly.
693+
processor.on_end(new_test_export_span_data());
694+
processor.on_end(new_test_export_span_data());
695+
processor.on_end(new_test_export_span_data());
696+
697+
processor.force_flush().expect("force flush failed");
698+
processor.shutdown().expect("shutdown failed");
699+
700+
// There must never have been more than one export in flight.
701+
assert!(
702+
!concurrent_seen.load(Ordering::SeqCst),
703+
"exports overlapped even though max_concurrent_exports was 1"
704+
);
579705
}
580706
}

0 commit comments

Comments
 (0)