From ca4d65bc2b5c193e1614d61f41bcd1718db72b2c Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Wed, 11 Jun 2025 11:29:30 -0400 Subject: [PATCH 1/5] Simplified ListWriter await logic. --- .../trace/common/writer/ListWriter.java | 93 ++++++++----------- 1 file changed, 40 insertions(+), 53 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index 6393547c6ef..624011fd69a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -1,36 +1,27 @@ package datadog.trace.common.writer; +import static java.util.concurrent.TimeUnit.SECONDS; + import datadog.trace.core.DDSpan; import datadog.trace.core.MetadataConsumer; -import datadog.trace.core.tagprocessor.PeerServiceCalculator; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList> implements Writer { - private static final Logger log = LoggerFactory.getLogger(ListWriter.class); + private static final Filter ACCEPT_ALL = trace -> true; - public static final Filter ACCEPT_ALL = - new Filter() { - @Override - public boolean accept(List trace) { - return true; - } - }; - - private final List latches = new ArrayList<>(); private final AtomicInteger traceCount = new AtomicInteger(); private final TraceStructureWriter structureWriter = new TraceStructureWriter(true); + private final Object monitor = new Object(); - private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator(); private Filter filter = ACCEPT_ALL; public List firstTrace() { @@ -47,30 +38,41 @@ public void write(List trace) { // remotely realistic so the test actually test something span.processTagsAndBaggage(MetadataConsumer.NO_OP); } + + add(trace); + structureWriter.write(trace); + traceCount.incrementAndGet(); - synchronized (latches) { - add(trace); - for (final CountDownLatch latch : latches) { - if (size() >= latch.getCount()) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - } + synchronized (monitor) { + monitor.notifyAll(); } - structureWriter.write(trace); } - public boolean waitForTracesMax(final int number, int seconds) - throws InterruptedException, TimeoutException { - final CountDownLatch latch = new CountDownLatch(number); - synchronized (latches) { - if (size() >= number) { + private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate) + throws InterruptedException { + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + + while (true) { + if (predicate.getAsBoolean()) { return true; } - latches.add(latch); + + long now = System.currentTimeMillis(); + long waitTime = deadline - now; + if (waitTime <= 0) { + break; + } + + synchronized (monitor) { + monitor.wait(waitTime); + } } - return latch.await(seconds, TimeUnit.SECONDS); + + return false; + } + + public boolean waitForTracesMax(final int number, int seconds) throws InterruptedException { + return awaitUntilDeadline(seconds, SECONDS, () -> traceCount.get() >= number); } public void waitForTraces(final int number) throws InterruptedException, TimeoutException { @@ -88,24 +90,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout } public void waitUntilReported(final DDSpan span) throws InterruptedException, TimeoutException { - waitUntilReported(span, 20, TimeUnit.SECONDS); + waitUntilReported(span, 20, SECONDS); } public void waitUntilReported(final DDSpan span, int timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - while (true) { - final CountDownLatch latch = new CountDownLatch(size() + 1); - synchronized (latches) { - latches.add(latch); - } - if (isReported(span)) { - return; - } - if (!latch.await(timeout, unit)) { - String msg = "Timeout waiting for span to be reported: " + span; - log.warn(msg); - throw new TimeoutException(msg); - } + boolean reported = awaitUntilDeadline(timeout, unit, () -> isReported(span)); + + if (!reported) { + String msg = "Timeout waiting for span to be reported: " + span; + log.warn(msg); + throw new TimeoutException(msg); } } @@ -145,14 +140,6 @@ public boolean flush() { @Override public void close() { clear(); - synchronized (latches) { - for (final CountDownLatch latch : latches) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - latches.clear(); - } } @Override From ef302eec07ba42e6896e61edef0577f5e97ee875 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Wed, 11 Jun 2025 12:32:24 -0400 Subject: [PATCH 2/5] Fixed counter state. --- .../src/main/java/datadog/trace/common/writer/ListWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index 624011fd69a..fcd5f897226 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -140,6 +140,7 @@ public boolean flush() { @Override public void close() { clear(); + traceCount.set(0); } @Override From 45aabcb461fc8109a0441cffce5425f25f1b8102 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Wed, 11 Jun 2025 13:37:58 -0400 Subject: [PATCH 3/5] Fixed counter state. --- .../main/java/datadog/trace/common/writer/ListWriter.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index fcd5f897226..ed70068ab87 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -137,10 +137,16 @@ public boolean flush() { return true; } + @Override + public void clear() { + super.clear(); + + traceCount.set(0); + } + @Override public void close() { clear(); - traceCount.set(0); } @Override From 967749d24769fb372986263c2ecaf569c7007436 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Fri, 13 Jun 2025 15:37:08 -0400 Subject: [PATCH 4/5] Test SSH signing. --- .../src/main/java/datadog/trace/common/writer/ListWriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index ed70068ab87..0fb482a3a78 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -140,7 +140,6 @@ public boolean flush() { @Override public void clear() { super.clear(); - traceCount.set(0); } From 3b857fc60d787e4321fc2363fc353f037df06af1 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Fri, 13 Jun 2025 16:13:09 -0400 Subject: [PATCH 5/5] Test SSH signing. --- .../src/main/java/datadog/trace/common/writer/ListWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index 0fb482a3a78..ed70068ab87 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -140,6 +140,7 @@ public boolean flush() { @Override public void clear() { super.clear(); + traceCount.set(0); }