|
2 | 2 |
|
3 | 3 | import java.util.Iterator; |
4 | 4 | import java.util.Map; |
5 | | -import java.util.Queue; |
6 | 5 | import java.util.concurrent.CancellationException; |
7 | 6 | import java.util.concurrent.CompletableFuture; |
8 | 7 | import java.util.concurrent.ConcurrentHashMap; |
9 | 8 | import java.util.concurrent.ConcurrentLinkedDeque; |
10 | | -import java.util.concurrent.ConcurrentLinkedQueue; |
11 | 9 | import java.util.concurrent.atomic.AtomicInteger; |
12 | 10 | import java.util.function.BiConsumer; |
13 | 11 |
|
@@ -55,7 +53,7 @@ public interface Handler<T> { |
55 | 53 | private final AtomicInteger queueSize = new AtomicInteger(); |
56 | 54 |
|
57 | 55 | /** Queue of waiting acquire requests */ |
58 | | - private final Queue<CompletableFuture<T>> waitingAcquires = new ConcurrentLinkedQueue<>(); |
| 56 | + private final ConcurrentLinkedDeque<CompletableFuture<T>> waitingAcquires = new ConcurrentLinkedDeque<>(); |
59 | 57 | /** Size of waiting acquires queue */ |
60 | 58 | private final AtomicInteger waitingAcqueireCount = new AtomicInteger(); |
61 | 59 |
|
@@ -271,16 +269,18 @@ private void checkNextWaitingAcquire() { |
271 | 269 | return; |
272 | 270 | } |
273 | 271 |
|
274 | | - CompletableFuture<T> next = waitingAcquires.peek(); |
| 272 | + CompletableFuture<T> next = waitingAcquires.poll(); |
275 | 273 | while (next != null && next.isDone()) { |
276 | | - waitingAcquires.remove(); |
277 | 274 | waitingAcqueireCount.decrementAndGet(); |
278 | | - next = waitingAcquires.peek(); |
| 275 | + next = waitingAcquires.poll(); |
279 | 276 | } |
280 | 277 |
|
281 | | - if (next != null && tryToCreateNewPending(next)) { |
282 | | - waitingAcquires.remove(); |
283 | | - waitingAcqueireCount.decrementAndGet(); |
| 278 | + if (next != null) { |
| 279 | + if (tryToCreateNewPending(next)) { |
| 280 | + waitingAcqueireCount.decrementAndGet(); |
| 281 | + } else { |
| 282 | + waitingAcquires.offerFirst(next); |
| 283 | + } |
284 | 284 | } |
285 | 285 | } |
286 | 286 |
|
|
0 commit comments