Skip to content

Commit 4758699

Browse files
committed
Fix OOM in span_processor_with_async_runtime::BatchSpanProcessor
1 parent 52cd0e9 commit 4758699

File tree

1 file changed

+62 -44 lines changed

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 62 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -6,18 +6,20 @@ use crate::trace::Span;
66
use crate::trace::SpanProcessor;
77
use crate::trace::{SpanData, SpanExporter};
88
use futures_channel::oneshot;
9-
use futures_util::pin_mut;
109
use futures_util::{
1110
future::{self, BoxFuture, Either},
12-
select,
11+
pin_mut, select,
1312
stream::{self, FusedStream, FuturesUnordered},
14-
StreamExt as _,
13+
FutureExt as _, StreamExt as _, TryFutureExt as _,
1514
};
1615
use opentelemetry::Context;
1716
use opentelemetry::{otel_debug, otel_error, otel_warn};
17+
use std::collections::VecDeque;
1818
use std::fmt;
19+
use std::future::Future;
1920
use std::sync::atomic::{AtomicUsize, Ordering};
2021
use std::sync::Arc;
22+
use tokio::sync::RwLock;
2123

2224
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2325
/// them at a preconfigured interval.
@@ -185,17 +187,20 @@ enum BatchMessage {
185187
}
186188

187189
struct BatchSpanProcessorInternal<E, R> {
188-
spans: Vec<SpanData>,
190+
spans: VecDeque<SpanData>,
189191
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
190192
runtime: R,
191-
exporter: E,
193+
exporter: Arc<RwLock<E>>,
192194
config: BatchConfig,
193195
}
194196

195-
impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
197+
impl<E, R> BatchSpanProcessorInternal<E, R>
198+
where
199+
E: SpanExporter + 'static,
200+
R: RuntimeChannel,
201+
{
196202
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
197-
let export_result = self.export().await;
198-
let task = Box::pin(async move {
203+
let task = self.export().map(|export_result| {
199204
if let Some(channel) = res_channel {
200205
// If a response channel is provided, attempt to send the export result through it.
201206
if let Err(result) = channel.send(export_result) {
@@ -221,7 +226,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
221226
if self.config.max_concurrent_exports == 1 {
222227
let _ = task.await;
223228
} else {
224-
self.export_tasks.push(task);
229+
self.export_tasks.push(task.boxed());
225230
while self.export_tasks.next().await.is_some() {}
226231
}
227232
}
@@ -233,27 +238,32 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
233238
match message {
234239
// Span has finished, add to buffer of pending spans.
235240
BatchMessage::ExportSpan(span) => {
236-
self.spans.push(span);
241+
if self.spans.len() == self.config.max_export_batch_size {
242+
// Replace the oldest span with the new span to avoid suspending messages
243+
// processing.
244+
self.spans.pop_front();
245+
}
246+
self.spans.push_back(span);
237247

238248
if self.spans.len() == self.config.max_export_batch_size {
239249
// If concurrent exports are saturated, wait for one to complete.
240250
if !self.export_tasks.is_empty()
241251
&& self.export_tasks.len() == self.config.max_concurrent_exports
242252
{
253+
// TODO: Refactor to avoid stopping message processing to not delay
254+
// shutdown/resource set because of export saturation.
243255
self.export_tasks.next().await;
244256
}
245257

246-
let export_result = self.export().await;
247-
let task = async move {
248-
if let Err(err) = export_result {
249-
otel_error!(
250-
name: "BatchSpanProcessor.Export.Error",
251-
reason = format!("{}", err)
252-
);
253-
}
258+
let task = self.export().or_else(|err| async move {
259+
otel_error!(
260+
name: "BatchSpanProcessor.Export.Error",
261+
reason = format!("{err}"),
262+
);
254263

255264
Ok(())
256-
};
265+
});
266+
257267
// Special case when not using concurrent exports
258268
if self.config.max_concurrent_exports == 1 {
259269
let _ = task.await;
@@ -288,34 +298,42 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288298
// Stream has terminated or processor is shutdown, return to finish execution.
289299
BatchMessage::Shutdown(ch) => {
290300
self.flush(Some(ch)).await;
291-
let _ = self.exporter.shutdown();
301+
let _ = self.exporter.write().await.shutdown();
292302
return false;
293303
}
294304
// propagate the resource
295305
BatchMessage::SetResource(resource) => {
296-
self.exporter.set_resource(&resource);
306+
self.exporter.write().await.set_resource(&resource);
297307
}
298308
}
299309
true
300310
}
301311

302-
async fn export(&mut self) -> OTelSdkResult {
303-
// Batch size check for flush / shutdown. Those methods may be called
304-
// when there's no work to do.
305-
if self.spans.is_empty() {
306-
return Ok(());
307-
}
308-
309-
let export = self.exporter.export(self.spans.split_off(0));
310-
let timeout = self.runtime.delay(self.config.max_export_timeout);
312+
#[allow(impl_trait_overcaptures)] // MSRV compatibility.
313+
fn export(&mut self) -> impl Future<Output = OTelSdkResult> {
314+
let spans = self.spans.drain(..).collect::<Vec<_>>();
315+
let exporter = self.exporter.clone();
316+
let runtime = self.runtime.clone();
311317
let time_out = self.config.max_export_timeout;
312318

313-
pin_mut!(export);
314-
pin_mut!(timeout);
319+
async move {
320+
// Batch size check for flush / shutdown. Those methods may be called
321+
// when there's no work to do.
322+
if spans.is_empty() {
323+
return Ok(());
324+
}
325+
326+
let exporter = exporter.read().await;
327+
let export = exporter.export(spans);
328+
let timeout = runtime.delay(time_out);
329+
330+
pin_mut!(export);
331+
pin_mut!(timeout);
315332

316-
match future::select(export, timeout).await {
317-
Either::Left((export_res, _)) => export_res,
318-
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
333+
match future::select(export, timeout).await {
334+
Either::Left((export_res, _)) => export_res,
335+
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
336+
}
319337
}
320338
}
321339

@@ -328,14 +346,14 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
328346
// An export task completed; do we need to do anything with it?
329347
},
330348
message = messages.next() => {
331-
match message {
332-
Some(message) => {
333-
if !self.process_message(message).await {
334-
break;
335-
}
336-
},
337-
None => break,
349+
if let Some(m) = message {
350+
if self.process_message(m).await {
351+
continue;
352+
}
338353
}
354+
355+
// Shutdown if there's no message, or the message indicates shutdown.
356+
break;
339357
},
340358
}
341359
}
@@ -364,11 +382,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
364382

365383
let messages = Box::pin(stream::select(message_receiver, ticker));
366384
let processor = BatchSpanProcessorInternal {
367-
spans: Vec::new(),
385+
spans: VecDeque::new(),
368386
export_tasks: FuturesUnordered::new(),
369387
runtime: timeout_runtime,
370388
config,
371-
exporter,
389+
exporter: Arc::new(RwLock::new(exporter)),
372390
};
373391

374392
processor.run(messages).await

0 commit comments

Comments
 (0)