Skip to content

Commit 8371a1b

Browse files
committed
Try to optimise more java 9+ implementation
1 parent 89f0f33 commit 8371a1b

File tree

11 files changed

+841
-126
lines changed

11 files changed

+841
-126
lines changed
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package datadog.trace.util.queue;
2+
3+
import java.util.Collection;
4+
import java.util.Iterator;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.locks.LockSupport;
7+
import java.util.function.Consumer;
8+
import java.util.function.Supplier;
9+
import javax.annotation.Nonnull;
10+
11+
public abstract class BaseQueueVarHandle<E> extends BaseQueue<E> {
12+
public BaseQueueVarHandle(int capacity) {
13+
super(capacity);
14+
}
15+
16+
/**
17+
* Timed offer with progressive backoff.
18+
*
19+
* <p>Tries to insert an element into the queue within the given timeout. Uses a spin → yield →
20+
* park backoff strategy to reduce CPU usage under contention.
21+
*
22+
* @param e the element to insert
23+
* @param timeout maximum time to wait
24+
* @param unit time unit of timeout
25+
* @return {@code true} if inserted, {@code false} if timeout expires
26+
* @throws InterruptedException if interrupted while waiting
27+
*/
28+
public boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
29+
if (e == null) {
30+
throw new NullPointerException();
31+
}
32+
final long deadline = System.nanoTime() + unit.toNanos(timeout);
33+
int idle = 0;
34+
35+
while (true) {
36+
if (offer(e)) return true;
37+
38+
long remaining = deadline - System.nanoTime();
39+
if (remaining <= 0) return false;
40+
41+
// Progressive backoff
42+
if (idle < 100) {
43+
// spin
44+
} else if (idle < 1_000) {
45+
Thread.yield();
46+
} else {
47+
LockSupport.parkNanos(1_000L);
48+
}
49+
50+
if (Thread.interrupted()) {
51+
throw new InterruptedException();
52+
}
53+
idle++;
54+
}
55+
}
56+
57+
/**
58+
* Polls with a timeout using progressive backoff.
59+
*
60+
* @param timeout max wait time
61+
* @param unit time unit
62+
* @return the head element, or null if timed out
63+
* @throws InterruptedException if interrupted
64+
*/
65+
public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
66+
if (timeout <= 0) {
67+
return poll();
68+
}
69+
70+
final long deadline = System.nanoTime() + unit.toNanos(timeout);
71+
int idleCount = 0;
72+
73+
while (true) {
74+
E e = poll();
75+
if (e != null) return e;
76+
77+
long remaining = deadline - System.nanoTime();
78+
if (remaining <= 0) return null;
79+
80+
if (idleCount < 100) {
81+
// spin
82+
} else if (idleCount < 1_000) {
83+
Thread.yield();
84+
} else {
85+
LockSupport.parkNanos(1_000L);
86+
}
87+
88+
if (Thread.interrupted()) {
89+
throw new InterruptedException();
90+
}
91+
idleCount++;
92+
}
93+
}
94+
95+
/**
96+
* Drains all available elements from the queue to a consumer.
97+
*
98+
* <p>This is efficient since it avoids repeated size() checks and returns immediately when empty.
99+
*
100+
* @param consumer a consumer to accept elements
101+
* @return number of elements drained
102+
*/
103+
public int drain(Consumer<E> consumer) {
104+
return drain(consumer, Integer.MAX_VALUE);
105+
}
106+
107+
/**
108+
* Drains up to {@code limit} elements from the queue to a consumer.
109+
*
110+
* <p>This method is useful for batch processing.
111+
*
112+
* <p>Each element is removed atomically using poll() and passed to the consumer.
113+
*
114+
* @param consumer a consumer to accept elements
115+
* @param limit maximum number of elements to drain
116+
* @return number of elements drained
117+
*/
118+
public int drain(Consumer<E> consumer, int limit) {
119+
int count = 0;
120+
E e;
121+
while (count < limit && (e = poll()) != null) {
122+
consumer.accept(e);
123+
count++;
124+
}
125+
return count;
126+
}
127+
128+
/**
129+
* Fills the queue with elements provided by the supplier until either: - the queue is full, or -
130+
* the supplier runs out of elements (returns null)
131+
*
132+
* @param supplier a supplier of elements
133+
* @param limit maximum number of elements to attempt to insert
134+
* @return number of elements successfully enqueued
135+
*/
136+
public int fill(@Nonnull Supplier<? extends E> supplier, int limit) {
137+
if (limit <= 0) {
138+
return 0;
139+
}
140+
141+
int added = 0;
142+
while (added < limit) {
143+
E e = supplier.get();
144+
if (e == null) {
145+
break; // stop if supplier exhausted
146+
}
147+
148+
if (offer(e)) {
149+
added++;
150+
} else {
151+
break; // queue is full
152+
}
153+
}
154+
return added;
155+
}
156+
157+
/**
158+
* Iterator is not supported.
159+
*
160+
* @throws UnsupportedOperationException always
161+
*/
162+
@Override
163+
public Iterator<E> iterator() {
164+
throw new UnsupportedOperationException();
165+
}
166+
167+
/**
168+
* Returns the remaining capacity.
169+
*
170+
* @return number of additional elements this queue can accept
171+
*/
172+
public int remainingCapacity() {
173+
return capacity - size();
174+
}
175+
176+
/**
177+
* Returns the maximum queue capacity.
178+
*
179+
* @return number of total elements this queue can accept
180+
*/
181+
public int capacity() {
182+
return capacity;
183+
}
184+
185+
@Override
186+
public void put(E e) throws InterruptedException {
187+
throw new UnsupportedOperationException("Not implementing blocking operations for producers");
188+
}
189+
190+
@Override
191+
public E take() throws InterruptedException {
192+
throw new UnsupportedOperationException("Not implementing blocking operations for consumers");
193+
}
194+
195+
@Override
196+
public int drainTo(Collection<? super E> c) {
197+
return drainTo(c, Integer.MAX_VALUE);
198+
}
199+
200+
@Override
201+
public int drainTo(Collection<? super E> c, int maxElements) {
202+
return drain(c::add, maxElements);
203+
}
204+
}

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/MpscArrayQueueVarHandle.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,22 @@ public boolean offer(E e) {
8080
}
8181

8282
while (true) {
83-
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
84-
int index = (int) (currentTail & mask);
83+
final long currentTail = (long) TAIL_HANDLE.getVolatile(this);
84+
final long wrapPoint = currentTail - capacity;
85+
final long currentHead = (long) HEAD_HANDLE.getVolatile(this);
8586

86-
Object existing = ARRAY_HANDLE.getVolatile(buffer, index);
87-
if (existing != null) {
88-
return false; // queue full
87+
if (wrapPoint >= currentHead) {
88+
return false; // full
8989
}
9090

91-
// CAS tail to claim the slot
9291
if (TAIL_HANDLE.compareAndSet(this, currentTail, currentTail + 1)) {
93-
ARRAY_HANDLE.setRelease(buffer, index, e); // publish with release semantics
92+
final int index = (int) (currentTail & mask);
93+
ARRAY_HANDLE.setRelease(buffer, index, e);
9494
return true;
9595
}
9696

97-
// CAS failed → short backoff to reduce contention
98-
LockSupport.parkNanos(1);
97+
// Backoff on contention
98+
LockSupport.parkNanos(1L);
9999
}
100100
}
101101

@@ -110,30 +110,30 @@ public boolean offer(E e) {
110110
@Override
111111
@SuppressWarnings("unchecked")
112112
public E poll() {
113-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
114-
int index = (int) (currentHead & mask);
113+
final long currentHead = (long) HEAD_HANDLE.getOpaque(this);
114+
final int index = (int) (currentHead & mask);
115115

116-
Object value = ARRAY_HANDLE.getVolatile(buffer, index);
116+
Object value = ARRAY_HANDLE.getAcquire(buffer, index);
117117
if (value == null) {
118-
return null; // empty
118+
return null;
119119
}
120120

121-
ARRAY_HANDLE.setRelease(buffer, index, null); // mark slot free
122-
HEAD_HANDLE.setVolatile(this, currentHead + 1); // advance head
121+
ARRAY_HANDLE.setOpaque(buffer, index, null); // clear slot
122+
HEAD_HANDLE.setOpaque(this, currentHead + 1);
123123
return (E) value;
124124
}
125125

126126
@Override
127127
@SuppressWarnings("unchecked")
128128
public E peek() {
129-
int index = (int) ((long) HEAD_HANDLE.getVolatile(this) & mask);
129+
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
130130
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
131131
}
132132

133133
@Override
134134
public int size() {
135+
long currentHead = head; // non-volatile read
135136
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
136-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
137137
return (int) (currentTail - currentHead);
138138
}
139139
}

0 commit comments

Comments
 (0)