1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
@@ -6,6 +6,7 @@
- *Fix*: `SpanProcessor::on_start` is no longer called on non-recording spans
- **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#2959](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially (see the sketch after this changelog diff).
- **Feature**: Added `Clone` implementation to `SdkLogger` for API consistency with `SdkTracer` ([#3058](https://github.com/open-telemetry/opentelemetry-rust/issues/3058)).
- **Fix**: Correct batch-size accounting in `BatchSpanProcessor` when the queue is full ([#3089](https://github.com/open-telemetry/opentelemetry-rust/pull/3089)).

## 0.30.0

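The parallel-exports entry above turns on the difference between awaiting an export future inline and spawning it on the runtime. The following is a minimal standalone sketch of that distinction, not the SDK's actual processor loop: `fake_export`, the batch count, and the permit limit of 2 are made-up stand-ins (the permit count plays the role of `OTEL_BSP_MAX_CONCURRENT_EXPORTS`), and only `tokio`'s `spawn`, `sleep`, and `Semaphore` APIs are assumed.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// Stand-in for an exporter call; the real processor calls `SpanExporter::export`.
async fn fake_export(batch_id: usize) {
    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("exported batch {batch_id}");
}

#[tokio::main]
async fn main() {
    // Regression behaviour: awaiting each export in the batching loop serializes them.
    for batch_id in 0..4 {
        fake_export(batch_id).await; // roughly 4 x 100 ms in total
    }

    // Fixed behaviour: spawn each export on the runtime, capped by a permit count.
    let limit = Arc::new(Semaphore::new(2));
    let mut handles = Vec::new();
    for batch_id in 0..4 {
        let permit = limit.clone().acquire_owned().await.expect("semaphore closed");
        handles.push(tokio::spawn(async move {
            fake_export(batch_id).await; // overlaps with other in-flight exports
            drop(permit); // release the slot for the next batch
        }));
    }
    for handle in handles {
        handle.await.expect("export task panicked");
    }
}
```

With the permit held across each spawned task, at most two exports are in flight at a time, yet the batching loop never blocks on a single slow export.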
76 changes: 42 additions & 34 deletions opentelemetry-sdk/src/trace/span_processor.rs
@@ -522,45 +522,49 @@ impl SpanProcessor for BatchSpanProcessor {
);
return;
}
let result = self.span_sender.try_send(span);

if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
// emit a warning.
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
// At this point, sending the span to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if a control message for exporting spans has already been sent to the worker thread.
// If not, send a control message to export spans.
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_span_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportSpan(
self.export_span_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_span_message_sent` flag.
self.export_span_message_sent
.store(false, Ordering::Relaxed);
match self.span_sender.try_send(span) {
Ok(_) => {
// At this point, sending the span to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
>= self.max_export_batch_size
{
// Check if a control message for exporting spans has already been sent to the worker thread.
// If not, send a control message to export spans.
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_span_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportSpan(
self.export_span_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_span_message_sent` flag.
self.export_span_message_sent
.store(false, Ordering::Relaxed);
}
}
}
}
}
}
Err(_) => {
// Increment dropped span count. The first time we have to drop a span,
// emit a warning.
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
}
}
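The rewritten `on_end` above fixes the accounting by incrementing `current_batch_size` only on the `Ok` arm of `try_send`, so spans rejected by a full queue no longer inflate the counter; it also keeps the cheap `load`-then-`swap` check so the flush control message is requested at most once per batch. Below is a minimal standalone sketch of both patterns, using a plain `std::sync::mpsc::sync_channel` and toy names (`batch_size`, `dropped`, `flush_requested`) rather than the SDK's types.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;

fn main() {
    // A queue of capacity 2 stands in for the bounded span channel.
    let (tx, _rx) = sync_channel::<u64>(2);
    let batch_size = AtomicUsize::new(0);
    let dropped = AtomicUsize::new(0);
    let flush_requested = AtomicBool::new(false);
    let max_export_batch_size = 2;

    for span_id in 0..5u64 {
        match tx.try_send(span_id) {
            Ok(()) => {
                // Count only spans that actually made it into the queue.
                if batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= max_export_batch_size {
                    // Cheap load first; the swap (which needs exclusive cache-line
                    // access) only runs when a flush is not already pending.
                    if !flush_requested.load(Ordering::Relaxed)
                        && !flush_requested.swap(true, Ordering::Relaxed)
                    {
                        println!("flush requested after span {span_id}");
                    }
                }
            }
            Err(_) => {
                // Full queue: record the drop, leave the batch counter untouched.
                dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    println!(
        "batch counter = {}, dropped = {}",
        batch_size.load(Ordering::Relaxed),
        dropped.load(Ordering::Relaxed)
    );
}
```

With a capacity of 2 and five attempted sends, the counter ends at 2 and the drop count at 3: the counter reflects only spans actually enqueued, which is the invariant this PR restores.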

@@ -1227,6 +1231,10 @@ mod tests {
// Verify dropped spans count (if accessible in your implementation)
let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");

// Verify current batch size
let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed);
assert_eq!(current_batch_size, 0, "Unexpected current batch size");
}

#[test]