Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,23 @@
* @throws IllegalArgumentException if maxExportBatchSize is negative
*/
@SuppressWarnings("unchecked")
public static <T> void drain(Queue<T> queue, int limit, Consumer<T> consumer) {
public static <T> int drain(Queue<T> queue, int limit, Consumer<T> consumer) {
if (queue instanceof MessagePassingQueue) {
((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
return ((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
} else {
drainNonJcQueue(queue, limit, consumer);
return drainNonJcQueue(queue, limit, consumer);

Check warning on line 71 in sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

View check run for this annotation

Codecov / codecov/patch

sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java#L71

Added line #L71 was not covered by tests
}
}

private static <T> void drainNonJcQueue(
private static <T> int drainNonJcQueue(
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
int polledCount = 0;
T item;
while (polledCount++ < maxExportBatchSize && (item = queue.poll()) != null) {
while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) {
consumer.accept(item);
++polledCount;

Check warning on line 81 in sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

View check run for this annotation

Codecov / codecov/patch

sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java#L81

Added line #L81 was not covered by tests
}
return polledCount;

Check warning on line 83 in sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

View check run for this annotation

Codecov / codecov/patch

sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java#L83

Added line #L83 was not covered by tests
}

private JcTools() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ private static final class Worker implements Runnable {
private long nextExportTime;

private final Queue<ReadableSpan> queue;
private final AtomicInteger queueSize = new AtomicInteger();
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
Expand Down Expand Up @@ -237,7 +238,7 @@ private void addSpan(ReadableSpan span) {
if (!queue.offer(span)) {
processedSpansCounter.add(1, droppedAttrs);
} else {
if (queue.size() >= spansNeeded.get()) {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
}
}
Expand All @@ -251,8 +252,7 @@ public void run() {
if (flushRequested.get() != null) {
flush();
}
JcTools.drain(
queue, maxExportBatchSize - batch.size(), span -> batch.add(span.toSpanData()));
drain(maxExportBatchSize - batch.size());

if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
Expand All @@ -274,13 +274,17 @@ public void run() {
}
}

private int drain(int limit) {
int drained = JcTools.drain(queue, limit, span -> batch.add(span.toSpanData()));
queueSize.addAndGet(-drained);
return drained;
}

private void flush() {
int spansToFlush = queue.size();
int spansToFlush = queueSize.get();
while (spansToFlush > 0) {
ReadableSpan span = queue.poll();
assert span != null;
batch.add(span.toSpanData());
spansToFlush--;
int drained = drain(maxExportBatchSize - batch.size());
spansToFlush -= drained;
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
}
Expand Down
Loading