Skip to content

Commit e9206da

Browse files
committed
try to improve the fairness
1 parent 71efc41 commit e9206da

File tree

12 files changed

+153
-230
lines changed

12 files changed

+153
-230
lines changed

dd-java-agent/agent-llmobs/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies {
1717
implementation project(':communication')
1818
implementation project(':components:json')
1919
implementation project(':internal-api')
20-
api project(':internal-api:internal-api-9')
20+
implementation project(':internal-api:internal-api-9')
2121

2222

2323
testImplementation project(':dd-java-agent:testing')

internal-api/build.gradle.kts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,6 @@ dependencies {
281281
// it contains annotations that are also present in the instrumented application classes
282282
api("com.datadoghq:dd-javac-plugin-client:0.2.2")
283283

284-
jmhImplementation(libs.jctools)
285-
286284
testImplementation("org.snakeyaml:snakeyaml-engine:2.9")
287285
testImplementation(project(":utils:test-utils"))
288286
testImplementation(libs.bundles.junit5)

internal-api/internal-api-9/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ val minimumInstructionCoverage by extra(0.8)
3939

4040
dependencies {
4141
api(project(":internal-api"))
42-
api(libs.jctools) // the Queues factory should probably be moved elsewhere
42+
implementation(libs.jctools) // the Queues factory should probably be moved elsewhere
4343

4444
testImplementation(project(":dd-java-agent:testing"))
4545
testImplementation(libs.slf4j)

internal-api/internal-api-9/src/jmh/java/datadog/trace/util/stacktrace/queue/SPMCQueueBenchmark.java renamed to internal-api/internal-api-9/src/jmh/java/datadog/trace/util/queue/MPSCBlockingConsumerQueueBenchmark.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package datadog.trace.util.stacktrace.queue;
1+
package datadog.trace.util.queue;
22

3-
import datadog.trace.util.queue.SpmcArrayQueueVarHandle;
43
import java.util.concurrent.CountDownLatch;
54
import java.util.concurrent.TimeUnit;
65
import org.openjdk.jmh.annotations.Benchmark;
@@ -20,58 +19,62 @@
2019
import org.openjdk.jmh.infra.Blackhole;
2120

2221
/*
23-
MPSCQueueBenchmark.queueTest 1024 thrpt 145.261 ops/us
24-
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 84.185 ops/us
25-
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 61.076 ops/us
26-
MPSCQueueBenchmark.queueTest 65536 thrpt 187.609 ops/us
27-
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 117.097 ops/us
28-
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 70.512 ops/us
29-
*/
22+
Benchmark (capacity) Mode Cnt Score Error Units
23+
MPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 121.534 ops/us
24+
MPSCBlockingConsumerQueueBenchmark.queueTest:async 1024 thrpt NaN ---
25+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 110.962 ops/us
26+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 10.572 ops/us
27+
MPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 126.856 ops/us
28+
MPSCBlockingConsumerQueueBenchmark.queueTest:async 65536 thrpt NaN ---
29+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 113.213 ops/us
30+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 13.644 ops/us
31+
*/
3032
@BenchmarkMode(Mode.Throughput)
3133
@Warmup(iterations = 1, time = 30)
3234
@Measurement(iterations = 1, time = 30)
3335
@Fork(1)
3436
@OutputTimeUnit(TimeUnit.MICROSECONDS)
3537
@State(Scope.Benchmark)
36-
public class SPMCQueueBenchmark {
38+
public class MPSCBlockingConsumerQueueBenchmark {
3739
@State(Scope.Group)
3840
public static class QueueState {
39-
SpmcArrayQueueVarHandle<Integer> queue;
40-
CountDownLatch producerReady;
41+
MpscBlockingConsumerArrayQueueVarHandle<Integer> queue;
42+
CountDownLatch consumerReady;
4143

4244
@Param({"1024", "65536"})
4345
int capacity;
4446

4547
@Setup(Level.Iteration)
4648
public void setup() {
47-
queue = new SpmcArrayQueueVarHandle<>(capacity);
48-
producerReady = new CountDownLatch(1);
49+
queue = new MpscBlockingConsumerArrayQueueVarHandle<>(capacity);
50+
consumerReady = new CountDownLatch(1);
4951
}
5052
}
5153

5254
@Benchmark
5355
@Group("queueTest")
5456
@GroupThreads(4)
55-
public void consume(QueueState state, Blackhole bh) {
57+
public void produce(QueueState state) {
5658
try {
57-
state.producerReady.await(); // wait until consumer is ready
59+
state.consumerReady.await(); // wait until consumer is ready
5860
} catch (InterruptedException ignored) {
5961
}
60-
Integer v = state.queue.poll();
61-
if (v != null) {
62-
bh.consume(v);
62+
63+
// bounded attempt: try once, then yield if full
64+
boolean offered = state.queue.offer(0);
65+
if (!offered) {
66+
Thread.yield();
6367
}
6468
}
6569

6670
@Benchmark
6771
@Group("queueTest")
6872
@GroupThreads(1)
69-
public void produce(QueueState state) {
70-
state.producerReady.countDown(); // signal consumers can start
71-
// bounded attempt: try once, then yield if full
72-
boolean offered = state.queue.offer(0);
73-
if (!offered) {
74-
Thread.yield();
73+
public void consume(QueueState state, Blackhole bh) {
74+
state.consumerReady.countDown(); // signal producers can start
75+
Integer v = state.queue.poll();
76+
if (v != null) {
77+
bh.consume(v);
7578
}
7679
}
7780
}

internal-api/internal-api-9/src/jmh/java/datadog/trace/util/stacktrace/queue/MPSCQueueBenchmark.java renamed to internal-api/internal-api-9/src/jmh/java/datadog/trace/util/queue/MPSCQueueBenchmark.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package datadog.trace.util.stacktrace.queue;
1+
package datadog.trace.util.queue;
22

3-
import datadog.trace.util.queue.MpscArrayQueueVarHandle;
43
import java.util.concurrent.CountDownLatch;
54
import java.util.concurrent.TimeUnit;
65
import org.openjdk.jmh.annotations.Benchmark;
@@ -21,15 +20,14 @@
2120

2221
/*
2322
Benchmark (capacity) Mode Cnt Score Error Units
24-
MPSCQueueBenchmark.queueTest 65536 thrpt 208.469 ops/us
25-
MPSCQueueBenchmark.queueTest:async 65536 thrpt NaN ---
26-
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 199.309 ops/us
27-
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 9.161 ops/us
28-
MPSCQueueBenchmark.queueTest 1024 thrpt 195.200 ops/us
23+
MPSCQueueBenchmark.queueTest 1024 thrpt 146.530 ops/us
2924
MPSCQueueBenchmark.queueTest:async 1024 thrpt NaN ---
30-
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 185.929 ops/us
31-
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 9.272 ops/us
32-
*/
25+
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 108.357 ops/us
26+
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 38.172 ops/us
27+
MPSCQueueBenchmark.queueTest 65536 thrpt 179.177 ops/us
28+
MPSCQueueBenchmark.queueTest:async 65536 thrpt NaN ---
29+
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 140.968 ops/us
30+
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 38.209 ops/us */
3331
@BenchmarkMode(Mode.Throughput)
3432
@Warmup(iterations = 1, time = 30)
3533
@Measurement(iterations = 1, time = 30)

internal-api/internal-api-9/src/jmh/java/datadog/trace/util/stacktrace/queue/SPSCQueueBenchmark.java renamed to internal-api/internal-api-9/src/jmh/java/datadog/trace/util/queue/SPSCQueueBenchmark.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package datadog.trace.util.stacktrace.queue;
1+
package datadog.trace.util.queue;
22

3-
import datadog.trace.util.queue.SpscArrayQueueVarHandle;
43
import java.util.concurrent.TimeUnit;
54
import org.openjdk.jmh.annotations.Benchmark;
65
import org.openjdk.jmh.annotations.BenchmarkMode;

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/BlockingConsumerNonBlockingQueue.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44
import javax.annotation.Nonnull;
55

66
public interface BlockingConsumerNonBlockingQueue<E> extends NonBlockingQueue<E> {
7-
boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
8-
97
E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
108

11-
void put(E e) throws InterruptedException;
12-
139
E take() throws InterruptedException;
1410
}

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/JctoolsMpscBlockingConsumerWrappedQueue.java

Lines changed: 1 addition & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.concurrent.BlockingQueue;
44
import java.util.concurrent.TimeUnit;
5-
import java.util.concurrent.locks.LockSupport;
65
import javax.annotation.Nonnull;
76
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
87

@@ -17,98 +16,13 @@ public JctoolsMpscBlockingConsumerWrappedQueue(
1716
this.blockingQueueDelegate = delegate;
1817
}
1918

20-
@Override
21-
public void put(E e) throws InterruptedException {
22-
blockingQueueDelegate.put(e);
23-
}
24-
2519
@Override
2620
public E take() throws InterruptedException {
2721
return blockingQueueDelegate.take();
2822
}
2923

30-
/**
31-
* Timed offer with progressive backoff.
32-
*
33-
* <p>Tries to insert an element into the queue within the given timeout. Uses a spin → yield →
34-
* park backoff strategy to reduce CPU usage under contention.
35-
*
36-
* @param e the element to insert
37-
* @param timeout maximum time to wait
38-
* @param unit time unit of timeout
39-
* @return {@code true} if inserted, {@code false} if timeout expires
40-
* @throws InterruptedException if interrupted while waiting
41-
*/
42-
@Override
43-
public boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
44-
final long deadline = System.nanoTime() + unit.toNanos(timeout);
45-
int idleCount = 0;
46-
47-
while (true) {
48-
if (offer(e)) {
49-
return true; // successfully inserted
50-
}
51-
52-
long remaining = deadline - System.nanoTime();
53-
if (remaining <= 0) {
54-
return false; // timeout
55-
}
56-
57-
// Progressive backoff
58-
if (idleCount < 100) {
59-
// spin (busy-wait)
60-
} else if (idleCount < 1_000) {
61-
Thread.yield(); // give up CPU to other threads
62-
} else {
63-
// park for a short duration, up to 1 ms
64-
LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
65-
}
66-
67-
idleCount++;
68-
69-
if (Thread.interrupted()) {
70-
throw new InterruptedException();
71-
}
72-
}
73-
}
74-
75-
/**
76-
* Polls with a timeout using progressive backoff.
77-
*
78-
* @param timeout max wait time
79-
* @param unit time unit
80-
* @return the head element, or null if timed out
81-
* @throws InterruptedException if interrupted
82-
*/
8324
@Override
8425
public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
85-
final long deadline = System.nanoTime() + unit.toNanos(timeout);
86-
int idleCount = 0;
87-
88-
while (true) {
89-
E value = poll();
90-
if (value != null) {
91-
return value;
92-
}
93-
94-
long remaining = deadline - System.nanoTime();
95-
if (remaining <= 0) {
96-
return null;
97-
}
98-
99-
// Progressive backoff
100-
if (idleCount < 100) {
101-
// spin
102-
} else if (idleCount < 1_000) {
103-
Thread.yield();
104-
} else {
105-
LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
106-
}
107-
idleCount++;
108-
109-
if (Thread.interrupted()) {
110-
throw new InterruptedException();
111-
}
112-
}
26+
return blockingQueueDelegate.poll(timeout, unit);
11327
}
11428
}

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/MpscArrayQueueVarHandle.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.lang.invoke.MethodHandles.Lookup;
55
import java.lang.invoke.VarHandle;
66
import java.util.Objects;
7+
import java.util.concurrent.ThreadLocalRandom;
78
import java.util.concurrent.locks.LockSupport;
89

910
/**
@@ -85,23 +86,6 @@ public boolean offer(E e) {
8586
// jctools makes the same local copy so that the JIT compiler can optimise the accesses
8687
final Object[] localBuffer = this.buffer;
8788

88-
// depending on the thread id, choose a different backoff strategy.
89-
// Note: it reduces fairness but also the contention on the cas.
90-
boolean s0 = false, s1 = false, s2 = false;
91-
switch ((int) (Thread.currentThread().getId() & 3)) {
92-
case 0:
93-
s0 = true;
94-
break;
95-
case 1:
96-
s1 = true;
97-
break;
98-
case 2:
99-
s2 = true;
100-
break;
101-
default:
102-
break;
103-
}
104-
10589
long localProducerLimit = (long) PRODUCER_LIMIT_HANDLE.getVolatile(this);
10690
long cachedHead = 0L; // Local cache of head to reduce volatile reads
10791

@@ -114,7 +98,9 @@ public boolean offer(E e) {
11498
cachedHead = (long) HEAD_HANDLE.getVolatile(this);
11599
localProducerLimit = cachedHead + capacity;
116100

117-
if (currentTail >= localProducerLimit) return false; // queue full
101+
if (currentTail >= localProducerLimit) {
102+
return false; // queue full
103+
}
118104

119105
// Update producerLimit so other producers also benefit
120106
PRODUCER_LIMIT_HANDLE.setVolatile(this, localProducerLimit);
@@ -130,9 +116,20 @@ public boolean offer(E e) {
130116
}
131117

132118
// Backoff to reduce contention
133-
if (s0) Thread.onSpinWait();
134-
else if (s1) Thread.yield();
135-
else if (s2) LockSupport.parkNanos(1);
119+
switch (ThreadLocalRandom.current().nextInt(0, 4)) {
120+
case 0:
121+
Thread.yield();
122+
break;
123+
case 1:
124+
LockSupport.parkNanos(1);
125+
break;
126+
case 2:
127+
Thread.onSpinWait();
128+
break;
129+
default:
130+
// busy spin
131+
break;
132+
}
136133
}
137134
}
138135

@@ -151,7 +148,9 @@ public E poll() {
151148

152149
// Acquire-load ensures visibility of producer write
153150
Object value = ARRAY_HANDLE.getAcquire(localBuffer, index);
154-
if (value == null) return null;
151+
if (value == null) {
152+
return null;
153+
}
155154

156155
// Clear the slot without additional fence
157156
ARRAY_HANDLE.setOpaque(localBuffer, index, null);

0 commit comments

Comments
 (0)