Skip to content

Commit aa873a0

Browse files
mohitmahianuraaga
andauthored
Added a static method "drain" under JcTools with a generic consumer (#4582)
* Added a static method "drain" under JcTools with a generic consumer * Rename spanT to T * Update sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java * Rename Test methods Co-authored-by: Anuraag Agrawal <[email protected]>
1 parent 198aecd commit aa873a0

File tree

4 files changed

+206
-3
lines changed

4 files changed

+206
-3
lines changed

sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.Queue;
99
import java.util.concurrent.ArrayBlockingQueue;
10+
import java.util.function.Consumer;
1011
import org.jctools.queues.MessagePassingQueue;
1112
import org.jctools.queues.MpscArrayQueue;
1213

@@ -42,5 +43,29 @@ public static long capacity(Queue<?> queue) {
4243
}
4344
}
4445

46+
/**
47+
* Remove up to <i>limit</i> elements from the {@link Queue} and hand to consume.
48+
*
49+
* @throws IllegalArgumentException consumer is {@code null}
50+
* @throws IllegalArgumentException if maxExportBatchSize is negative
51+
*/
52+
@SuppressWarnings("unchecked")
53+
public static <T> void drain(Queue<T> queue, int limit, Consumer<T> consumer) {
54+
if (queue instanceof MessagePassingQueue) {
55+
((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
56+
} else {
57+
drainNonJcQueue(queue, limit, consumer);
58+
}
59+
}
60+
61+
private static <T> void drainNonJcQueue(
62+
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
63+
int polledCount = 0;
64+
T item;
65+
while (polledCount++ < maxExportBatchSize && (item = queue.poll()) != null) {
66+
consumer.accept(item);
67+
}
68+
}
69+
4570
private JcTools() {}
4671
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.trace.internal;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import java.util.ArrayList;
11+
import java.util.Queue;
12+
import java.util.concurrent.ArrayBlockingQueue;
13+
import org.jctools.queues.MpscArrayQueue;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.extension.ExtendWith;
16+
import org.mockito.junit.jupiter.MockitoExtension;
17+
import org.mockito.junit.jupiter.MockitoSettings;
18+
import org.mockito.quality.Strictness;
19+
20+
@ExtendWith(MockitoExtension.class)
21+
@MockitoSettings(strictness = Strictness.LENIENT)
22+
class JcToolsTest {
23+
24+
ArrayList<String> batch = new ArrayList<>(10);
25+
26+
@Test
27+
void drain_ArrayBlockingQueue() {
28+
// Arrange
29+
batch.add("Test3");
30+
Queue<String> queue = new ArrayBlockingQueue<>(10);
31+
queue.add("Test1");
32+
queue.add("Test2");
33+
34+
// Act
35+
JcTools.drain(queue, 5, batch::add);
36+
37+
// Assert
38+
assertThat(batch).hasSize(3);
39+
assertThat(queue).hasSize(0);
40+
}
41+
42+
@Test
43+
void drain_MessagePassingQueue() {
44+
// Arrange
45+
batch.add("Test3");
46+
Queue<String> queue = new MpscArrayQueue<>(10);
47+
queue.add("Test1");
48+
queue.add("Test2");
49+
50+
// Act
51+
JcTools.drain(queue, 5, batch::add);
52+
53+
// Assert
54+
assertThat(batch).hasSize(3);
55+
assertThat(queue).hasSize(0);
56+
}
57+
58+
@Test
59+
void drain_MaxBatch() {
60+
// Arrange
61+
Queue<String> queue = new MpscArrayQueue<>(10);
62+
queue.add("Test1");
63+
queue.add("Test2");
64+
65+
// Act
66+
JcTools.drain(queue, 1, batch::add);
67+
68+
// Assert
69+
assertThat(batch).hasSize(1);
70+
assertThat(queue).hasSize(1);
71+
}
72+
73+
@Test
74+
void newFixedSize_MpscQueue() {
75+
// Arrange
76+
int capacity = 10;
77+
78+
// Act
79+
Queue<Object> objects = JcTools.newFixedSizeQueue(capacity);
80+
81+
// Assert
82+
assertThat(objects).isInstanceOf(MpscArrayQueue.class);
83+
}
84+
85+
@Test
86+
void capacity_MpscQueue() {
87+
// Arrange
88+
int capacity = 10;
89+
Queue<Object> queue = JcTools.newFixedSizeQueue(capacity);
90+
91+
// Act
92+
long queueSize = JcTools.capacity(queue);
93+
94+
// Assert
95+
assertThat(queueSize).isGreaterThan(capacity);
96+
}
97+
98+
@Test
99+
void capacity_ArrayBlockingQueue() {
100+
// Arrange
101+
Queue<String> queue = new ArrayBlockingQueue<>(10);
102+
103+
// Act
104+
long queueSize = JcTools.capacity(queue);
105+
106+
// Assert
107+
assertThat(queueSize).isEqualTo(10);
108+
}
109+
}

sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,9 @@ public void run() {
237237
if (flushRequested.get() != null) {
238238
flush();
239239
}
240-
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
241-
batch.add(queue.poll().toSpanData());
242-
}
240+
JcTools.drain(
241+
queue, maxExportBatchSize - batch.size(), span -> batch.add(span.toSpanData()));
242+
243243
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
244244
exportCurrentBatch();
245245
updateNextExportTime();

sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,75 @@ void forceExport() {
220220
assertThat(exported.size()).isEqualTo(2);
221221
}
222222

223+
@Test
224+
void testEmptyQueue() {
225+
// Arrange
226+
WaitingSpanExporter waitingSpanExporter =
227+
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
228+
BatchSpanProcessor batchSpanProcessor =
229+
BatchSpanProcessor.builder(waitingSpanExporter)
230+
.setMaxExportBatchSize(10)
231+
.setScheduleDelay(10, TimeUnit.SECONDS)
232+
.setMaxQueueSize(10_000)
233+
.build();
234+
// Act
235+
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
236+
List<SpanData> exported = waitingSpanExporter.waitForExport();
237+
238+
// Assert
239+
await().untilAsserted(() -> assertThat(batchSpanProcessor.getQueue()).isEmpty());
240+
assertThat(exported).isNotNull();
241+
assertThat(exported.size()).isEqualTo(0);
242+
}
243+
244+
@Test
245+
void testQueueSizeSmallerThanMaxBatch() {
246+
// Arrange
247+
WaitingSpanExporter waitingSpanExporter =
248+
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
249+
BatchSpanProcessor batchSpanProcessor =
250+
BatchSpanProcessor.builder(waitingSpanExporter)
251+
.setMaxExportBatchSize(11)
252+
.setScheduleDelay(10, TimeUnit.SECONDS)
253+
.setMaxQueueSize(10_000)
254+
.build();
255+
// Act
256+
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
257+
for (int i = 0; i < 10; i++) {
258+
createEndedSpan("notExported");
259+
}
260+
List<SpanData> exported = waitingSpanExporter.waitForExport();
261+
262+
// Assert
263+
assertThat(exported).isNotNull();
264+
assertThat(exported.size()).isEqualTo(0);
265+
}
266+
267+
@Test
268+
void testQueueSizeSmallerThanMaxBatchWithForceFlush() {
269+
// Arrange
270+
WaitingSpanExporter waitingSpanExporter =
271+
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
272+
BatchSpanProcessor batchSpanProcessor =
273+
BatchSpanProcessor.builder(waitingSpanExporter)
274+
.setMaxExportBatchSize(11)
275+
.setScheduleDelay(10, TimeUnit.SECONDS)
276+
.setMaxQueueSize(10_000)
277+
.build();
278+
// Act
279+
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
280+
for (int i = 0; i < 10; i++) {
281+
createEndedSpan("notExported");
282+
}
283+
batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
284+
List<SpanData> exported = waitingSpanExporter.waitForExport();
285+
286+
// Assert
287+
assertThat(exported).isNotNull();
288+
assertThat(exported.size()).isEqualTo(10);
289+
await().untilAsserted(() -> assertThat(batchSpanProcessor.getQueue()).isEmpty());
290+
}
291+
223292
@Test
224293
void exportSpansToMultipleExporters() {
225294
WaitingSpanExporter waitingSpanExporter =

0 commit comments

Comments
 (0)