Skip to content

Commit 71efc41

Browse files
committed
Use a better time wait poll
1 parent b2c39aa commit 71efc41

File tree

3 files changed

+64
-72
lines changed

3 files changed

+64
-72
lines changed

internal-api/internal-api-9/src/main/java/datadog/trace/util/queue/MpscBlockingConsumerArrayQueueVarHandle.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws Interrupt
9898
}
9999

100100
/**
101-
* Polls with a timeout using progressive backoff.
101+
* Polls with a timeout.
102102
*
103103
* @param timeout max wait time
104104
* @param unit time unit
@@ -107,32 +107,25 @@ public boolean offer(E e, long timeout, @Nonnull TimeUnit unit) throws Interrupt
107107
*/
108108
public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
109109
final long deadline = System.nanoTime() + unit.toNanos(timeout);
110-
int idleCount = 0;
111110

112-
while (true) {
113-
E value = poll();
114-
if (value != null) {
115-
return value;
116-
}
111+
E e = poll();
112+
if (e != null) {
113+
return e;
114+
}
117115

118-
long remaining = deadline - System.nanoTime();
119-
if (remaining <= 0) {
120-
return null;
121-
}
116+
// register this thread as the waiting consumer
117+
consumerThread = Thread.currentThread();
118+
final long remaining = deadline - System.nanoTime();
122119

123-
// Progressive backoff
124-
if (idleCount < 100) {
125-
// spin
126-
} else if (idleCount < 1_000) {
127-
Thread.yield();
128-
} else {
129-
LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
130-
}
131-
idleCount++;
120+
if (remaining <= 0) {
121+
consumerThread = null;
122+
return null;
123+
}
132124

133-
if (Thread.interrupted()) {
134-
throw new InterruptedException();
135-
}
125+
LockSupport.parkNanos(this, remaining);
126+
if (Thread.interrupted()) {
127+
throw new InterruptedException();
136128
}
129+
return poll();
137130
}
138131
}

internal-api/internal-api-9/src/test/groovy/datadog/trace/util/queue/AbstractQueueTest.groovy

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package datadog.trace.util.queue
22

3-
import static java.util.concurrent.TimeUnit.NANOSECONDS
43

54
import datadog.trace.test.util.DDSpecification
6-
import java.util.concurrent.TimeUnit
7-
import java.util.concurrent.atomic.AtomicBoolean
85
import java.util.function.Consumer
96

10-
abstract class AbstractQueueTest<T extends AbstractQueue<Integer>> extends DDSpecification {
7+
abstract class AbstractQueueTest<T extends BaseQueue<Integer>> extends DDSpecification {
118
abstract T createQueue(int capacity)
129
protected T queue = createQueue(8)
1310

@@ -103,7 +100,7 @@ abstract class AbstractQueueTest<T extends AbstractQueue<Integer>> extends DDSpe
103100

104101
def "remainingCapacity should reflect current occupancy"() {
105102
given:
106-
def q = new MpscArrayQueue<Integer>(4)
103+
def q = createQueue(4)
107104
q.offer(1)
108105
q.offer(2)
109106

@@ -116,48 +113,4 @@ abstract class AbstractQueueTest<T extends AbstractQueue<Integer>> extends DDSpe
116113
then:
117114
q.remainingCapacity() == 3
118115
}
119-
120-
121-
def "poll with timeout returns null if no element becomes available"() {
122-
when:
123-
def start = System.nanoTime()
124-
def value = queue.poll(200, TimeUnit.MILLISECONDS)
125-
def elapsedMs = NANOSECONDS.toMillis(System.nanoTime() - start)
126-
127-
then:
128-
value == null
129-
elapsedMs >= 200 // waited approximately the timeout
130-
}
131-
132-
def "poll with zero timeout behaves like immediate poll"() {
133-
expect:
134-
queue.poll(0, TimeUnit.MILLISECONDS) == null
135-
136-
when:
137-
queue.offer(99)
138-
139-
then:
140-
queue.poll(0, TimeUnit.MILLISECONDS) == 99
141-
}
142-
143-
def "poll throws InterruptedException if interrupted"() {
144-
given:
145-
def thrown = new AtomicBoolean()
146-
def thread = Thread.start {
147-
try {
148-
queue.poll(500, TimeUnit.MILLISECONDS)
149-
} catch (InterruptedException ie) {
150-
thrown.set(true)
151-
Thread.currentThread().interrupt()
152-
}
153-
}
154-
155-
when:
156-
Thread.sleep(50)
157-
thread.interrupt()
158-
thread.join()
159-
160-
then:
161-
thrown.get()
162-
}
163116
}

internal-api/internal-api-9/src/test/groovy/datadog/trace/util/queue/MpscBlockingConsumerArrayQueueVarHandleTest.groovy

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package datadog.trace.util.queue
22

3+
import static java.util.concurrent.TimeUnit.NANOSECONDS
34

45
import java.util.concurrent.CountDownLatch
6+
import java.util.concurrent.TimeUnit
7+
import java.util.concurrent.atomic.AtomicBoolean
58
import java.util.concurrent.atomic.AtomicReference
69
import java.util.function.Consumer
710
import java.util.function.Supplier
@@ -121,4 +124,47 @@ class MpscBlockingConsumerArrayQueueVarHandleTest extends AbstractQueueTest<Mpsc
121124
filled == 8
122125
queue.size() == 8
123126
}
127+
128+
def "poll with timeout returns null if no element becomes available"() {
129+
when:
130+
def start = System.nanoTime()
131+
def value = queue.poll(200, TimeUnit.MILLISECONDS)
132+
def elapsedMs = NANOSECONDS.toMillis(System.nanoTime() - start)
133+
134+
then:
135+
value == null
136+
elapsedMs >= 200 // waited approximately the timeout
137+
}
138+
139+
def "poll with zero timeout behaves like immediate poll"() {
140+
expect:
141+
queue.poll(0, TimeUnit.MILLISECONDS) == null
142+
143+
when:
144+
queue.offer(99)
145+
146+
then:
147+
queue.poll(0, TimeUnit.MILLISECONDS) == 99
148+
}
149+
150+
def "poll throws InterruptedException if interrupted"() {
151+
given:
152+
def thrown = new AtomicBoolean()
153+
def thread = Thread.start {
154+
try {
155+
queue.poll(500, TimeUnit.MILLISECONDS)
156+
} catch (InterruptedException ie) {
157+
thrown.set(true)
158+
Thread.currentThread().interrupt()
159+
}
160+
}
161+
162+
when:
163+
Thread.sleep(50)
164+
thread.interrupt()
165+
thread.join()
166+
167+
then:
168+
thrown.get()
169+
}
124170
}

0 commit comments

Comments
 (0)