File tree Expand file tree Collapse file tree 2 files changed +13
-7
lines changed
trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal
trace/src/main/java/io/opentelemetry/sdk/trace/export Expand file tree Collapse file tree 2 files changed +13
-7
lines changed Original file line number Diff line number Diff line change @@ -64,21 +64,23 @@ public static long capacity(Queue<?> queue) {
6464 * @throws IllegalArgumentException if maxExportBatchSize is negative
6565 */
6666 @ SuppressWarnings ("unchecked" )
67- public static <T > void drain (Queue <T > queue , int limit , Consumer <T > consumer ) {
67+ public static <T > int drain (Queue <T > queue , int limit , Consumer <T > consumer ) {
6868 if (queue instanceof MessagePassingQueue ) {
69- ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
69+ return ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
7070 } else {
71- drainNonJcQueue (queue , limit , consumer );
71+ return drainNonJcQueue (queue , limit , consumer );
7272 }
7373 }
7474
75- private static <T > void drainNonJcQueue (
75+ private static <T > int drainNonJcQueue (
7676 Queue <T > queue , int maxExportBatchSize , Consumer <T > consumer ) {
7777 int polledCount = 0 ;
7878 T item ;
79- while (polledCount ++ < maxExportBatchSize && (item = queue .poll ()) != null ) {
79+ while (polledCount < maxExportBatchSize && (item = queue .poll ()) != null ) {
8080 consumer .accept (item );
81+ ++polledCount ;
8182 }
83+ return polledCount ;
8284 }
8385
8486 private JcTools () {}
Original file line number Diff line number Diff line change @@ -173,6 +173,7 @@ private static final class Worker implements Runnable {
173173 private long nextExportTime ;
174174
175175 private final Queue <ReadableSpan > queue ;
176+ private final AtomicInteger queueSize = new AtomicInteger ();
176177 // When waiting on the spans queue, exporter thread sets this atomic to the number of more
177178 // spans it needs before doing an export. Writer threads would then wait for the queue to reach
178179 // spansNeeded size before notifying the exporter thread about new entries.
@@ -237,7 +238,7 @@ private void addSpan(ReadableSpan span) {
237238 if (!queue .offer (span )) {
238239 processedSpansCounter .add (1 , droppedAttrs );
239240 } else {
240- if (queue . size () >= spansNeeded .get ()) {
241+ if (queueSize . incrementAndGet () >= spansNeeded .get ()) {
241242 signal .offer (true );
242243 }
243244 }
@@ -251,8 +252,11 @@ public void run() {
251252 if (flushRequested .get () != null ) {
252253 flush ();
253254 }
254- JcTools .drain (
255+ int drained = JcTools .drain (
255256 queue , maxExportBatchSize - batch .size (), span -> batch .add (span .toSpanData ()));
257+ if (drained > 0 ) {
258+ queueSize .accumulateAndGet (-drained , Integer ::sum );
259+ }
256260
257261 if (batch .size () >= maxExportBatchSize || System .nanoTime () >= nextExportTime ) {
258262 exportCurrentBatch ();
You can’t perform that action at this time.
0 commit comments