Skip to content

Commit 14b362f

Browse files
committed
Merge branch 'main' of https://github.com/open-telemetry/opentelemetry-java into declarative-config-0.4
2 parents b124b82 + 2cf5f01 commit 14b362f

File tree

6 files changed

+24
-22
lines changed

6 files changed

+24
-22
lines changed

.github/renovate.json5

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
},
1818
{
1919
"matchPackageNames": [
20+
"io.opentelemetry.contrib:opentelemetry-aws-xray-propagator",
2021
"io.opentelemetry.proto:opentelemetry-proto",
2122
"io.opentelemetry.semconv:opentelemetry-semconv-incubating"
2223
],

dependencyManagement/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ val DEPENDENCY_BOMS = listOf(
1414
// by FOSSA for containing EPL-licensed)
1515

1616
"com.fasterxml.jackson:jackson-bom:2.18.3",
17-
"com.google.guava:guava-bom:33.4.5-jre",
17+
"com.google.guava:guava-bom:33.4.6-jre",
1818
"com.google.protobuf:protobuf-bom:4.29.3",
1919
"com.squareup.okhttp3:okhttp-bom:4.12.0",
2020
"com.squareup.okio:okio-bom:3.10.2", // applies to transitive dependencies of okhttp
@@ -83,7 +83,7 @@ val DEPENDENCIES = listOf(
8383
"eu.rekawek.toxiproxy:toxiproxy-java:2.1.7",
8484
"io.github.netmikey.logunit:logunit-jul:2.0.0",
8585
"io.jaegertracing:jaeger-client:1.8.1",
86-
"io.opentelemetry.contrib:opentelemetry-aws-xray-propagator:1.39.0-alpha",
86+
"io.opentelemetry.contrib:opentelemetry-aws-xray-propagator:1.45.0-alpha",
8787
"io.opentelemetry.semconv:opentelemetry-semconv-incubating:1.30.0-alpha",
8888
"io.opentelemetry.proto:opentelemetry-proto:1.5.0-alpha",
8989
"io.opentracing:opentracing-api:0.33.0",

sdk-extensions/incubator/build.gradle.kts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ dependencies {
4040
testImplementation(project(":exporters:zipkin"))
4141
testImplementation(project(":sdk-extensions:jaeger-remote-sampler"))
4242
testImplementation(project(":extensions:trace-propagators"))
43-
// As a part of the tests we check that we can parse examples without error. The https://github.com/open-telemetry/opentelemetry-configuration/blob/main/examples/kitchen-sink.yam contains a reference to the xray propagator
44-
// TODO: add when updated to reflect new API locations
45-
// testImplementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator")
43+
testImplementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator")
4644
testImplementation("com.linecorp.armeria:armeria-junit5")
4745

4846
testImplementation("com.google.guava:guava-testlib")

sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfigurationCreateTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,7 @@ void parseAndCreate_Examples(@TempDir Path tempDir)
9494
"client_certificate_file: .*\n",
9595
"client_certificate_file: "
9696
+ clientCertificatePath.replace("\\", "\\\\")
97-
+ System.lineSeparator())
98-
// TODO: remove once updated ComponentProvider SPI contract implemented in
99-
// https://github.com/open-telemetry/opentelemetry-java-contrib/tree/main/aws-xray-propagator
100-
.replaceAll("xray,", "");
97+
+ System.lineSeparator());
10198
InputStream is =
10299
new ByteArrayInputStream(rewrittenExampleContent.getBytes(StandardCharsets.UTF_8));
103100

sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,23 @@ public static long capacity(Queue<?> queue) {
6464
* @throws IllegalArgumentException if maxExportBatchSize is negative
6565
*/
6666
@SuppressWarnings("unchecked")
67-
public static <T> void drain(Queue<T> queue, int limit, Consumer<T> consumer) {
67+
public static <T> int drain(Queue<T> queue, int limit, Consumer<T> consumer) {
6868
if (queue instanceof MessagePassingQueue) {
69-
((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
69+
return ((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
7070
} else {
71-
drainNonJcQueue(queue, limit, consumer);
71+
return drainNonJcQueue(queue, limit, consumer);
7272
}
7373
}
7474

75-
private static <T> void drainNonJcQueue(
75+
private static <T> int drainNonJcQueue(
7676
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
7777
int polledCount = 0;
7878
T item;
79-
while (polledCount++ < maxExportBatchSize && (item = queue.poll()) != null) {
79+
while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) {
8080
consumer.accept(item);
81+
++polledCount;
8182
}
83+
return polledCount;
8284
}
8385

8486
private JcTools() {}

sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ private static final class Worker implements Runnable {
173173
private long nextExportTime;
174174

175175
private final Queue<ReadableSpan> queue;
176+
private final AtomicInteger queueSize = new AtomicInteger();
176177
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
177178
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
178179
// spansNeeded size before notifying the exporter thread about new entries.
@@ -237,7 +238,7 @@ private void addSpan(ReadableSpan span) {
237238
if (!queue.offer(span)) {
238239
processedSpansCounter.add(1, droppedAttrs);
239240
} else {
240-
if (queue.size() >= spansNeeded.get()) {
241+
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
241242
signal.offer(true);
242243
}
243244
}
@@ -251,8 +252,7 @@ public void run() {
251252
if (flushRequested.get() != null) {
252253
flush();
253254
}
254-
JcTools.drain(
255-
queue, maxExportBatchSize - batch.size(), span -> batch.add(span.toSpanData()));
255+
drain(maxExportBatchSize - batch.size());
256256

257257
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
258258
exportCurrentBatch();
@@ -274,13 +274,17 @@ public void run() {
274274
}
275275
}
276276

277+
private int drain(int limit) {
278+
int drained = JcTools.drain(queue, limit, span -> batch.add(span.toSpanData()));
279+
queueSize.addAndGet(-drained);
280+
return drained;
281+
}
282+
277283
private void flush() {
278-
int spansToFlush = queue.size();
284+
int spansToFlush = queueSize.get();
279285
while (spansToFlush > 0) {
280-
ReadableSpan span = queue.poll();
281-
assert span != null;
282-
batch.add(span.toSpanData());
283-
spansToFlush--;
286+
int drained = drain(maxExportBatchSize - batch.size());
287+
spansToFlush -= drained;
284288
if (batch.size() >= maxExportBatchSize) {
285289
exportCurrentBatch();
286290
}

0 commit comments

Comments
 (0)