Skip to content

Commit 6c6c596

Browse files
committed
Rollback blocking export fix
1 parent 7a5d76f commit 6c6c596

File tree

2 files changed

+45
-67
lines changed

2 files changed

+45
-67
lines changed

opentelemetry-sdk/Cargo.toml

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true }
2323
serde_json = { workspace = true, optional = true }
2424
thiserror = { workspace = true }
2525
url = { workspace = true, optional = true }
26-
tokio = { workspace = true, optional = true }
26+
tokio = { workspace = true, features = ["rt", "time"], optional = true }
2727
tokio-stream = { workspace = true, optional = true }
2828
http = { workspace = true, optional = true }
2929
tracing = {workspace = true, optional = true}
@@ -48,21 +48,9 @@ logs = ["opentelemetry/logs", "serde_json"]
4848
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
4949
metrics = ["opentelemetry/metrics", "glob"]
5050
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
51-
experimental_async_runtime = ["tokio", "tokio/sync"]
52-
rt-tokio = [
53-
"tokio",
54-
"tokio-stream",
55-
"tokio/rt",
56-
"tokio/time",
57-
"experimental_async_runtime"
58-
]
59-
rt-tokio-current-thread = [
60-
"tokio",
61-
"tokio-stream",
62-
"tokio/rt",
63-
"tokio/time",
64-
"experimental_async_runtime"
65-
]
51+
experimental_async_runtime = []
52+
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
53+
rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
6654
rt-async-std = ["async-std", "experimental_async_runtime"]
6755
internal-logs = ["tracing"]
6856
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 41 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,19 @@ use crate::trace::Span;
66
use crate::trace::SpanProcessor;
77
use crate::trace::{SpanData, SpanExporter};
88
use futures_channel::oneshot;
9+
use futures_util::pin_mut;
910
use futures_util::{
1011
future::{self, BoxFuture, Either},
11-
pin_mut, select,
12+
select,
1213
stream::{self, FusedStream, FuturesUnordered},
13-
FutureExt as _, StreamExt as _, TryFutureExt as _,
14+
StreamExt as _,
1415
};
1516
use opentelemetry::Context;
1617
use opentelemetry::{otel_debug, otel_error, otel_warn};
1718
use std::collections::VecDeque;
1819
use std::fmt;
19-
use std::future::Future;
2020
use std::sync::atomic::{AtomicUsize, Ordering};
2121
use std::sync::Arc;
22-
use tokio::sync::RwLock;
2322

2423
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2524
/// them at a preconfigured interval.
@@ -190,17 +189,14 @@ struct BatchSpanProcessorInternal<E, R> {
190189
spans: VecDeque<SpanData>,
191190
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
192191
runtime: R,
193-
exporter: Arc<RwLock<E>>,
192+
exporter: E,
194193
config: BatchConfig,
195194
}
196195

197-
impl<E, R> BatchSpanProcessorInternal<E, R>
198-
where
199-
E: SpanExporter + 'static,
200-
R: RuntimeChannel,
201-
{
196+
impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
202197
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
203-
let task = self.export().map(|export_result| {
198+
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.
199+
let task = Box::pin(async move {
204200
if let Some(channel) = res_channel {
205201
// If a response channel is provided, attempt to send the export result through it.
206202
if let Err(result) = channel.send(export_result) {
@@ -226,7 +222,7 @@ where
226222
if self.config.max_concurrent_exports == 1 {
227223
let _ = task.await;
228224
} else {
229-
self.export_tasks.push(task.boxed());
225+
self.export_tasks.push(task);
230226
while self.export_tasks.next().await.is_some() {}
231227
}
232228
}
@@ -255,15 +251,17 @@ where
255251
self.export_tasks.next().await;
256252
}
257253

258-
let task = self.export().or_else(|err| async move {
259-
otel_error!(
260-
name: "BatchSpanProcessor.Export.Error",
261-
reason = format!("{err}"),
262-
);
254+
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.
255+
let task = async move {
256+
if let Err(err) = export_result {
257+
otel_error!(
258+
name: "BatchSpanProcessor.Export.Error",
259+
reason = format!("{}", err)
260+
);
261+
}
263262

264263
Ok(())
265-
});
266-
264+
};
267265
// Special case when not using concurrent exports
268266
if self.config.max_concurrent_exports == 1 {
269267
let _ = task.await;
@@ -298,42 +296,34 @@ where
298296
// Stream has terminated or processor is shutdown, return to finish execution.
299297
BatchMessage::Shutdown(ch) => {
300298
self.flush(Some(ch)).await;
301-
let _ = self.exporter.write().await.shutdown();
299+
let _ = self.exporter.shutdown();
302300
return false;
303301
}
304302
// propagate the resource
305303
BatchMessage::SetResource(resource) => {
306-
self.exporter.write().await.set_resource(&resource);
304+
self.exporter.set_resource(&resource);
307305
}
308306
}
309307
true
310308
}
311309

312-
#[allow(impl_trait_overcaptures)] // MSRV compatibility.
313-
fn export(&mut self) -> impl Future<Output = OTelSdkResult> {
314-
let spans = self.spans.drain(..).collect::<Vec<_>>();
315-
let exporter = self.exporter.clone();
316-
let runtime = self.runtime.clone();
317-
let time_out = self.config.max_export_timeout;
318-
319-
async move {
320-
// Batch size check for flush / shutdown. Those methods may be called
321-
// when there's no work to do.
322-
if spans.is_empty() {
323-
return Ok(());
324-
}
310+
async fn export(&mut self) -> OTelSdkResult {
311+
// Batch size check for flush / shutdown. Those methods may be called
312+
// when there's no work to do.
313+
if self.spans.is_empty() {
314+
return Ok(());
315+
}
325316

326-
let exporter = exporter.read().await;
327-
let export = exporter.export(spans);
328-
let timeout = runtime.delay(time_out);
317+
let export = self.exporter.export(self.spans.split_off(0));
318+
let timeout = self.runtime.delay(self.config.max_export_timeout);
319+
let time_out = self.config.max_export_timeout;
329320

330-
pin_mut!(export);
331-
pin_mut!(timeout);
321+
pin_mut!(export);
322+
pin_mut!(timeout);
332323

333-
match future::select(export, timeout).await {
334-
Either::Left((export_res, _)) => export_res,
335-
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
336-
}
324+
match future::select(export, timeout).await {
325+
Either::Left((export_res, _)) => export_res,
326+
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
337327
}
338328
}
339329

@@ -346,14 +336,14 @@ where
346336
// An export task completed; do we need to do anything with it?
347337
},
348338
message = messages.next() => {
349-
if let Some(m) = message {
350-
if self.process_message(m).await {
351-
continue;
352-
}
339+
match message {
340+
Some(message) => {
341+
if !self.process_message(message).await {
342+
break;
343+
}
344+
},
345+
None => break,
353346
}
354-
355-
// Shutdown if there's no message, or the message indicates shutdown.
356-
break;
357347
},
358348
}
359349
}
@@ -386,7 +376,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
386376
export_tasks: FuturesUnordered::new(),
387377
runtime: timeout_runtime,
388378
config,
389-
exporter: Arc::new(RwLock::new(exporter)),
379+
exporter,
390380
};
391381

392382
processor.run(messages).await

0 commit comments

Comments
 (0)