diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 7e7031d2b0..db26a19dd6 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -6,6 +6,7 @@ - *Fix* SpanProcessor::on_start is no longer called on non recording spans - **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#2959](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially. - **Feature**: Added `Clone` implementation to `SdkLogger` for API consistency with `SdkTracer` ([#3058](https://github.com/open-telemetry/opentelemetry-rust/issues/3058)). +- **Fix**: Correct the batch size accounting in `BatchSpanProcessor` so that spans dropped because the queue is full are no longer counted toward the current batch size ([#3089](https://github.com/open-telemetry/opentelemetry-rust/pull/3089)). ## 0.30.0 diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 67862ecb8a..17b47bef80 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -522,45 +522,49 @@ impl SpanProcessor for BatchSpanProcessor { ); return; } - let result = self.span_sender.try_send(span); - if result.is_err() { - // Increment dropped span count. The first time we have to drop a span, - // emit a warning. - if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", - message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. 
During Shutdown time, a log will be emitted with exact count of total Spans dropped."); - } - } - // At this point, sending the span to the data channel was successful. - // Increment the current batch size and check if it has reached the max export batch size. - if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size - { - // Check if the a control message for exporting spans is already sent to the worker thread. - // If not, send a control message to export spans. - // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message. - - if !self.export_span_message_sent.load(Ordering::Relaxed) { - // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. - // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false. - // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. - // We could have used compare_exchange as well here, but it's more verbose than swap. - if !self.export_span_message_sent.swap(true, Ordering::Relaxed) { - match self.message_sender.try_send(BatchMessage::ExportSpan( - self.export_span_message_sent.clone(), - )) { - Ok(_) => { - // Control message sent successfully. - } - Err(_err) => { - // TODO: Log error - // If the control message could not be sent, reset the `export_span_message_sent` flag. - self.export_span_message_sent - .store(false, Ordering::Relaxed); + match self.span_sender.try_send(span) { + Ok(_) => { + // At this point, sending the span to the data channel was successful. + // Increment the current batch size and check if it has reached the max export batch size. + if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 + >= self.max_export_batch_size + { + // Check if a control message for exporting spans has already been sent to the worker thread. 
+ // If not, send a control message to export spans. + // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message. + + if !self.export_span_message_sent.load(Ordering::Relaxed) { + // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. + // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false. + // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. + // We could have used compare_exchange as well here, but it's more verbose than swap. + if !self.export_span_message_sent.swap(true, Ordering::Relaxed) { + match self.message_sender.try_send(BatchMessage::ExportSpan( + self.export_span_message_sent.clone(), + )) { + Ok(_) => { + // Control message sent successfully. + } + Err(_err) => { + // TODO: Log error + // If the control message could not be sent, reset the `export_span_message_sent` flag. + self.export_span_message_sent + .store(false, Ordering::Relaxed); + } + } } } } } + Err(_) => { + // Increment dropped span count. The first time we have to drop a span, + // emit a warning. + if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", + message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. 
During Shutdown time, a log will be emitted with exact count of total Spans dropped."); + } + } } } @@ -1227,6 +1231,10 @@ mod tests { // Verify dropped spans count (if accessible in your implementation) let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed); assert_eq!(dropped_count, 1, "Unexpected number of dropped spans"); + + // Verify current batch size + let current_batch_size = processor.current_batch_size.load(Ordering::Relaxed); + assert_eq!(current_batch_size, 0, "Unexpected current batch size"); } #[test]