Skip to content

Commit e80998f

Browse files
committed
Refactor
1 parent fcfc767 commit e80998f

File tree

6 files changed

+231
-337
lines changed

6 files changed

+231
-337
lines changed

dd-trace-core/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ dependencies {
7979

8080
implementation group: 'com.google.re2j', name: 're2j', version: '1.7'
8181

82+
jmhImplementation(libs.jctools)
83+
8284
// We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this
8385
testAnnotationProcessor libs.autoservice.processor
8486
testCompileOnly libs.autoservice.annotation
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package datadog.trace.util.queue;
2+
3+
import static datadog.trace.util.BitUtils.nextPowerOfTwo;
4+
5+
import java.util.AbstractQueue;
6+
import java.util.Iterator;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.locks.LockSupport;
9+
import java.util.function.Consumer;
10+
import java.util.function.Supplier;
11+
import javax.annotation.Nonnull;
12+
13+
public abstract class BaseQueue<E> extends AbstractQueue<E> {
14+
/** The capacity of the queue (must be a power of two) */
15+
protected final int capacity;
16+
17+
/** Mask for fast modulo operation (index = pos & mask) */
18+
protected final int mask;
19+
20+
public BaseQueue(int capacity) {
21+
this.capacity = nextPowerOfTwo(capacity);
22+
this.mask = this.capacity - 1;
23+
}
24+
25+
/**
26+
* Timed offer with progressive backoff.
27+
*
28+
* <p>Tries to insert an element into the queue within the given timeout. Uses a spin → yield →
29+
* park backoff strategy to reduce CPU usage under contention.
30+
*
31+
* @param e the element to insert
32+
* @param timeout maximum time to wait
33+
* @param unit time unit of timeout
34+
* @return {@code true} if inserted, {@code false} if timeout expires
35+
* @throws InterruptedException if interrupted while waiting
36+
*/
37+
public boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
38+
final long deadline = System.nanoTime() + unit.toNanos(timeout);
39+
int idleCount = 0;
40+
41+
while (true) {
42+
if (offer(e)) {
43+
return true; // successfully inserted
44+
}
45+
46+
long remaining = deadline - System.nanoTime();
47+
if (remaining <= 0) {
48+
return false; // timeout
49+
}
50+
51+
// Progressive backoff
52+
if (idleCount < 100) {
53+
// spin (busy-wait)
54+
} else if (idleCount < 1_000) {
55+
Thread.yield(); // give up CPU to other threads
56+
} else {
57+
// park for a short duration, up to 1 ms
58+
LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
59+
}
60+
61+
idleCount++;
62+
63+
if (Thread.interrupted()) {
64+
throw new InterruptedException();
65+
}
66+
}
67+
}
68+
69+
/**
70+
* Polls with a timeout using progressive backoff.
71+
*
72+
* @param timeout max wait time
73+
* @param unit time unit
74+
* @return the head element, or null if timed out
75+
* @throws InterruptedException if interrupted
76+
*/
77+
public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
78+
final long deadline = System.nanoTime() + unit.toNanos(timeout);
79+
int idleCount = 0;
80+
81+
while (true) {
82+
E value = poll();
83+
if (value != null) {
84+
return value;
85+
}
86+
87+
long remaining = deadline - System.nanoTime();
88+
if (remaining <= 0) {
89+
return null;
90+
}
91+
92+
// Progressive backoff
93+
if (idleCount < 100) {
94+
// spin
95+
} else if (idleCount < 1_000) {
96+
Thread.yield();
97+
} else {
98+
LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
99+
}
100+
idleCount++;
101+
102+
if (Thread.interrupted()) {
103+
throw new InterruptedException();
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Drains all available elements from the queue to a consumer.
110+
*
111+
* <p>This is efficient since it avoids repeated size() checks and returns immediately when empty.
112+
*
113+
* @param consumer a consumer to accept elements
114+
* @return number of elements drained
115+
*/
116+
public int drain(Consumer<E> consumer) {
117+
return drain(consumer, Integer.MAX_VALUE);
118+
}
119+
120+
/**
121+
* Drains up to {@code limit} elements from the queue to a consumer.
122+
*
123+
* <p>This method is useful for batch processing.
124+
*
125+
* <p>Each element is removed atomically using poll() and passed to the consumer.
126+
*
127+
* @param consumer a consumer to accept elements
128+
* @param limit maximum number of elements to drain
129+
* @return number of elements drained
130+
*/
131+
public int drain(Consumer<E> consumer, int limit) {
132+
int count = 0;
133+
E e;
134+
while (count < limit && (e = poll()) != null) {
135+
consumer.accept(e);
136+
count++;
137+
}
138+
return count;
139+
}
140+
141+
/**
142+
* Fills the queue with elements provided by the supplier until either: - the queue is full, or -
143+
* the supplier runs out of elements (returns null)
144+
*
145+
* @param supplier a supplier of elements
146+
* @param limit maximum number of elements to attempt to insert
147+
* @return number of elements successfully enqueued
148+
*/
149+
public int fill(@Nonnull Supplier<? extends E> supplier, int limit) {
150+
if (limit <= 0) {
151+
return 0;
152+
}
153+
154+
int added = 0;
155+
while (added < limit) {
156+
E e = supplier.get();
157+
if (e == null) {
158+
break; // stop if supplier exhausted
159+
}
160+
161+
if (offer(e)) {
162+
added++;
163+
} else {
164+
break; // queue is full
165+
}
166+
}
167+
return added;
168+
}
169+
170+
/**
171+
* Iterator is not supported.
172+
*
173+
* @throws UnsupportedOperationException always
174+
*/
175+
@Override
176+
public Iterator<E> iterator() {
177+
throw new UnsupportedOperationException();
178+
}
179+
180+
/**
181+
* Returns the remaining capacity.
182+
*
183+
* @return number of additional elements this queue can accept
184+
*/
185+
public int remainingCapacity() {
186+
return capacity - size();
187+
}
188+
189+
/**
190+
* Returns the maximum queue capacity.
191+
*
192+
* @return number of total elements this queue can accept
193+
*/
194+
public int capacity() {
195+
return capacity;
196+
}
197+
}

0 commit comments

Comments
 (0)