Skip to content

Commit 89a7668

Browse files
committed
Try to optimise varhandle version further
1 parent 161e053 commit 89a7668

File tree

8 files changed

+257
-305
lines changed

8 files changed

+257
-305
lines changed

internal-api/internal-api-9/src/jmh/java/datadog/trace/util/stacktrace/queue/MPSCQueueBenchmark.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.openjdk.jmh.infra.Blackhole;
2121

2222
/*
23-
MPSCQueueBenchmark.queueTest 1024 thrpt 145.261 ops/us
24-
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 84.185 ops/us
25-
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 61.076 ops/us
26-
MPSCQueueBenchmark.queueTest 65536 thrpt 187.609 ops/us
27-
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 117.097 ops/us
28-
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 70.512 ops/us
23+
Benchmark (capacity) Mode Cnt Score Error Units
24+
MPSCQueueBenchmark.queueTest 65536 thrpt 208.469 ops/us
25+
MPSCQueueBenchmark.queueTest:async 65536 thrpt NaN ---
26+
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 199.309 ops/us
27+
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 9.161 ops/us
28+
MPSCQueueBenchmark.queueTest 1024 thrpt 195.200 ops/us
29+
MPSCQueueBenchmark.queueTest:async 1024 thrpt NaN ---
30+
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 185.929 ops/us
31+
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 9.272 ops/us
2932
*/
3033
@BenchmarkMode(Mode.Throughput)
3134
@Warmup(iterations = 1, time = 30)
Lines changed: 43 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,77 @@
11
package datadog.trace.util.stacktrace.queue;
22

33
import datadog.trace.util.queue.SpmcArrayQueueVarHandle;
4+
import java.util.concurrent.CountDownLatch;
45
import java.util.concurrent.TimeUnit;
5-
import java.util.concurrent.atomic.AtomicInteger;
6-
import java.util.concurrent.locks.LockSupport;
76
import org.openjdk.jmh.annotations.Benchmark;
87
import org.openjdk.jmh.annotations.BenchmarkMode;
98
import org.openjdk.jmh.annotations.Fork;
109
import org.openjdk.jmh.annotations.Group;
1110
import org.openjdk.jmh.annotations.GroupThreads;
1211
import org.openjdk.jmh.annotations.Level;
12+
import org.openjdk.jmh.annotations.Measurement;
1313
import org.openjdk.jmh.annotations.Mode;
1414
import org.openjdk.jmh.annotations.OutputTimeUnit;
15+
import org.openjdk.jmh.annotations.Param;
1516
import org.openjdk.jmh.annotations.Scope;
1617
import org.openjdk.jmh.annotations.Setup;
1718
import org.openjdk.jmh.annotations.State;
19+
import org.openjdk.jmh.annotations.Warmup;
20+
import org.openjdk.jmh.infra.Blackhole;
1821

1922
/*
20-
SPMCQueueBenchmark.spmc N/A thrpt 5 484.103 ± 64.709 ops/us
21-
SPMCQueueBenchmark.spmc:consumer N/A thrpt 5 466.954 ± 65.712 ops/us
22-
SPMCQueueBenchmark.spmc:producer N/A thrpt 5 17.149 ± 1.541 ops/us
23+
MPSCQueueBenchmark.queueTest 1024 thrpt 145.261 ops/us
24+
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 84.185 ops/us
25+
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 61.076 ops/us
26+
MPSCQueueBenchmark.queueTest 65536 thrpt 187.609 ops/us
27+
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 117.097 ops/us
28+
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 70.512 ops/us
2329
*/
2430
@BenchmarkMode(Mode.Throughput)
25-
@State(Scope.Group)
26-
@Fork(value = 1, warmups = 0)
31+
@Warmup(iterations = 1, time = 30)
32+
@Measurement(iterations = 1, time = 30)
33+
@Fork(1)
2734
@OutputTimeUnit(TimeUnit.MICROSECONDS)
35+
@State(Scope.Benchmark)
2836
public class SPMCQueueBenchmark {
37+
@State(Scope.Group)
38+
public static class QueueState {
39+
SpmcArrayQueueVarHandle<Integer> queue;
40+
CountDownLatch producerReady;
2941

30-
private static final int QUEUE_CAPACITY = 1024;
31-
private static final int ITEMS_TO_PRODUCE = 100_000;
42+
@Param({"1024", "65536"})
43+
int capacity;
3244

33-
private SpmcArrayQueueVarHandle<Integer> queue;
34-
private AtomicInteger produced;
35-
private AtomicInteger consumed;
36-
37-
@Setup(Level.Iteration)
38-
public void setup() {
39-
queue = new SpmcArrayQueueVarHandle<>(QUEUE_CAPACITY);
40-
produced = new AtomicInteger(0);
41-
consumed = new AtomicInteger(0);
42-
43-
// Pre-fill queue for warmup safety
44-
int warmupFill = Math.min(QUEUE_CAPACITY / 2, ITEMS_TO_PRODUCE);
45-
for (int i = 0; i < warmupFill; i++) {
46-
queue.offer(i);
47-
produced.incrementAndGet();
45+
@Setup(Level.Iteration)
46+
public void setup() {
47+
queue = new SpmcArrayQueueVarHandle<>(capacity);
48+
producerReady = new CountDownLatch(1);
4849
}
4950
}
5051

51-
// Single producer in the group
5252
@Benchmark
53-
@Group("spmc")
54-
@GroupThreads(1)
55-
public void producer() {
56-
int i = produced.getAndIncrement();
57-
if (i < ITEMS_TO_PRODUCE) {
58-
while (!queue.offer(i)) {
59-
LockSupport.parkNanos(1L);
60-
}
53+
@Group("queueTest")
54+
@GroupThreads(4)
55+
public void consume(QueueState state, Blackhole bh) {
56+
try {
57+
state.producerReady.await(); // wait until the producer has signalled that it is running
58+
} catch (InterruptedException ignored) {
59+
}
60+
Integer v = state.queue.poll();
61+
if (v != null) {
62+
bh.consume(v);
6163
}
6264
}
6365

64-
// Multiple consumers in the group
6566
@Benchmark
66-
@Group("spmc")
67-
@GroupThreads(4) // adjust number of consumers
68-
public int consumer() {
69-
while (true) {
70-
Integer val = queue.poll();
71-
if (val != null) {
72-
consumed.incrementAndGet();
73-
return val;
74-
}
75-
76-
if (produced.get() >= ITEMS_TO_PRODUCE && queue.isEmpty()) {
77-
return 0;
78-
}
79-
80-
LockSupport.parkNanos(1L);
67+
@Group("queueTest")
68+
@GroupThreads(1)
69+
public void produce(QueueState state) {
70+
state.producerReady.countDown(); // signal consumers can start
71+
// bounded attempt: try once, then yield if full
72+
boolean offered = state.queue.offer(0);
73+
if (!offered) {
74+
Thread.yield();
8175
}
8276
}
8377
}

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/MpscArrayQueueVarHandle.java

Lines changed: 89 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.lang.invoke.MethodHandles;
44
import java.lang.invoke.MethodHandles.Lookup;
55
import java.lang.invoke.VarHandle;
6+
import java.util.Objects;
67
import java.util.concurrent.locks.LockSupport;
78

89
/**
@@ -17,13 +18,16 @@ public class MpscArrayQueueVarHandle<E> extends BaseQueue<E> {
1718
private static final VarHandle ARRAY_HANDLE;
1819
private static final VarHandle HEAD_HANDLE;
1920
private static final VarHandle TAIL_HANDLE;
21+
private static final VarHandle PRODUCER_LIMIT_HANDLE;
2022

2123
static {
2224
try {
2325
final Lookup lookup = MethodHandles.lookup();
2426
TAIL_HANDLE = lookup.findVarHandle(MpscArrayQueueVarHandle.class, "tail", long.class);
2527
HEAD_HANDLE = lookup.findVarHandle(MpscArrayQueueVarHandle.class, "head", long.class);
2628
ARRAY_HANDLE = MethodHandles.arrayElementVarHandle(Object[].class);
29+
PRODUCER_LIMIT_HANDLE =
30+
lookup.findVarHandle(MpscArrayQueueVarHandle.class, "producerLimit", long.class);
2731
} catch (Throwable t) {
2832
throw new IllegalStateException(t);
2933
}
@@ -32,107 +36,156 @@ public class MpscArrayQueueVarHandle<E> extends BaseQueue<E> {
3236
/** The backing array (plain Java array for VarHandle access) */
3337
private final Object[] buffer;
3438

35-
// Padding
39+
// Padding to prevent false sharing
3640
@SuppressWarnings("unused")
3741
private long p0, p1, p2, p3, p4, p5, p6;
3842

39-
/** The next free slot for producers */
43+
/** Next free slot for producers (multi-threaded) */
4044
private volatile long tail = 0L;
4145

42-
// Padding
46+
// Padding around tail
4347
@SuppressWarnings("unused")
4448
private long q0, q1, q2, q3, q4, q5, q6;
4549

46-
// Padding
50+
/** Cached producer limit to reduce volatile head reads */
51+
private volatile long producerLimit = 0L;
52+
53+
// Padding around producerLimit
4754
@SuppressWarnings("unused")
48-
private long p10, p11, p12, p13, p14, p15, p16;
55+
private long r0, r1, r2, r3, r4, r5, r6;
4956

50-
/** The next slot to consume (single-threaded) */
57+
/** Next slot to consume (single-threaded) */
5158
private volatile long head = 0L;
5259

53-
// Padding
54-
private long q10, q11, q12, q13, q14, q15, q16;
60+
// Padding around head
61+
@SuppressWarnings("unused")
62+
private long s0, s1, s2, s3, s4, s5, s6;
5563

5664
/**
5765
* Creates a new MPSC queue.
5866
*
59-
* @param requestedCapacity the desired capacity, rounded up to the next power of two if needed
67+
* @param requestedCapacity the desired capacity, rounded up to next power of two
6068
*/
6169
public MpscArrayQueueVarHandle(int requestedCapacity) {
6270
super(requestedCapacity);
6371
this.buffer = new Object[capacity];
72+
this.producerLimit = capacity;
6473
}
6574

6675
/**
6776
* Attempts to add an element to the queue.
6877
*
69-
* <p>This method uses a CAS loop on {@code tail} to allow multiple producers to safely claim
70-
* distinct slots. The producer then performs a release-store into the buffer using {@code
71-
* ARRAY_HANDLE.setRelease()}.
72-
*
7378
* @param e the element to add (must be non-null)
74-
* @return {@code true} if the element was enqueued, {@code false} if the queue is full
79+
* @return true if element was enqueued, false if queue is full
7580
*/
7681
@Override
7782
public boolean offer(E e) {
78-
if (e == null) {
79-
throw new NullPointerException();
83+
Objects.requireNonNull(e);
84+
85+
// JCTools takes the same local copy so that the JIT compiler can optimise the accesses
86+
final Object[] localBuffer = this.buffer;
87+
88+
// depending on the thread id, choose a different backoff strategy.
89+
// Note: this reduces fairness between producers, but it also reduces contention on the tail CAS.
90+
boolean s0 = false, s1 = false, s2 = false;
91+
switch ((int) (Thread.currentThread().getId() & 3)) {
92+
case 0:
93+
s0 = true;
94+
break;
95+
case 1:
96+
s1 = true;
97+
break;
98+
case 2:
99+
s2 = true;
100+
break;
101+
default:
102+
break;
80103
}
81104

105+
long localProducerLimit = (long) PRODUCER_LIMIT_HANDLE.getVolatile(this);
106+
long cachedHead = 0L; // Local cache of head to reduce volatile reads
107+
82108
while (true) {
83-
final long currentTail = (long) TAIL_HANDLE.getVolatile(this);
84-
final long wrapPoint = currentTail - capacity;
85-
final long currentHead = (long) HEAD_HANDLE.getVolatile(this);
109+
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
110+
111+
// Check if producer limit exceeded
112+
if (currentTail >= localProducerLimit) {
113+
// Refresh head only when necessary
114+
cachedHead = (long) HEAD_HANDLE.getVolatile(this);
115+
localProducerLimit = cachedHead + capacity;
86116

87-
if (wrapPoint >= currentHead) {
88-
return false; // full
117+
if (currentTail >= localProducerLimit) return false; // queue full
118+
119+
// Update producerLimit so other producers also benefit
120+
PRODUCER_LIMIT_HANDLE.setVolatile(this, localProducerLimit);
89121
}
90122

123+
// Attempt to claim a slot
91124
if (TAIL_HANDLE.compareAndSet(this, currentTail, currentTail + 1)) {
92125
final int index = (int) (currentTail & mask);
93-
ARRAY_HANDLE.setRelease(buffer, index, e);
126+
127+
// Release-store ensures producer's write is visible to consumer
128+
ARRAY_HANDLE.setRelease(localBuffer, index, e);
94129
return true;
95130
}
96131

97-
// Backoff on contention
98-
LockSupport.parkNanos(1L);
132+
// Backoff to reduce contention
133+
if (s0) Thread.onSpinWait();
134+
else if (s1) Thread.yield();
135+
else if (s2) LockSupport.parkNanos(1);
99136
}
100137
}
101138

102139
/**
103-
* Removes and returns the next element, or {@code null} if the queue is empty.
104-
*
105-
* <p>This method is single-threaded (one consumer). It performs a volatile read of the buffer,
106-
* and then uses {@code setRelease(null)} to free the slot.
140+
* Removes and returns the next element, or null if empty.
107141
*
108-
* @return the dequeued element, or null if the queue is empty
142+
* @return dequeued element, or null if queue empty
109143
*/
110144
@Override
111145
@SuppressWarnings("unchecked")
112146
public E poll() {
113-
final long currentHead = (long) HEAD_HANDLE.getOpaque(this);
147+
final Object[] localBuffer = this.buffer;
148+
149+
long currentHead = (long) HEAD_HANDLE.getOpaque(this);
114150
final int index = (int) (currentHead & mask);
115151

116-
Object value = ARRAY_HANDLE.getAcquire(buffer, index);
117-
if (value == null) {
118-
return null;
119-
}
152+
// Acquire-load ensures visibility of producer write
153+
Object value = ARRAY_HANDLE.getAcquire(localBuffer, index);
154+
if (value == null) return null;
120155

121-
ARRAY_HANDLE.setOpaque(buffer, index, null); // clear slot
156+
// Clear the slot without additional fence
157+
ARRAY_HANDLE.setOpaque(localBuffer, index, null);
158+
159+
// Advance head using opaque write (consumer-only)
122160
HEAD_HANDLE.setOpaque(this, currentHead + 1);
161+
123162
return (E) value;
124163
}
125164

165+
/**
166+
* Returns next element without removing it.
167+
*
168+
* <p>The memory visibility is only correct if the consumer calls it.
169+
*
170+
* @return next element or null if empty
171+
*/
126172
@Override
127173
@SuppressWarnings("unchecked")
128174
public E peek() {
129175
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
130176
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
131177
}
132178

179+
/**
180+
* Returns number of elements in queue.
181+
*
182+
* <p>Head and tail are each read with volatile semantics but in two separate steps, so the result is only an approximation while producers or the consumer are active.
183+
*
184+
* @return current size
185+
*/
133186
@Override
134187
public int size() {
135-
long currentHead = head; // non-volatile read
188+
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
136189
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
137190
return (int) (currentTail - currentHead);
138191
}

0 commit comments

Comments
 (0)