-
Notifications
You must be signed in to change notification settings - Fork 557
fix(async-processor): concurrent exports actually serialised #3028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
1905b50
f7faf80
d8e046e
2076120
8346e30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,19 +6,21 @@ | |
use crate::trace::SpanProcessor; | ||
use crate::trace::{SpanData, SpanExporter}; | ||
use futures_channel::oneshot; | ||
use futures_util::pin_mut; | ||
use futures_util::{ | ||
future::{self, BoxFuture, Either}, | ||
select, | ||
pin_mut, select, | ||
stream::{self, FusedStream, FuturesUnordered}, | ||
StreamExt as _, | ||
}; | ||
use opentelemetry::Context; | ||
use opentelemetry::{otel_debug, otel_error, otel_warn}; | ||
use std::fmt; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::sync::Arc; | ||
use std::sync::{ | ||
atomic::{AtomicUsize, Ordering}, | ||
Arc, | ||
}; | ||
use std::time::Duration; | ||
use tokio::sync::RwLock; | ||
|
||
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports | ||
/// them at a preconfigured interval. | ||
|
@@ -188,13 +190,19 @@ | |
spans: Vec<SpanData>, | ||
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>, | ||
runtime: R, | ||
exporter: E, | ||
exporter: Arc<RwLock<E>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Going forward, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed — my goal was mainly to minimise the scope of the change, but you're right that using a RwLock is... questionable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a TODO comment for this so someone can work on it? I'll also file an issue. Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Emmm if we need to add RwLock to support concurrent export maybe we should consider offer two favor of span processor so users that don't use concurrent export don't have to pay the cost |
||
config: BatchConfig, | ||
} | ||
|
||
impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> { | ||
impl<E: SpanExporter + Send + Sync + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> { | ||
alexbrt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) { | ||
let export_result = self.export().await; | ||
let export_result = Self::export( | ||
self.spans.split_off(0), | ||
self.exporter.clone(), | ||
self.runtime.clone(), | ||
self.config.max_export_timeout, | ||
) | ||
.await; | ||
let task = Box::pin(async move { | ||
if let Some(channel) = res_channel { | ||
// If a response channel is provided, attempt to send the export result through it. | ||
|
@@ -243,9 +251,15 @@ | |
self.export_tasks.next().await; | ||
} | ||
|
||
let export_result = self.export().await; | ||
let batch = self.spans.split_off(0); | ||
let exporter = self.exporter.clone(); | ||
let runtime = self.runtime.clone(); | ||
lalitb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let max_export_timeout = self.config.max_export_timeout; | ||
|
||
let task = async move { | ||
if let Err(err) = export_result { | ||
if let Err(err) = | ||
Self::export(batch, exporter, runtime, max_export_timeout).await | ||
{ | ||
otel_error!( | ||
name: "BatchSpanProcessor.Export.Error", | ||
reason = format!("{}", err) | ||
|
@@ -254,6 +268,7 @@ | |
|
||
Ok(()) | ||
}; | ||
|
||
// Special case when not using concurrent exports | ||
if self.config.max_concurrent_exports == 1 { | ||
let _ = task.await; | ||
|
@@ -288,34 +303,39 @@ | |
// Stream has terminated or processor is shutdown, return to finish execution. | ||
BatchMessage::Shutdown(ch) => { | ||
self.flush(Some(ch)).await; | ||
let _ = self.exporter.shutdown(); | ||
let _ = self.exporter.write().await.shutdown(); | ||
return false; | ||
} | ||
// propagate the resource | ||
BatchMessage::SetResource(resource) => { | ||
self.exporter.set_resource(&resource); | ||
self.exporter.write().await.set_resource(&resource); | ||
} | ||
} | ||
true | ||
} | ||
|
||
async fn export(&mut self) -> OTelSdkResult { | ||
async fn export( | ||
batch: Vec<SpanData>, | ||
exporter: Arc<RwLock<E>>, | ||
runtime: R, | ||
max_export_timeout: Duration, | ||
) -> OTelSdkResult { | ||
// Batch size check for flush / shutdown. Those methods may be called | ||
// when there's no work to do. | ||
if self.spans.is_empty() { | ||
if batch.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
let export = self.exporter.export(self.spans.split_off(0)); | ||
let timeout = self.runtime.delay(self.config.max_export_timeout); | ||
let time_out = self.config.max_export_timeout; | ||
let exporter_guard = exporter.read().await; | ||
let export = exporter_guard.export(batch); | ||
let timeout = runtime.delay(max_export_timeout); | ||
|
||
pin_mut!(export); | ||
pin_mut!(timeout); | ||
|
||
match future::select(export, timeout).await { | ||
Either::Left((export_res, _)) => export_res, | ||
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), | ||
Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)), | ||
} | ||
} | ||
|
||
|
@@ -368,7 +388,7 @@ | |
export_tasks: FuturesUnordered::new(), | ||
runtime: timeout_runtime, | ||
config, | ||
exporter, | ||
exporter: Arc::new(RwLock::new(exporter)), | ||
}; | ||
|
||
processor.run(messages).await | ||
|
@@ -435,6 +455,8 @@ | |
use crate::trace::{SpanData, SpanExporter}; | ||
use futures_util::Future; | ||
use std::fmt::Debug; | ||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
struct BlockingExporter<D> { | ||
|
@@ -463,6 +485,39 @@ | |
} | ||
} | ||
|
||
/// Exporter that records whether two exports overlap in time. | ||
struct TrackingExporter { | ||
/// Artificial delay to keep each export alive for a while. | ||
delay: Duration, | ||
/// Current number of in-flight exports. | ||
active: Arc<AtomicUsize>, | ||
/// Set to true the first time we see overlap. | ||
concurrent_seen: Arc<AtomicBool>, | ||
} | ||
|
||
impl Debug for TrackingExporter { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.write_str("tracking exporter") | ||
} | ||
} | ||
|
||
impl SpanExporter for TrackingExporter { | ||
async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult { | ||
// Increment in-flight counter and note any overlap. | ||
let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1; | ||
if inflight > 1 { | ||
self.concurrent_seen.store(true, Ordering::SeqCst); | ||
} | ||
|
||
// Keep the export "busy" for a bit. | ||
tokio::time::sleep(self.delay).await; | ||
|
||
// Decrement counter. | ||
self.active.fetch_sub(1, Ordering::SeqCst); | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_build_batch_span_processor_builder() { | ||
let mut env_vars = vec![ | ||
|
@@ -532,8 +587,8 @@ | |
); | ||
} | ||
|
||
// If the time_out is true, then the result suppose to ended with timeout. | ||
// otherwise the exporter should be able to export within time out duration. | ||
// If `time_out` is `true`, then the export should fail with a timeout. | ||
// Else, the exporter should be able to export within the timeout duration. | ||
async fn timeout_test_tokio(time_out: bool) { | ||
let config = BatchConfig { | ||
max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), | ||
|
@@ -557,24 +612,92 @@ | |
assert!(shutdown_res.is_ok()); | ||
} | ||
|
||
#[test] | ||
fn test_timeout_tokio_timeout() { | ||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_timeout_tokio_timeout() { | ||
// If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. | ||
// If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. | ||
// Either way, the test should be finished within 5s. | ||
let runtime = tokio::runtime::Builder::new_multi_thread() | ||
.enable_all() | ||
.build() | ||
.unwrap(); | ||
runtime.block_on(timeout_test_tokio(true)); | ||
timeout_test_tokio(true).await; | ||
} | ||
|
||
#[test] | ||
fn test_timeout_tokio_not_timeout() { | ||
let runtime = tokio::runtime::Builder::new_multi_thread() | ||
.enable_all() | ||
.build() | ||
.unwrap(); | ||
runtime.block_on(timeout_test_tokio(false)); | ||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_timeout_tokio_not_timeout() { | ||
timeout_test_tokio(false).await; | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_concurrent_exports_expected() { | ||
// Shared state for the exporter. | ||
let active = Arc::new(AtomicUsize::new(0)); | ||
let concurrent_seen = Arc::new(AtomicBool::new(false)); | ||
|
||
let exporter = TrackingExporter { | ||
delay: Duration::from_millis(50), | ||
active: active.clone(), | ||
concurrent_seen: concurrent_seen.clone(), | ||
}; | ||
|
||
// Intentionally tiny batch-size so every span forces an export. | ||
let config = BatchConfig { | ||
max_export_batch_size: 1, | ||
max_queue_size: 16, | ||
scheduled_delay: Duration::from_secs(3600), // effectively disabled | ||
max_export_timeout: Duration::from_secs(5), | ||
max_concurrent_exports: 2, // what we want to verify | ||
}; | ||
|
||
// Spawn the processor. | ||
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); | ||
|
||
// Finish three spans in rapid succession. | ||
processor.on_end(new_test_export_span_data()); | ||
processor.on_end(new_test_export_span_data()); | ||
processor.on_end(new_test_export_span_data()); | ||
|
||
// Wait until everything has been exported. | ||
processor.force_flush().expect("force flush failed"); | ||
processor.shutdown().expect("shutdown failed"); | ||
|
||
// Expect at least one period with >1 export in flight. | ||
assert!( | ||
concurrent_seen.load(Ordering::SeqCst), | ||
"exports never overlapped, processor is still serialising them" | ||
); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] | ||
async fn test_exports_serial_when_max_concurrent_exports_1() { | ||
let active = Arc::new(AtomicUsize::new(0)); | ||
let concurrent_seen = Arc::new(AtomicBool::new(false)); | ||
|
||
let exporter = TrackingExporter { | ||
delay: Duration::from_millis(50), | ||
active: active.clone(), | ||
concurrent_seen: concurrent_seen.clone(), | ||
}; | ||
|
||
let config = BatchConfig { | ||
max_export_batch_size: 1, | ||
max_queue_size: 16, | ||
scheduled_delay: Duration::from_secs(3600), | ||
max_export_timeout: Duration::from_secs(5), | ||
max_concurrent_exports: 1, // what we want to verify | ||
}; | ||
|
||
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); | ||
|
||
// Finish several spans quickly. | ||
processor.on_end(new_test_export_span_data()); | ||
processor.on_end(new_test_export_span_data()); | ||
processor.on_end(new_test_export_span_data()); | ||
|
||
processor.force_flush().expect("force flush failed"); | ||
processor.shutdown().expect("shutdown failed"); | ||
|
||
// There must never have been more than one export in flight. | ||
assert!( | ||
!concurrent_seen.load(Ordering::SeqCst), | ||
"exports overlapped even though max_concurrent_exports was 1" | ||
); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.