40 changes: 33 additions & 7 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -13,7 +13,7 @@
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
@@ -154,6 +154,13 @@
/// them at a pre-configured interval.
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

// Track dropped logs. We'll log this at shutdown and also emit
// as a metric.
dropped_logs_count: AtomicUsize,

// Track the maximum queue size that was configured for this processor
max_queue_size: usize,
Review comment (Member): odd that we have to store this here just for logging purposes, but not an issue!

Review comment (@lalitb, Nov 27, 2024): I believe we can avoid this by moving the logging of dropped logs (otel_warn!) from the shutdown() method to the worker's handling of the Shutdown message. The dropped_logs_count would then need to be shared between the worker and the processor object. I haven't tried it, and if it turns out to be complex we can park it for a separate PR.
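A minimal, self-contained sketch of that alternative (not part of this PR), using std threads and a bounded channel in place of the SDK's runtime abstraction; all names below are hypothetical and only illustrate how an Arc<AtomicUsize> could be shared between the processor side and the worker that handles Shutdown:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

enum BatchMessage {
    Export(String),
    Shutdown,
}

fn main() {
    let dropped_logs_count = Arc::new(AtomicUsize::new(0));
    let worker_dropped = Arc::clone(&dropped_logs_count);
    let (tx, rx) = mpsc::sync_channel::<BatchMessage>(4);

    // Worker task: handles Shutdown and reports the final dropped count itself,
    // instead of the processor doing so in its shutdown() method.
    let worker = thread::spawn(move || {
        for msg in rx {
            match msg {
                BatchMessage::Export(_log) => { /* batch and export here */ }
                BatchMessage::Shutdown => {
                    let dropped = worker_dropped.load(Ordering::Relaxed);
                    if dropped > 0 {
                        eprintln!("dropped {dropped} logs before shutdown");
                    }
                    break;
                }
            }
        }
    });

    // Processor side: when the bounded queue is full, just bump the shared counter.
    for i in 0..64 {
        if tx.try_send(BatchMessage::Export(format!("log {i}"))).is_err() {
            dropped_logs_count.fetch_add(1, Ordering::Relaxed);
        }
    }
    tx.send(BatchMessage::Shutdown).unwrap();
    worker.join().unwrap();
}
```

With this shape, shutdown() only needs to send the Shutdown message; the worker, which runs last, reports the final dropped count.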

}

impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
@@ -172,11 +179,13 @@
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
if result.is_err() {
// Increment the dropped-logs counter. The first time we have to drop a log,
// emit a warning.
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "Beginning to drop log messages due to a full exporter queue.");
}

}
}

@@ -192,6 +201,17 @@
}

fn shutdown(&self) -> LogResult<()> {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
otel_warn!(
name: "BatchLogProcessor.LogsDropped",
dropped_logs_count = dropped_logs,
max_queue_size = max_queue_size,
message = "Logs were dropped due to a full or closed queue. The count represents the total count of lost logs in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}

let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
@@ -215,6 +235,7 @@
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let inner_runtime = runtime.clone();
let max_queue_size = config.max_queue_size;

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
@@ -296,8 +317,13 @@
}
}
}));

// Return batch processor with link to worker
BatchLogProcessor { message_sender }
BatchLogProcessor {
message_sender,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
}
}

/// Create a new batch processor builder
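The new LogsDropped warning advises increasing the queue size and/or decreasing the delay. As a hedged illustration (not part of this diff), this is roughly how a user could configure that through the SDK's batch-config builder; the method names (BatchConfigBuilder, with_max_queue_size, with_scheduled_delay, with_batch_config) are assumed from the SDK around this version and should be checked against the current docs, and the Tokio runtime handle needs the rt-tokio feature:

```rust
use std::time::Duration;

use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::{BatchConfigBuilder, BatchLogProcessor, LoggerProvider};
use opentelemetry_sdk::runtime::Tokio;

// Build a LoggerProvider whose batch processor has a larger queue and a shorter
// export delay than the defaults, for whatever exporter the caller supplies.
fn provider_with_larger_queue<E: LogExporter + 'static>(exporter: E) -> LoggerProvider {
    let batch_config = BatchConfigBuilder::default()
        .with_max_queue_size(8_192) // SDK default is 2_048
        .with_scheduled_delay(Duration::from_millis(500))
        .build();

    let processor = BatchLogProcessor::builder(exporter, Tokio)
        .with_batch_config(batch_config)
        .build();

    LoggerProvider::builder()
        .with_log_processor(processor)
        .build()
}
```

Any LogExporter implementation can be passed in; a bigger queue only pushes out the point at which the new dropped-log accounting kicks in.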
41 changes: 34 additions & 7 deletions opentelemetry-sdk/src/trace/span_processor.rs
@@ -45,12 +45,13 @@
stream::{self, FusedStream, FuturesUnordered},
StreamExt as _,
};
use opentelemetry::{otel_debug, otel_error};
use opentelemetry::{otel_debug, otel_error, otel_warn};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
};
use std::cmp::min;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

@@ -227,6 +228,12 @@
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

// Track dropped spans
dropped_spans_count: AtomicUsize,

// Track the maximum queue size that was configured for this processor
max_queue_size: usize,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
@@ -249,11 +256,14 @@

let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

if let Err(err) = result {
otel_debug!(
name: "BatchSpanProcessor.OnEnd.ExportQueueingFailed",
reason = format!("{:?}", TraceError::Other(err.into()))
);
// If the queue is full or closed, the span can't be buffered and is dropped.
if result.is_err() {
// Increment the number of dropped spans. If this is the first time we've had to drop,
// emit a warning.
if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "Beginning to drop span messages due to full exporter queue.");
}

}
}

@@ -269,6 +279,17 @@
}

fn shutdown(&self) -> TraceResult<()> {
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
otel_warn!(
name: "BatchSpanProcessor.Shutdown",
dropped_spans = dropped_spans,
max_queue_size = max_queue_size,
message = "Spans were dropped due to a full or closed queue. The count represents the total count of lost logs in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}

let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand Down Expand Up @@ -469,6 +490,8 @@
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);

let max_queue_size = config.max_queue_size;

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
@@ -493,7 +516,11 @@
}));

// Return batch processor with link to worker
BatchSpanProcessor { message_sender }
BatchSpanProcessor {
message_sender,
dropped_spans_count: AtomicUsize::new(0),
max_queue_size,
}
}

/// Create a new batch processor builder