Skip to content

Commit 0bae56b

Browse files
author
Alan Bateman
committed
8367857: Implement JEP 525: Structured Concurrency (Sixth Preview)
Reviewed-by: vklang
1 parent 72989e0 commit 0bae56b

File tree

6 files changed

+784
-313
lines changed

6 files changed

+784
-313
lines changed

src/java.base/share/classes/java/util/concurrent/Joiners.java

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.concurrent.StructuredTaskScope.Joiner;
3535
import java.util.concurrent.StructuredTaskScope.Subtask;
3636
import java.util.function.Predicate;
37-
import java.util.stream.Stream;
3837
import jdk.internal.invoke.MhUtil;
3938

4039
/**
@@ -64,42 +63,48 @@ private static Subtask.State ensureCompleted(Subtask<?> subtask) {
6463
}
6564

6665
/**
67-
* A joiner that returns a stream of all subtasks when all subtasks complete
66+
* A joiner that returns a list of all results when all subtasks complete
6867
* successfully. Cancels the scope if any subtask fails.
6968
*/
70-
static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
69+
static final class AllSuccessful<T> implements Joiner<T, List<T>> {
7170
private static final VarHandle FIRST_EXCEPTION =
7271
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
7372

74-
// list of forked subtasks, only accessed by owner thread
75-
private final List<Subtask<T>> subtasks = new ArrayList<>();
73+
// list of forked subtasks, created lazily, only accessed by owner thread
74+
private List<Subtask<T>> subtasks;
7675

7776
private volatile Throwable firstException;
7877

7978
@Override
80-
public boolean onFork(Subtask<? extends T> subtask) {
79+
public boolean onFork(Subtask<T> subtask) {
8180
ensureUnavailable(subtask);
82-
@SuppressWarnings("unchecked")
83-
var s = (Subtask<T>) subtask;
84-
subtasks.add(s);
81+
if (subtasks == null) {
82+
subtasks = new ArrayList<>();
83+
}
84+
subtasks.add(subtask);
8585
return false;
8686
}
8787

8888
@Override
89-
public boolean onComplete(Subtask<? extends T> subtask) {
89+
public boolean onComplete(Subtask<T> subtask) {
9090
Subtask.State state = ensureCompleted(subtask);
9191
return (state == Subtask.State.FAILED)
9292
&& (firstException == null)
9393
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
9494
}
9595

9696
@Override
97-
public Stream<Subtask<T>> result() throws Throwable {
97+
public List<T> result() throws Throwable {
9898
Throwable ex = firstException;
99-
if (ex != null) {
100-
throw ex;
101-
} else {
102-
return subtasks.stream();
99+
try {
100+
if (ex != null) {
101+
throw ex;
102+
}
103+
return (subtasks != null)
104+
? subtasks.stream().map(Subtask::get).toList()
105+
: List.of();
106+
} finally {
107+
subtasks = null; // allow subtasks to be GC'ed
103108
}
104109
}
105110
}
@@ -130,7 +135,7 @@ private static int stateToInt(Subtask.State s) {
130135
}
131136

132137
@Override
133-
public boolean onComplete(Subtask<? extends T> subtask) {
138+
public boolean onComplete(Subtask<T> subtask) {
134139
Subtask.State state = ensureCompleted(subtask);
135140
Subtask<T> s;
136141
while (((s = this.subtask) == null)
@@ -166,7 +171,7 @@ static final class AwaitSuccessful<T> implements Joiner<T, Void> {
166171
private volatile Throwable firstException;
167172

168173
@Override
169-
public boolean onComplete(Subtask<? extends T> subtask) {
174+
public boolean onComplete(Subtask<T> subtask) {
170175
Subtask.State state = ensureCompleted(subtask);
171176
return (state == Subtask.State.FAILED)
172177
&& (firstException == null)
@@ -185,36 +190,48 @@ public Void result() throws Throwable {
185190
}
186191

187192
/**
188-
* A joiner that returns a stream of all subtasks.
193+
* A joiner that returns a list of all subtasks.
189194
*/
190-
static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
191-
private final Predicate<Subtask<? extends T>> isDone;
195+
static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
196+
private final Predicate<Subtask<T>> isDone;
192197

193-
// list of forked subtasks, only accessed by owner thread
194-
private final List<Subtask<T>> subtasks = new ArrayList<>();
198+
// list of forked subtasks, created lazily, only accessed by owner thread
199+
private List<Subtask<T>> subtasks;
195200

196-
AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
201+
AllSubtasks(Predicate<Subtask<T>> isDone) {
197202
this.isDone = Objects.requireNonNull(isDone);
198203
}
199204

200205
@Override
201-
public boolean onFork(Subtask<? extends T> subtask) {
206+
public boolean onFork(Subtask<T> subtask) {
202207
ensureUnavailable(subtask);
203-
@SuppressWarnings("unchecked")
204-
var s = (Subtask<T>) subtask;
205-
subtasks.add(s);
208+
if (subtasks == null) {
209+
subtasks = new ArrayList<>();
210+
}
211+
subtasks.add(subtask);
206212
return false;
207213
}
208214

209215
@Override
210-
public boolean onComplete(Subtask<? extends T> subtask) {
216+
public boolean onComplete(Subtask<T> subtask) {
211217
ensureCompleted(subtask);
212218
return isDone.test(subtask);
213219
}
214220

215221
@Override
216-
public Stream<Subtask<T>> result() {
217-
return subtasks.stream();
222+
public void onTimeout() {
223+
// do nothing, this joiner does not throw TimeoutException
224+
}
225+
226+
@Override
227+
public List<Subtask<T>> result() {
228+
if (subtasks != null) {
229+
List<Subtask<T>> result = List.copyOf(subtasks);
230+
subtasks = null; // allow subtasks to be GC'ed
231+
return result;
232+
} else {
233+
return List.of();
234+
}
218235
}
219236
}
220237
}

0 commit comments

Comments
 (0)