Skip to content

Commit 47be650

Browse files
committed
optimize in-progress handling
1 parent 7abfad8 commit 47be650

File tree

1 file changed

+13
-14
lines changed

1 file changed

+13
-14
lines changed

src/main/java/net/tascalate/concurrent/AsyncCompletions.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import java.util.Iterator;
2020
import java.util.NoSuchElementException;
2121
import java.util.Set;
22+
import java.util.Spliterator;
2223
import java.util.Spliterators;
2324
import java.util.concurrent.BlockingQueue;
2425
import java.util.concurrent.CompletionStage;
2526
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.function.BiConsumer;
2929
import java.util.stream.Stream;
3030
import java.util.stream.StreamSupport;
@@ -64,7 +64,8 @@ void apply(Set<CompletionStage<?>> enlistedPromises, Iterator<? extends Completi
6464
private final BlockingQueue<Try<T>> settledResults;
6565
private final Set<CompletionStage<?>> enlistedPromises;
6666

67-
private final AtomicInteger inProgress = new AtomicInteger(0);
67+
// Must be accessed from the single thread - i.e. the "main" thread that controls iteration via hasNext / next
68+
private int inProgress = 0;
6869

6970
AsyncCompletions(Iterator<? extends CompletionStage<? extends T>> pendingValues, int chunkSize) {
7071
this(pendingValues, chunkSize, Cancel.NONE);
@@ -84,16 +85,15 @@ protected AsyncCompletions(Iterator<? extends CompletionStage<? extends T>> pend
8485

8586
@Override
8687
public boolean hasNext() {
87-
int unprocessed = inProgress.get();
88-
if (unprocessed < 0) {
88+
if (inProgress < 0) {
8989
// Forcibly closed
9090
return false;
9191
} else {
9292
if (!settledResults.isEmpty()) {
9393
// There are some resolved results available
9494
return true;
9595
} else {
96-
if (unprocessed > 0) {
96+
if (inProgress > 0) {
9797
// If we are still producing then there are more...
9898
return true;
9999
} else {
@@ -107,26 +107,25 @@ public boolean hasNext() {
107107
@Override
108108
public T next() {
109109
while (true) {
110-
int unprocessed = inProgress.get();
111-
if (unprocessed < 0) {
110+
if (inProgress < 0) {
112111
// Forcibly closed
113112
throw new NoSuchElementException("This sequence was closed");
114113
} else {
115114
if (!settledResults.isEmpty()) {
116115
// There are some resolved results available
117116
Try<T> settledResult = settledResults.poll();
118-
inProgress.decrementAndGet();
117+
inProgress--;
119118
return settledResult.done();
120119
} else {
121-
if (unprocessed > 0) {
120+
if (inProgress > 0) {
122121
// If we are still producing then await for any result...
123122
Try<T> settledResult;
124123
try {
125124
settledResult = settledResults.take();
126125
} catch (InterruptedException ex) {
127126
throw new NoSuchElementException(ex.getMessage());
128127
}
129-
inProgress.decrementAndGet();
128+
inProgress--;
130129
return settledResult.done();
131130
} else {
132131
if (enlistPending()) {
@@ -149,9 +148,9 @@ public void remove() {
149148

150149
@Override
151150
public void close() {
152-
inProgress.set(Integer.MIN_VALUE);
153-
settledResults.clear();
151+
inProgress = Integer.MIN_VALUE;
154152
cancelStrategy.apply(enlistedPromises, pendingPromises);
153+
settledResults.clear();
155154
}
156155

157156

@@ -196,7 +195,7 @@ private static <T> Stream<T> stream(Iterator<? extends CompletionStage<? extends
196195
}
197196

198197
private static <T> Stream<T> toStream(AsyncCompletions<T> iterator) {
199-
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
198+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
200199
.onClose(iterator::close);
201200
}
202201

@@ -208,7 +207,7 @@ private boolean enlistPending() {
208207
// while stage may be completed already
209208
// we should increment step-by-step
210209
// instead of setting the value at once
211-
int isClosed = inProgress.getAndIncrement();
210+
int isClosed = inProgress++;
212211
if (isClosed < 0) {
213212
break;
214213
}

0 commit comments

Comments
 (0)