Skip to content

Commit 89f0f33

Browse files
committed
Introduce varhandle based queues
1 parent e80998f commit 89f0f33

File tree

21 files changed

+855
-53
lines changed

21 files changed

+855
-53
lines changed

dd-java-agent/agent-llmobs/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ dependencies {
1616
implementation project(':communication')
1717
implementation project(':components:json')
1818
implementation project(':internal-api')
19+
api project(':internal-api:internal-api-9')
20+
1921

2022
testImplementation project(':dd-java-agent:testing')
2123
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import datadog.communication.http.OkHttpUtils;
1313
import datadog.trace.api.Config;
1414
import datadog.trace.llmobs.domain.LLMObsEval;
15-
import datadog.trace.util.queue.MpscArrayQueue;
15+
import datadog.trace.util.queue.BaseQueue;
16+
import datadog.trace.util.queue.Queues;
1617
import java.util.ArrayList;
1718
import java.util.List;
1819
import java.util.concurrent.TimeUnit;
@@ -34,7 +35,7 @@ public class EvalProcessingWorker implements AutoCloseable {
3435

3536
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
3637

37-
private final MpscArrayQueue<LLMObsEval> queue;
38+
private final BaseQueue<LLMObsEval> queue;
3839
private final Thread serializerThread;
3940

4041
public EvalProcessingWorker(
@@ -43,7 +44,7 @@ public EvalProcessingWorker(
4344
final TimeUnit timeUnit,
4445
final SharedCommunicationObjects sco,
4546
Config config) {
46-
this.queue = new MpscArrayQueue<>(capacity);
47+
this.queue = Queues.mpscArrayQueue(capacity);
4748

4849
boolean isAgentless = config.isLlmObsAgentlessEnabled();
4950
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
@@ -98,7 +99,7 @@ public static class EvalSerializingHandler implements Runnable {
9899
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
99100
private static final int FLUSH_THRESHOLD = 50;
100101

101-
private final MpscArrayQueue<LLMObsEval> queue;
102+
private final BaseQueue<LLMObsEval> queue;
102103
private final long ticksRequiredToFlush;
103104
private long lastTicks;
104105

@@ -111,7 +112,7 @@ public static class EvalSerializingHandler implements Runnable {
111112
private final List<LLMObsEval> buffer = new ArrayList<>();
112113

113114
public EvalSerializingHandler(
114-
final MpscArrayQueue<LLMObsEval> queue,
115+
final BaseQueue<LLMObsEval> queue,
115116
final long flushInterval,
116117
final TimeUnit timeUnit,
117118
final HttpUrl submissionUrl,

dd-trace-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
api project(':dd-trace-api')
6767
api project(':communication')
6868
api project(':internal-api')
69+
api project(':internal-api:internal-api-9')
6970
implementation project(':components:json')
7071
implementation project(':utils:container-utils')
7172
implementation project(':utils:socket-utils')

dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import datadog.trace.common.metrics.SignalItem.StopSignal;
66
import datadog.trace.core.util.LRUCache;
7-
import datadog.trace.util.queue.MpscArrayQueue;
7+
import datadog.trace.util.queue.BaseQueue;
88
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
99
import java.util.Iterator;
1010
import java.util.Map;
@@ -23,7 +23,7 @@ final class Aggregator implements Runnable {
2323
private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
2424

2525
private final Queue<Batch> batchPool;
26-
private final MpscArrayQueue<InboxItem> inbox;
26+
private final BaseQueue<InboxItem> inbox;
2727
private final LRUCache<MetricKey, AggregateMetric> aggregates;
2828
private final ConcurrentMap<MetricKey, Batch> pending;
2929
private final Set<MetricKey> commonKeys;
@@ -43,7 +43,7 @@ final class Aggregator implements Runnable {
4343
Aggregator(
4444
MetricWriter writer,
4545
Queue<Batch> batchPool,
46-
MpscArrayQueue<InboxItem> inbox,
46+
BaseQueue<InboxItem> inbox,
4747
ConcurrentMap<MetricKey, Batch> pending,
4848
final Set<MetricKey> commonKeys,
4949
int maxAggregates,
@@ -64,7 +64,7 @@ final class Aggregator implements Runnable {
6464
Aggregator(
6565
MetricWriter writer,
6666
Queue<Batch> batchPool,
67-
MpscArrayQueue<InboxItem> inbox,
67+
BaseQueue<InboxItem> inbox,
6868
ConcurrentMap<MetricKey, Batch> pending,
6969
final Set<MetricKey> commonKeys,
7070
int maxAggregates,

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import datadog.trace.core.DDTraceCoreInfo;
3434
import datadog.trace.core.monitor.HealthMetrics;
3535
import datadog.trace.util.AgentTaskScheduler;
36-
import datadog.trace.util.queue.MpscArrayQueue;
37-
import datadog.trace.util.queue.SpmcArrayQueue;
36+
import datadog.trace.util.queue.BaseQueue;
37+
import datadog.trace.util.queue.Queues;
3838
import java.util.ArrayList;
3939
import java.util.Arrays;
4040
import java.util.Collections;
@@ -94,7 +94,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9494
private final ConcurrentHashMap<MetricKey, Batch> pending;
9595
private final ConcurrentHashMap<MetricKey, MetricKey> keys;
9696
private final Thread thread;
97-
private final MpscArrayQueue<InboxItem> inbox;
97+
private final BaseQueue<InboxItem> inbox;
9898
private final Sink sink;
9999
private final Aggregator aggregator;
100100
private final long reportingInterval;
@@ -177,8 +177,8 @@ public ConflatingMetricsAggregator(
177177
long reportingInterval,
178178
TimeUnit timeUnit) {
179179
this.ignoredResources = ignoredResources;
180-
this.inbox = new MpscArrayQueue<>(queueSize);
181-
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
180+
this.inbox = Queues.mpscArrayQueue(queueSize);
181+
this.batchPool = Queues.spmcArrayQueue(maxAggregates);
182182
this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3);
183183
this.keys = new ConcurrentHashMap<>();
184184
this.features = features;

dd-trace-core/src/main/java/datadog/trace/common/metrics/OkHttpSink.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import static java.util.concurrent.TimeUnit.SECONDS;
1111

1212
import datadog.trace.util.AgentTaskScheduler;
13-
import datadog.trace.util.queue.SpscArrayQueue;
13+
import datadog.trace.util.queue.BaseQueue;
14+
import datadog.trace.util.queue.Queues;
1415
import java.io.IOException;
1516
import java.nio.ByteBuffer;
1617
import java.util.Collections;
@@ -36,7 +37,7 @@ public final class OkHttpSink implements Sink, EventListener {
3637
private final OkHttpClient client;
3738
private final HttpUrl metricsUrl;
3839
private final List<EventListener> listeners;
39-
private final SpscArrayQueue<Request> enqueuedRequests = new SpscArrayQueue<>(16);
40+
private final BaseQueue<Request> enqueuedRequests = Queues.spscArrayQueue(16);
4041
private final AtomicLong lastRequestTime = new AtomicLong();
4142
private final AtomicLong asyncRequestCounter = new AtomicLong();
4243
private final boolean bufferingEnabled;
@@ -157,9 +158,9 @@ private void handleFailure(okhttp3.Response response) throws IOException {
157158

158159
private static final class Sender implements AgentTaskScheduler.Task<OkHttpSink> {
159160

160-
private final SpscArrayQueue<Request> inbox;
161+
private final BaseQueue<Request> inbox;
161162

162-
private Sender(SpscArrayQueue<Request> inbox) {
163+
private Sender(BaseQueue<Request> inbox) {
163164
this.inbox = inbox;
164165
}
165166

dd-trace-core/src/main/java/datadog/trace/common/writer/SpanSamplingWorker.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import datadog.trace.common.sampling.SingleSpanSampler;
1010
import datadog.trace.core.DDSpan;
1111
import datadog.trace.core.monitor.HealthMetrics;
12-
import datadog.trace.util.queue.MpscBlockingConsumerArrayQueue;
12+
import datadog.trace.util.queue.BaseQueue;
13+
import datadog.trace.util.queue.Queues;
1314
import java.util.ArrayList;
1415
import java.util.List;
1516
import java.util.Queue;
@@ -44,7 +45,7 @@ class DefaultSpanSamplingWorker implements SpanSamplingWorker {
4445

4546
private final Thread spanSamplingThread;
4647
private final SamplingHandler samplingHandler;
47-
private final MpscBlockingConsumerArrayQueue<Object> spanSamplingQueue;
48+
private final BaseQueue<Object> spanSamplingQueue;
4849
private final Queue<Object> primaryQueue;
4950
private final Queue<Object> secondaryQueue;
5051
private final SingleSpanSampler singleSpanSampler;
@@ -61,7 +62,7 @@ protected DefaultSpanSamplingWorker(
6162
DroppingPolicy droppingPolicy) {
6263
this.samplingHandler = new SamplingHandler();
6364
this.spanSamplingThread = newAgentThread(SPAN_SAMPLING_PROCESSOR, samplingHandler);
64-
this.spanSamplingQueue = new MpscBlockingConsumerArrayQueue<>(capacity);
65+
this.spanSamplingQueue = Queues.mpscBlockingConsumerArrayQueue(capacity);
6566
this.primaryQueue = primaryQueue;
6667
this.secondaryQueue = secondaryQueue;
6768
this.singleSpanSampler = singleSpanSampler;
@@ -171,7 +172,7 @@ public void onEvent(Object event) {
171172
}
172173
}
173174

174-
private void consumeBatch(MpscBlockingConsumerArrayQueue<Object> queue) {
175+
private void consumeBatch(BaseQueue<Object> queue) {
175176
queue.drain(this::onEvent, queue.size());
176177
}
177178
}

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
import datadog.trace.core.CoreSpan;
1616
import datadog.trace.core.DDSpan;
1717
import datadog.trace.core.monitor.HealthMetrics;
18-
import datadog.trace.util.queue.MpscArrayQueue;
18+
import datadog.trace.util.queue.BaseQueue;
19+
import datadog.trace.util.queue.Queues;
1920
import java.util.List;
2021
import java.util.concurrent.CountDownLatch;
2122
import java.util.concurrent.TimeUnit;
@@ -35,8 +36,8 @@ public class TraceProcessingWorker implements AutoCloseable {
3536
private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);
3637

3738
private final PrioritizationStrategy prioritizationStrategy;
38-
private final MpscArrayQueue<Object> primaryQueue;
39-
private final MpscArrayQueue<Object> secondaryQueue;
39+
private final BaseQueue<Object> primaryQueue;
40+
private final BaseQueue<Object> secondaryQueue;
4041
private final TraceSerializingHandler serializingHandler;
4142
private final Thread serializerThread;
4243
private final int capacity;
@@ -120,23 +121,23 @@ public long getRemainingCapacity() {
120121
return primaryQueue.remainingCapacity();
121122
}
122123

123-
private static MpscArrayQueue<Object> createQueue(int capacity) {
124-
return new MpscArrayQueue<>(capacity);
124+
private static BaseQueue<Object> createQueue(int capacity) {
125+
return Queues.mpscArrayQueue(capacity);
125126
}
126127

127128
public static class TraceSerializingHandler implements Runnable {
128129

129-
private final MpscArrayQueue<Object> primaryQueue;
130-
private final MpscArrayQueue<Object> secondaryQueue;
130+
private final BaseQueue<Object> primaryQueue;
131+
private final BaseQueue<Object> secondaryQueue;
131132
private final HealthMetrics healthMetrics;
132133
private final long ticksRequiredToFlush;
133134
private final boolean doTimeFlush;
134135
private final PayloadDispatcher payloadDispatcher;
135136
private long lastTicks;
136137

137138
public TraceSerializingHandler(
138-
final MpscArrayQueue<Object> primaryQueue,
139-
final MpscArrayQueue<Object> secondaryQueue,
139+
final BaseQueue<Object> primaryQueue,
140+
final BaseQueue<Object> secondaryQueue,
140141
final HealthMetrics healthMetrics,
141142
final PayloadDispatcher payloadDispatcher,
142143
final long flushInterval,
@@ -237,7 +238,7 @@ private boolean shouldFlush() {
237238
return false;
238239
}
239240

240-
private void consumeBatch(MpscArrayQueue<Object> queue) {
241+
private void consumeBatch(BaseQueue<Object> queue) {
241242
queue.drain(this::onEvent, queue.size());
242243
}
243244

dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import datadog.trace.api.time.TimeSource;
1313
import datadog.trace.common.writer.TraceDumpJsonExporter;
1414
import datadog.trace.core.monitor.HealthMetrics;
15-
import datadog.trace.util.queue.MpscBlockingConsumerArrayQueue;
15+
import datadog.trace.util.queue.BaseQueue;
16+
import datadog.trace.util.queue.Queues;
1617
import java.io.IOException;
1718
import java.util.ArrayList;
1819
import java.util.Comparator;
@@ -62,7 +63,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
6263
private static final CommandElement DUMP_ELEMENT = new CommandElement();
6364
private static final CommandElement STAND_IN_ELEMENT = new CommandElement();
6465

65-
private final MpscBlockingConsumerArrayQueue<Element> queue;
66+
private final BaseQueue<Element> queue;
6667
private final Thread worker;
6768
private final TimeSource timeSource;
6869

@@ -292,7 +293,7 @@ public DelayingPendingTraceBuffer(
292293
Config config,
293294
SharedCommunicationObjects sharedCommunicationObjects,
294295
HealthMetrics healthMetrics) {
295-
this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize);
296+
this.queue = Queues.mpscBlockingConsumerArrayQueue(bufferSize);
296297
this.worker = newAgentThread(TRACE_MONITOR, new Worker());
297298
this.timeSource = timeSource;
298299
boolean runningSpansEnabled = config.isLongRunningTraceEnabled();

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
import datadog.trace.core.DDSpan;
3030
import datadog.trace.core.DDTraceCoreInfo;
3131
import datadog.trace.util.AgentTaskScheduler;
32-
import datadog.trace.util.queue.MpscArrayQueue;
32+
import datadog.trace.util.queue.BaseQueue;
33+
import datadog.trace.util.queue.Queues;
3334
import java.util.Collections;
3435
import java.util.HashMap;
3536
import java.util.Iterator;
@@ -53,7 +54,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
5354
new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null);
5455

5556
private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
56-
private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
57+
private final BaseQueue<InboxItem> inbox = Queues.mpscArrayQueue(1024);
5758
private final DatastreamsPayloadWriter payloadWriter;
5859
private final DDAgentFeaturesDiscovery features;
5960
private final TimeSource timeSource;

0 commit comments

Comments
 (0)