|
3 | 3 | import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; |
4 | 4 |
|
5 | 5 | import java.util.ArrayList; |
| 6 | +import java.util.Iterator; |
6 | 7 | import java.util.List; |
7 | | -import java.util.concurrent.ConcurrentLinkedQueue; |
8 | 8 | import java.util.concurrent.locks.ReentrantLock; |
9 | 9 | import java.util.function.BooleanSupplier; |
10 | 10 |
|
@@ -33,7 +33,7 @@ private enum State { |
33 | 33 |
|
34 | 34 | private final ReentrantLock internalLock = new ReentrantLock(); |
35 | 35 |
|
36 | | - private final ConcurrentLinkedQueue<UniSubscriber<? super I>> awaiters = new ConcurrentLinkedQueue<>(); |
| 36 | + private final List<UniSubscriber<? super I>> awaiters = new ArrayList<>(); |
37 | 37 |
|
38 | 38 | private Object cachedResult = null; |
39 | 39 |
|
@@ -118,12 +118,17 @@ public void onItem(I item) { |
118 | 118 | } |
119 | 119 |
|
120 | 120 | private List<UniSubscriber<? super I>> gatherAwaiters() { |
121 | | - return new ArrayList<>(awaiters); |
| 121 | + ArrayList<UniSubscriber<? super I>> copy = new ArrayList<>(awaiters); |
| 122 | + awaiters.clear(); |
| 123 | + return copy; |
122 | 124 | } |
123 | 125 |
|
124 | 126 | private void notifyAwaiters(List<UniSubscriber<? super I>> toNotify, Object result) { |
125 | | - for (UniSubscriber<? super I> awaiter : toNotify) { |
| 127 | + Iterator<UniSubscriber<? super I>> iterator = toNotify.iterator(); |
| 128 | + while (iterator.hasNext()) { |
| 129 | + UniSubscriber<? super I> awaiter = iterator.next(); |
126 | 130 | forwardTo(awaiter, result); |
| 131 | + iterator.remove(); |
127 | 132 | } |
128 | 133 | } |
129 | 134 |
|
@@ -166,7 +171,12 @@ private class MemoizedSubscription implements UniSubscription { |
166 | 171 |
|
167 | 172 | @Override |
168 | 173 | public void cancel() { |
169 | | - awaiters.remove(subscriber); |
| 174 | + internalLock.lock(); |
| 175 | + try { |
| 176 | + awaiters.remove(subscriber); |
| 177 | + } finally { |
| 178 | + internalLock.unlock(); |
| 179 | + } |
170 | 180 | } |
171 | 181 | } |
172 | 182 | } |
0 commit comments