Skip to content

Commit 290f4bc

Browse files
dojiongcijothomas
andauthored
fix: batch size accounting in BatchSpanProcessor when queue is full (#3089)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 0462369 commit 290f4bc

File tree

2 files changed

+43
-34
lines changed

2 files changed

+43
-34
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- *Fix* SpanProcessor::on_start is no longer called on non recording spans
77
- **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#2959](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially.
88
- **Feature**: Added `Clone` implementation to `SdkLogger` for API consistency with `SdkTracer` ([#3058](https://github.com/open-telemetry/opentelemetry-rust/issues/3058)).
9+
- **Fix**: batch size accounting in BatchSpanProcessor when queue is full ([#3089](https://github.com/open-telemetry/opentelemetry-rust/pull/3089)).
910

1011
## 0.30.0
1112

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -522,45 +522,49 @@ impl SpanProcessor for BatchSpanProcessor {
522522
);
523523
return;
524524
}
525-
let result = self.span_sender.try_send(span);
526525

527-
if result.is_err() {
528-
// Increment dropped span count. The first time we have to drop a span,
529-
// emit a warning.
530-
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
531-
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
532-
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
533-
}
534-
}
535-
// At this point, sending the span to the data channel was successful.
536-
// Increment the current batch size and check if it has reached the max export batch size.
537-
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
538-
{
539-
// Check if the a control message for exporting spans is already sent to the worker thread.
540-
// If not, send a control message to export spans.
541-
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
542-
543-
if !self.export_span_message_sent.load(Ordering::Relaxed) {
544-
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
545-
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
546-
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
547-
// We could have used compare_exchange as well here, but it's more verbose than swap.
548-
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
549-
match self.message_sender.try_send(BatchMessage::ExportSpan(
550-
self.export_span_message_sent.clone(),
551-
)) {
552-
Ok(_) => {
553-
// Control message sent successfully.
554-
}
555-
Err(_err) => {
556-
// TODO: Log error
557-
// If the control message could not be sent, reset the `export_span_message_sent` flag.
558-
self.export_span_message_sent
559-
.store(false, Ordering::Relaxed);
526+
match self.span_sender.try_send(span) {
527+
Ok(_) => {
528+
// At this point, sending the span to the data channel was successful.
529+
// Increment the current batch size and check if it has reached the max export batch size.
530+
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
531+
>= self.max_export_batch_size
532+
{
533+
// Check if the a control message for exporting spans is already sent to the worker thread.
534+
// If not, send a control message to export spans.
535+
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
536+
537+
if !self.export_span_message_sent.load(Ordering::Relaxed) {
538+
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
539+
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
540+
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
541+
// We could have used compare_exchange as well here, but it's more verbose than swap.
542+
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
543+
match self.message_sender.try_send(BatchMessage::ExportSpan(
544+
self.export_span_message_sent.clone(),
545+
)) {
546+
Ok(_) => {
547+
// Control message sent successfully.
548+
}
549+
Err(_err) => {
550+
// TODO: Log error
551+
// If the control message could not be sent, reset the `export_span_message_sent` flag.
552+
self.export_span_message_sent
553+
.store(false, Ordering::Relaxed);
554+
}
555+
}
560556
}
561557
}
562558
}
563559
}
560+
Err(_) => {
561+
// Increment dropped span count. The first time we have to drop a span,
562+
// emit a warning.
563+
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
564+
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
565+
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
566+
}
567+
}
564568
}
565569
}
566570

@@ -1227,6 +1231,10 @@ mod tests {
12271231
// Verify dropped spans count (if accessible in your implementation)
12281232
let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
12291233
assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
1234+
1235+
// Verify current batch size
1236+
let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed);
1237+
assert_eq!(current_batch_size, 0, "Unexpected current batch size");
12301238
}
12311239

12321240
#[test]

0 commit comments

Comments
 (0)