Skip to content

Commit 4089906

Browse files
committed
Incorporate latest changes - make queues race free
1 parent aa7a76a commit 4089906

31 files changed

+1322
-845
lines changed

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import com.squareup.moshi.JsonAdapter;
88
import com.squareup.moshi.Moshi;
9-
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
9+
import datadog.common.queue.MessagePassingBlockingQueue;
1010
import datadog.common.queue.Queues;
1111
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
1212
import datadog.communication.ddagent.SharedCommunicationObjects;
@@ -35,7 +35,7 @@ public class EvalProcessingWorker implements AutoCloseable {
3535

3636
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
3737

38-
private final BlockingConsumerNonBlockingQueue<LLMObsEval> queue;
38+
private final MessagePassingBlockingQueue<LLMObsEval> queue;
3939
private final Thread serializerThread;
4040

4141
public EvalProcessingWorker(
@@ -99,7 +99,7 @@ public static class EvalSerializingHandler implements Runnable {
9999
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
100100
private static final int FLUSH_THRESHOLD = 50;
101101

102-
private final BlockingConsumerNonBlockingQueue<LLMObsEval> queue;
102+
private final MessagePassingBlockingQueue<LLMObsEval> queue;
103103
private final long ticksRequiredToFlush;
104104
private long lastTicks;
105105

@@ -112,7 +112,7 @@ public static class EvalSerializingHandler implements Runnable {
112112
private final List<LLMObsEval> buffer = new ArrayList<>();
113113

114114
public EvalSerializingHandler(
115-
final BlockingConsumerNonBlockingQueue<LLMObsEval> queue,
115+
final MessagePassingBlockingQueue<LLMObsEval> queue,
116116
final long flushInterval,
117117
final TimeUnit timeUnit,
118118
final HttpUrl submissionUrl,

dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,16 @@
22

33
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44

5-
import datadog.common.queue.NonBlockingQueue;
65
import datadog.trace.common.metrics.SignalItem.StopSignal;
76
import datadog.trace.core.util.LRUCache;
87
import datadog.trace.util.queue.NonBlockingQueue;
98
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
109
import java.util.Iterator;
1110
import java.util.Map;
12-
import java.util.Queue;
1311
import java.util.Set;
1412
import java.util.concurrent.ConcurrentMap;
1513
import java.util.concurrent.TimeUnit;
16-
import java.util.function.Consumer;
14+
import org.jctools.queues.MessagePassingQueue;
1715
import org.slf4j.Logger;
1816
import org.slf4j.LoggerFactory;
1917

@@ -23,8 +21,8 @@ final class Aggregator implements Runnable {
2321

2422
private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
2523

26-
private final Queue<Batch> batchPool;
27-
private final NonBlockingQueue<InboxItem> inbox;
24+
private final MessagePassingQueue<Batch> batchPool;
25+
private final MessagePassingQueue<InboxItem> inbox;
2826
private final LRUCache<MetricKey, AggregateMetric> aggregates;
2927
private final ConcurrentMap<MetricKey, Batch> pending;
3028
private final Set<MetricKey> commonKeys;
@@ -43,8 +41,8 @@ final class Aggregator implements Runnable {
4341

4442
Aggregator(
4543
MetricWriter writer,
46-
Queue<Batch> batchPool,
47-
NonBlockingQueue<InboxItem> inbox,
44+
MessagePassingQueue<Batch> batchPool,
45+
MessagePassingQueue<InboxItem> inbox,
4846
ConcurrentMap<MetricKey, Batch> pending,
4947
final Set<MetricKey> commonKeys,
5048
int maxAggregates,
@@ -64,8 +62,8 @@ final class Aggregator implements Runnable {
6462

6563
Aggregator(
6664
MetricWriter writer,
67-
Queue<Batch> batchPool,
68-
NonBlockingQueue<InboxItem> inbox,
65+
MessagePassingQueue<Batch> batchPool,
66+
MessagePassingQueue<InboxItem> inbox,
6967
ConcurrentMap<MetricKey, Batch> pending,
7068
final Set<MetricKey> commonKeys,
7169
int maxAggregates,
@@ -108,7 +106,7 @@ public void run() {
108106
log.debug("metrics aggregator exited");
109107
}
110108

111-
private final class Drainer implements Consumer<InboxItem> {
109+
private final class Drainer implements MessagePassingQueue.Consumer<InboxItem> {
112110

113111
boolean stopped = false;
114112

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static java.util.Collections.unmodifiableSet;
2020
import static java.util.concurrent.TimeUnit.SECONDS;
2121

22-
import datadog.common.queue.NonBlockingQueue;
2322
import datadog.common.queue.Queues;
2423
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
2524
import datadog.communication.ddagent.SharedCommunicationObjects;
@@ -41,13 +40,13 @@
4140
import java.util.HashSet;
4241
import java.util.List;
4342
import java.util.Map;
44-
import java.util.Queue;
4543
import java.util.Set;
4644
import java.util.concurrent.CompletableFuture;
4745
import java.util.concurrent.ConcurrentHashMap;
4846
import java.util.concurrent.Future;
4947
import java.util.concurrent.TimeUnit;
5048
import java.util.function.Function;
49+
import org.jctools.queues.MessagePassingQueue;
5150
import javax.annotation.Nonnull;
5251
import org.slf4j.Logger;
5352
import org.slf4j.LoggerFactory;
@@ -90,11 +89,11 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9089
new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER)));
9190

9291
private final Set<String> ignoredResources;
93-
private final Queue<Batch> batchPool;
92+
private final MessagePassingQueue<Batch> batchPool;
9493
private final ConcurrentHashMap<MetricKey, Batch> pending;
9594
private final ConcurrentHashMap<MetricKey, MetricKey> keys;
9695
private final Thread thread;
97-
private final NonBlockingQueue<InboxItem> inbox;
96+
private final MessagePassingQueue<InboxItem> inbox;
9897
private final Sink sink;
9998
private final Aggregator aggregator;
10099
private final long reportingInterval;

dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import static datadog.trace.common.metrics.EventListener.EventType.OK;
1010
import static java.util.concurrent.TimeUnit.SECONDS;
1111

12-
import datadog.common.queue.NonBlockingQueue;
1312
import datadog.common.queue.Queues;
1413
import datadog.trace.util.AgentTaskScheduler;
1514
import java.io.IOException;
@@ -25,6 +24,7 @@
2524
import okhttp3.OkHttpClient;
2625
import okhttp3.Request;
2726
import okhttp3.RequestBody;
27+
import org.jctools.queues.MessagePassingQueue;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

@@ -37,7 +37,7 @@ public final class OkHttpSink implements Sink, EventListener {
3737
private final OkHttpClient client;
3838
private final HttpUrl metricsUrl;
3939
private final List<EventListener> listeners;
40-
private final NonBlockingQueue<Request> enqueuedRequests = Queues.spscArrayQueue(16);
40+
private final MessagePassingQueue<Request> enqueuedRequests = Queues.spscArrayQueue(16);
4141
private final AtomicLong lastRequestTime = new AtomicLong();
4242
private final AtomicLong asyncRequestCounter = new AtomicLong();
4343
private final boolean bufferingEnabled;
@@ -158,9 +158,9 @@ private void handleFailure(okhttp3.Response response) throws IOException {
158158

159159
private static final class Sender implements AgentTaskScheduler.Task<OkHttpSink> {
160160

161-
private final NonBlockingQueue<Request> inbox;
161+
private final MessagePassingQueue<Request> inbox;
162162

163-
private Sender(NonBlockingQueue<Request> inbox) {
163+
private Sender(MessagePassingQueue<Request> inbox) {
164164
this.inbox = inbox;
165165
}
166166

dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
66
import static java.util.concurrent.TimeUnit.MILLISECONDS;
77

8-
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
9-
import datadog.common.queue.NonBlockingQueue;
8+
import datadog.common.queue.MessagePassingBlockingQueue;
109
import datadog.common.queue.Queues;
1110
import datadog.communication.ddagent.DroppingPolicy;
1211
import datadog.trace.common.sampling.SingleSpanSampler;
@@ -15,6 +14,7 @@
1514
import java.util.ArrayList;
1615
import java.util.List;
1716
import java.util.Queue;
17+
import org.jctools.queues.MessagePassingQueue;
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
2020

@@ -46,7 +46,7 @@ class DefaultSpanSamplingWorker implements SpanSamplingWorker {
4646

4747
private final Thread spanSamplingThread;
4848
private final SamplingHandler samplingHandler;
49-
private final BlockingConsumerNonBlockingQueue<Object> spanSamplingQueue;
49+
private final MessagePassingBlockingQueue<Object> spanSamplingQueue;
5050
private final Queue<Object> primaryQueue;
5151
private final Queue<Object> secondaryQueue;
5252
private final SingleSpanSampler singleSpanSampler;
@@ -173,7 +173,7 @@ public void onEvent(Object event) {
173173
}
174174
}
175175

176-
private void consumeBatch(NonBlockingQueue<Object> queue) {
176+
private void consumeBatch(MessagePassingQueue<Object> queue) {
177177
queue.drain(this::onEvent, queue.size());
178178
}
179179
}

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
66
import static java.util.concurrent.TimeUnit.MILLISECONDS;
77

8-
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
9-
import datadog.common.queue.NonBlockingQueue;
8+
import datadog.common.queue.MessagePassingBlockingQueue;
109
import datadog.common.queue.Queues;
1110
import datadog.communication.ddagent.DroppingPolicy;
1211
import datadog.trace.api.Config;
@@ -22,6 +21,7 @@
2221
import java.util.concurrent.CountDownLatch;
2322
import java.util.concurrent.TimeUnit;
2423
import java.util.function.BooleanSupplier;
24+
import org.jctools.queues.MessagePassingQueue;
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

@@ -37,8 +37,8 @@ public class TraceProcessingWorker implements AutoCloseable {
3737
private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);
3838

3939
private final PrioritizationStrategy prioritizationStrategy;
40-
private final BlockingConsumerNonBlockingQueue<Object> primaryQueue;
41-
private final BlockingConsumerNonBlockingQueue<Object> secondaryQueue;
40+
private final MessagePassingBlockingQueue<Object> primaryQueue;
41+
private final MessagePassingBlockingQueue<Object> secondaryQueue;
4242
private final TraceSerializingHandler serializingHandler;
4343
private final Thread serializerThread;
4444
private final int capacity;
@@ -122,23 +122,23 @@ public long getRemainingCapacity() {
122122
return primaryQueue.remainingCapacity();
123123
}
124124

125-
private static BlockingConsumerNonBlockingQueue<Object> createQueue(int capacity) {
125+
private static MessagePassingBlockingQueue<Object> createQueue(int capacity) {
126126
return Queues.mpscBlockingConsumerArrayQueue(capacity);
127127
}
128128

129129
public static class TraceSerializingHandler implements Runnable {
130130

131-
private final BlockingConsumerNonBlockingQueue<Object> primaryQueue;
132-
private final BlockingConsumerNonBlockingQueue<Object> secondaryQueue;
131+
private final MessagePassingBlockingQueue<Object> primaryQueue;
132+
private final MessagePassingBlockingQueue<Object> secondaryQueue;
133133
private final HealthMetrics healthMetrics;
134134
private final long ticksRequiredToFlush;
135135
private final boolean doTimeFlush;
136136
private final PayloadDispatcher payloadDispatcher;
137137
private long lastTicks;
138138

139139
public TraceSerializingHandler(
140-
final BlockingConsumerNonBlockingQueue<Object> primaryQueue,
141-
final BlockingConsumerNonBlockingQueue<Object> secondaryQueue,
140+
final MessagePassingBlockingQueue<Object> primaryQueue,
141+
final MessagePassingBlockingQueue<Object> secondaryQueue,
142142
final HealthMetrics healthMetrics,
143143
final PayloadDispatcher payloadDispatcher,
144144
final long flushInterval,
@@ -239,7 +239,7 @@ private boolean shouldFlush() {
239239
return false;
240240
}
241241

242-
private void consumeBatch(NonBlockingQueue<Object> queue) {
242+
private void consumeBatch(MessagePassingQueue<Object> queue) {
243243
queue.drain(this::onEvent, queue.size());
244244
}
245245

dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
77
import static java.util.Comparator.comparingLong;
88

9-
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
9+
import datadog.common.queue.MessagePassingBlockingQueue;
1010
import datadog.common.queue.Queues;
1111
import datadog.communication.ddagent.SharedCommunicationObjects;
1212
import datadog.trace.api.Config;
@@ -20,10 +20,9 @@
2020
import java.util.List;
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.AtomicInteger;
23-
import java.util.function.Consumer;
2423
import java.util.function.Predicate;
25-
import java.util.function.Supplier;
2624
import java.util.zip.ZipOutputStream;
25+
import org.jctools.queues.MessagePassingQueue;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

@@ -63,7 +62,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
6362
private static final CommandElement DUMP_ELEMENT = new CommandElement();
6463
private static final CommandElement STAND_IN_ELEMENT = new CommandElement();
6564

66-
private final BlockingConsumerNonBlockingQueue<Element> queue;
65+
private final MessagePassingBlockingQueue<Element> queue;
6766
private final Thread worker;
6867
private final TimeSource timeSource;
6968

@@ -138,7 +137,7 @@ public void flush() {
138137
}
139138
}
140139

141-
private static final class WriteDrain implements Consumer<Element> {
140+
private static final class WriteDrain implements MessagePassingQueue.Consumer<Element> {
142141
private static final WriteDrain WRITE_DRAIN = new WriteDrain();
143142

144143
@Override
@@ -147,7 +146,8 @@ public void accept(Element pendingTrace) {
147146
}
148147
}
149148

150-
private static final class DumpDrain implements Consumer<Element>, Supplier<Element> {
149+
private static final class DumpDrain
150+
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
151151
private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class);
152152
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
153153
private static final int MAX_DUMPED_TRACES = 50;

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
1212
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
1313

14-
import datadog.common.queue.NonBlockingQueue;
1514
import datadog.common.queue.Queues;
1615
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
1716
import datadog.communication.ddagent.SharedCommunicationObjects;
@@ -40,6 +39,7 @@
4039
import java.util.concurrent.ConcurrentHashMap;
4140
import java.util.concurrent.TimeUnit;
4241
import java.util.function.Supplier;
42+
import org.jctools.queues.MessagePassingQueue;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545

@@ -54,7 +54,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
5454
new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null);
5555

5656
private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
57-
private final NonBlockingQueue<InboxItem> inbox = Queues.mpscArrayQueue(1024);
57+
private final MessagePassingQueue<InboxItem> inbox = Queues.mpscArrayQueue(1024);
5858
private final DatastreamsPayloadWriter payloadWriter;
5959
private final DDAgentFeaturesDiscovery features;
6060
private final TimeSource timeSource;

products/feature-flagging/lib/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
api(libs.jctools)
2222
api(project(":communication"))
2323
api(project(":products:feature-flagging:bootstrap"))
24+
api(project(":utils:queue-utils"))
2425

2526
compileOnly(project(":dd-trace-core")) // shading does not work with this one
2627

0 commit comments

Comments
 (0)