
Commit b626d7f: Address review comments

Parent: 6866a90

2 files changed (+33, -31 lines)

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 17 additions & 16 deletions
```diff
@@ -11,9 +11,7 @@ use futures_util::{
 };
 #[cfg(feature = "spec_unstable_logs_enabled")]
 use opentelemetry::logs::Severity;
-use opentelemetry::{
-    global, metrics::Counter, otel_debug, otel_error, otel_warn, InstrumentationScope,
-};
+use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
 
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::{cmp::min, env, sync::Mutex};

@@ -160,7 +158,9 @@ pub struct BatchLogProcessor<R: RuntimeChannel> {
     // Track dropped logs. We'll log this at shutdown and also emit
     // as a metric.
     dropped_logs_count: AtomicUsize,
-    dropped_logs_metric: Counter<u64>,
+
+    // Track the maximum queue size that was configured for this processor
+    max_queue_size: usize,
 }
 
 impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {

@@ -180,9 +180,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
 
         // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
         if result.is_err() {
-            // Increment dropped logs counter and metric
-            self.dropped_logs_count.fetch_add(1, Ordering::Relaxed);
-            self.dropped_logs_metric.add(1, &[]);
+            // Increment the dropped logs counter. The first time we have to drop a log,
+            // emit a warning.
+            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
+                    message = "Beginning to drop log messages due to a full exporter queue.");
+            }
         }
     }

@@ -199,11 +202,13 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
 
     fn shutdown(&self) -> LogResult<()> {
         let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
         if dropped_logs > 0 {
             otel_warn!(
-                name: "BatchLogProcessor.Shutdown",
-                dropped_logs = dropped_logs,
-                message = "Logs were dropped due to a full buffer."
+                name: "BatchLogProcessor.LogsDropped",
+                dropped_logs_count = dropped_logs,
+                max_queue_size = max_queue_size,
+                message = "Logs were dropped due to a full or closed queue. The count represents the total number of logs lost over the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decreasing the delay between export intervals."
             );
         }

@@ -230,6 +235,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
         let (message_sender, message_receiver) =
             runtime.batch_message_channel(config.max_queue_size);
         let inner_runtime = runtime.clone();
+        let max_queue_size = config.max_queue_size;
 
         // Spawn worker process via user-defined spawn function.
         runtime.spawn(Box::pin(async move {

@@ -312,16 +318,11 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
             }
         }));
 
-        let dropped_logs_metric = global::meter("opentelemetry")
-            .u64_counter("dropped_logs")
-            .with_description("Number of logs dropped due to full buffer")
-            .build();
-
         // Return batch processor with link to worker
         BatchLogProcessor {
             message_sender,
             dropped_logs_count: AtomicUsize::new(0),
-            dropped_logs_metric,
+            max_queue_size,
         }
     }
```

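The change in this file (mirrored for spans below) swaps the `dropped_logs` metric, which required `global::meter` and a `Counter<u64>`, for a plain `AtomicUsize` plus a one-time warning on the first drop and a summary warning at shutdown. A minimal self-contained sketch of that pattern follows; it is not the SDK code, and `DropTracker`, `record_drop`, and the `eprintln!` calls are hypothetical stand-ins for the processor struct and the `otel_warn!` macro:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the batch processor's drop-tracking fields.
struct DropTracker {
    dropped_count: AtomicUsize,
    max_queue_size: usize,
}

impl DropTracker {
    fn new(max_queue_size: usize) -> Self {
        Self {
            dropped_count: AtomicUsize::new(0),
            max_queue_size,
        }
    }

    // Called whenever an item cannot be enqueued. fetch_add returns the
    // previous value, so exactly one call observes 0 and emits the warning.
    fn record_drop(&self) {
        if self.dropped_count.fetch_add(1, Ordering::Relaxed) == 0 {
            eprintln!("warning: beginning to drop messages due to a full queue");
        }
    }

    // Called once at shutdown to summarize the total loss alongside the
    // configured queue size, as the commit does.
    fn report(&self) {
        let dropped = self.dropped_count.load(Ordering::Relaxed);
        if dropped > 0 {
            eprintln!(
                "warning: {dropped} messages dropped (max_queue_size = {})",
                self.max_queue_size
            );
        }
    }
}

fn main() {
    let tracker = DropTracker::new(2048);
    tracker.record_drop(); // first drop: warning emitted
    tracker.record_drop(); // later drops: counted silently
    tracker.report();      // shutdown summary
}
```

`fetch_add` returns the previous value, so the `== 0` check is true for exactly one call over the tracker's lifetime; that is what makes the first-drop warning fire once without any locking.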
opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 16 additions & 15 deletions
```diff
@@ -45,8 +45,6 @@ use futures_util::{
     stream::{self, FusedStream, FuturesUnordered},
     StreamExt as _,
 };
-use opentelemetry::global::{self};
-use opentelemetry::metrics::Counter;
 use opentelemetry::{otel_debug, otel_error, otel_warn};
 use opentelemetry::{
     trace::{TraceError, TraceResult},

@@ -231,10 +229,11 @@ impl SpanProcessor for SimpleSpanProcessor {
 pub struct BatchSpanProcessor<R: RuntimeChannel> {
     message_sender: R::Sender<BatchMessage>,
 
-    // Track dropped spans. We'll log this at shutdown and also emit
-    // as a metric.
+    // Track dropped spans
    dropped_spans_count: AtomicUsize,
-    dropped_spans_metric: Counter<u64>,
+
+    // Track the maximum queue size that was configured for this processor
+    max_queue_size: usize,
 }
 
 impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {

@@ -259,9 +258,12 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
 
         // If the queue is full, and we can't buffer a span
         if result.is_err() {
-            // Increment the number of dropped spans and the corresponding metric
-            self.dropped_spans_metric.add(1, &[]);
-            self.dropped_spans_count.fetch_add(1, Ordering::Relaxed);
+            // Increment the number of dropped spans. If this is the first time we've had to drop,
+            // emit a warning.
+            if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
+                    message = "Beginning to drop spans due to a full exporter queue.");
+            }
         }
     }

@@ -278,11 +280,13 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
 
     fn shutdown(&self) -> TraceResult<()> {
         let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
         if dropped_spans > 0 {
             otel_warn!(
                 name: "BatchSpanProcessor.Shutdown",
                 dropped_spans = dropped_spans,
-                message = "Spans were dropped due to a full buffer."
+                max_queue_size = max_queue_size,
+                message = "Spans were dropped due to a full or closed queue. The count represents the total number of spans lost over the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between export intervals."
             );
         }

@@ -486,6 +490,8 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
         let (message_sender, message_receiver) =
             runtime.batch_message_channel(config.max_queue_size);
 
+        let max_queue_size = config.max_queue_size;
+
         let inner_runtime = runtime.clone();
         // Spawn worker process via user-defined spawn function.
         runtime.spawn(Box::pin(async move {

@@ -509,16 +515,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
             processor.run(messages).await
         }));
 
-        let dropped_spans_metric = global::meter("opentelemetry")
-            .u64_counter("dropped_spans")
-            .with_description("Number of spans dropped due to full buffer")
-            .build();
-
         // Return batch processor with link to worker
         BatchSpanProcessor {
             message_sender,
             dropped_spans_count: AtomicUsize::new(0),
-            dropped_spans_metric,
+            max_queue_size,
         }
     }
```

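The same first-drop guard appears in both processors, and since `emit` and `on_end` can run on many threads, it is worth spelling out why the guard cannot fire twice: `fetch_add` is a single atomic read-modify-write, so exactly one thread can observe the previous value 0, and this holds even with `Ordering::Relaxed`, because the atomicity of the operation does not depend on the ordering argument. A small hypothetical test, not part of this commit, demonstrating the once-only behavior under contention:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Shared drop counter and a counter for how often the warning path ran.
    let dropped = Arc::new(AtomicUsize::new(0));
    let warnings = Arc::new(AtomicUsize::new(0));

    // Eight threads each record 1000 drops concurrently.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let dropped = Arc::clone(&dropped);
            let warnings = Arc::clone(&warnings);
            thread::spawn(move || {
                for _ in 0..1000 {
                    // Only the caller that sees the previous value 0 takes
                    // the warning branch.
                    if dropped.fetch_add(1, Ordering::Relaxed) == 0 {
                        warnings.fetch_add(1, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    assert_eq!(dropped.load(Ordering::Relaxed), 8000);
    assert_eq!(warnings.load(Ordering::Relaxed), 1); // warning path taken exactly once
    println!("8000 drops recorded, warning emitted once");
}
```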