Skip to content

Commit 7dde6e5

Browse files
committed
Minor tweaks
1 parent 4972eae commit 7dde6e5

File tree

8 files changed

+225
-19
lines changed

8 files changed

+225
-19
lines changed

src/main/java/groovy/concurrent/AsyncStream.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,13 @@ static <T> AsyncStream<T> empty() {
8383
return (AsyncStream<T>) EMPTY;
8484
}
8585

86-
/** Singleton empty stream instance. */
86+
/**
87+
* Singleton empty stream instance.
88+
* <p>
89+
* This is an implementation detail backing {@link #empty()}.
90+
* User code should call {@code AsyncStream.empty()} rather than
91+
* referencing this field directly.
92+
*/
8793
AsyncStream<Object> EMPTY = new AsyncStream<>() {
8894
@Override public Awaitable<Boolean> moveNext() { return Awaitable.of(false); }
8995
@Override public Object getCurrent() { return null; }

src/main/java/groovy/concurrent/AwaitResult.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ private AwaitResult(T value, Throwable error, boolean success) {
4747

4848
/**
4949
* Creates a successful result with the given value.
50+
*
51+
* @param value the computation result (may be {@code null})
52+
* @param <T> the value type
53+
* @return a success result wrapping the value
5054
*/
5155
@SuppressWarnings("unchecked")
5256
public static <T> AwaitResult<T> success(Object value) {
@@ -55,6 +59,11 @@ public static <T> AwaitResult<T> success(Object value) {
5559

5660
/**
5761
* Creates a failure result with the given exception.
62+
*
63+
* @param error the exception that caused the failure; must not be {@code null}
64+
* @param <T> the value type (never actually used, since the result is a failure)
65+
* @return a failure result wrapping the exception
66+
* @throws NullPointerException if {@code error} is {@code null}
5867
*/
5968
public static <T> AwaitResult<T> failure(Throwable error) {
6069
return new AwaitResult<>(null, Objects.requireNonNull(error), false);

src/main/java/groovy/concurrent/Awaitable.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,30 @@ public interface Awaitable<T> {
9494

9595
/**
9696
* Blocks until the computation completes and returns the result.
97+
*
98+
* @return the computed result
99+
* @throws InterruptedException if the calling thread is interrupted while waiting
100+
* @throws ExecutionException if the computation completed exceptionally
97101
*/
98102
T get() throws InterruptedException, ExecutionException;
99103

100104
/**
101105
* Blocks until the computation completes or the timeout expires.
106+
*
107+
* @param timeout the maximum time to wait
108+
* @param unit the time unit of the timeout argument
109+
* @return the computed result
110+
* @throws InterruptedException if the calling thread is interrupted while waiting
111+
* @throws ExecutionException if the computation completed exceptionally
112+
* @throws TimeoutException if the wait timed out
102113
*/
103114
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
104115

105116
/**
106-
* Returns {@code true} if the computation has completed (normally or exceptionally).
117+
* Returns {@code true} if the computation has completed (normally,
118+
* exceptionally, or via cancellation).
119+
*
120+
* @return {@code true} if complete
107121
*/
108122
boolean isDone();
109123

@@ -118,24 +132,37 @@ public interface Awaitable<T> {
118132

119133
/**
120134
* Returns {@code true} if the computation was cancelled before completing normally.
135+
*
136+
* @return {@code true} if cancelled
121137
*/
122138
boolean isCancelled();
123139

124140
/**
125141
* Returns {@code true} if this computation completed exceptionally
126142
* (including cancellation).
143+
*
144+
* @return {@code true} if completed with an error or cancellation
127145
*/
128146
boolean isCompletedExceptionally();
129147

130148
/**
131149
* Returns a new {@code Awaitable} whose result is obtained by applying the
132150
* given function to this awaitable's result when it completes.
151+
*
152+
* @param fn the mapping function
153+
* @param <U> the type of the mapped result
154+
* @return a new awaitable holding the mapped result
133155
*/
134156
<U> Awaitable<U> then(Function<? super T, ? extends U> fn);
135157

136158
/**
137-
* Returns a new {@code Awaitable} produced by applying the given function
138-
* to this awaitable's result, flattening the nested {@code Awaitable}.
159+
* Returns a new {@code Awaitable} produced by applying the given async
160+
* function to this awaitable's result, flattening the nested {@code Awaitable}.
161+
* This is the monadic {@code flatMap} operation for awaitables.
162+
*
163+
* @param fn the async mapping function that returns an {@code Awaitable}
164+
* @param <U> the type of the inner awaitable's result
165+
* @return a new awaitable holding the inner result
139166
*/
140167
<U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>> fn);
141168

@@ -158,6 +185,11 @@ default Awaitable<Void> thenAccept(Consumer<? super T> action) {
158185
/**
159186
* Returns a new {@code Awaitable} that, if this one completes exceptionally,
160187
* applies the given function to the exception to produce a recovery value.
188+
* The throwable passed to the function is deeply unwrapped to strip JDK
189+
* wrapper layers.
190+
*
191+
* @param fn the recovery function
192+
* @return a new awaitable that recovers from failures
161193
*/
162194
Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
163195

@@ -260,13 +292,21 @@ default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit unit)
260292
/**
261293
* Converts this {@code Awaitable} to a JDK {@link CompletableFuture}
262294
* for interoperability with APIs that require it.
295+
*
296+
* @return a {@code CompletableFuture} representing this computation
263297
*/
264298
CompletableFuture<T> toCompletableFuture();
265299

266300
// ---- Static factories ----
267301

268302
/**
269303
* Returns an already-completed {@code Awaitable} with the given value.
304+
* Analogous to C#'s {@code Task.FromResult()} or JavaScript's
305+
* {@code Promise.resolve()}.
306+
*
307+
* @param value the result value (may be {@code null})
308+
* @param <T> the result type
309+
* @return a completed awaitable
270310
*/
271311
static <T> Awaitable<T> of(T value) {
272312
return new GroovyPromise<>(CompletableFuture.completedFuture(value));

src/main/java/groovy/concurrent/AwaitableAdapter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ public interface AwaitableAdapter {
6060

6161
/**
6262
* Returns {@code true} if this adapter can convert instances of the given
63-
* type to {@link AsyncStream}.
63+
* type to {@link AsyncStream}. Defaults to {@code false}; override if
64+
* this adapter supports multi-value stream conversion (e.g., for Reactor
65+
* {@code Flux} or RxJava {@code Observable}).
66+
*
67+
* @param type the source class to check
68+
* @return {@code true} if this adapter can produce an {@code AsyncStream}
6469
*/
6570
default boolean supportsAsyncStream(Class<?> type) {
6671
return false;
@@ -69,6 +74,11 @@ default boolean supportsAsyncStream(Class<?> type) {
6974
/**
7075
* Converts the given source object to an {@link AsyncStream}.
7176
* Called only when {@link #supportsAsyncStream} returned {@code true}.
77+
*
78+
* @param source the source object (e.g., a Reactor {@code Flux})
79+
* @param <T> the element type
80+
* @return an async stream backed by the source
81+
* @throws UnsupportedOperationException if not overridden (default)
7282
*/
7383
default <T> AsyncStream<T> toAsyncStream(Object source) {
7484
throw new UnsupportedOperationException("AsyncStream conversion not supported by " + getClass().getName());

src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,13 @@ public void yield(Object value) {
144144

145145
/**
146146
* Signals that the generator has completed (no more elements).
147-
* If interrupted, the completion signal is delivered on a best-effort basis.
147+
* <p>
148+
* If the blocking {@code queue.put()} is interrupted, a best-effort
149+
* non-blocking {@code queue.offer()} is attempted. If that also fails
150+
* (no consumer is currently blocked in {@code take()}), the stream is
151+
* force-closed to prevent the consumer from blocking indefinitely on a
152+
* subsequent {@link #moveNext()} call. This defensive close ensures no
153+
* thread leak occurs even under unexpected interrupt timing.
148154
*/
149155
public void complete() {
150156
if (closed.get()) {
@@ -155,15 +161,25 @@ public void complete() {
155161
} catch (InterruptedException e) {
156162
if (!closed.get()) {
157163
Thread.currentThread().interrupt();
158-
// Best-effort delivery: use non-blocking offer as fallback
159-
queue.offer(DONE);
164+
// Best-effort: non-blocking handoff to a waiting consumer.
165+
// If no consumer is waiting, offer() returns false and the DONE
166+
// signal is lost — force-close to unblock future moveNext() calls.
167+
if (!queue.offer(DONE)) {
168+
close();
169+
}
160170
}
161171
}
162172
}
163173

164174
/**
165175
* Signals that the generator failed with an exception.
166-
* If interrupted, the error signal is delivered on a best-effort basis.
176+
* <p>
177+
* If the blocking {@code queue.put()} is interrupted, a best-effort
178+
* non-blocking {@code queue.offer()} is attempted. If that also fails,
179+
* the stream is force-closed (same rationale as {@link #complete()}).
180+
* The original error is not propagated to the consumer in this edge case;
181+
* instead the consumer sees a clean stream closure — this is acceptable
182+
* because the interrupt itself indicates an external cancellation.
167183
*/
168184
public void error(Throwable t) {
169185
if (closed.get()) {
@@ -175,8 +191,9 @@ public void error(Throwable t) {
175191
} catch (InterruptedException e) {
176192
if (!closed.get()) {
177193
Thread.currentThread().interrupt();
178-
// Best-effort delivery: use non-blocking offer as fallback
179-
queue.offer(item);
194+
if (!queue.offer(item)) {
195+
close();
196+
}
180197
}
181198
}
182199
}

src/spec/doc/core-async-await.adoc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -973,17 +973,26 @@ Several concurrency hazards are handled transparently:
973973
allowing it to clean up promptly even if blocked in I/O or a long computation.
974974
975975
| **TOCTOU race prevention**
976-
| `moveNext()` uses a CAS-based double-check after taking from the queue: if the generator was
977-
closed between the call to `moveNext()` and the queue handoff, the taken value is discarded
978-
and `false` is returned, preventing stale values from leaking to the consumer.
976+
| `moveNext()` uses a double-check pattern: the `closed` flag is re-checked _after_
977+
registering the consumer thread. This closes a race window where `close()` could execute
978+
between the initial check and the `consumerThread.set()` call, which would leave the
979+
consumer stranded in `queue.take()` with no one to interrupt it.
979980
980-
| **Single-consumer enforcement**
981-
| A single consumer is enforced by CAS on an internal flag. Attempting to call `moveNext()`
982-
from multiple threads concurrently throws `IllegalStateException`, catching misuse early.
981+
| **Thread-safe consumer tracking**
982+
| The consumer thread is tracked via `AtomicReference<Thread>` during `moveNext()`. This
983+
enables `close()` to interrupt a blocked consumer. Note: concurrent `moveNext()` calls
984+
from multiple threads are not supported and may produce unpredictable results — async
985+
generators are inherently single-consumer (just like C#'s `IAsyncEnumerator`).
983986
984987
| **Idempotent close**
985988
| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to call multiple
986989
times from any thread without side effects.
990+
991+
| **Signal delivery under interrupt**
992+
| If the producer's `complete()` or `error()` signal is interrupted and the non-blocking
993+
fallback delivery fails (no consumer waiting), the generator force-closes itself. This
994+
prevents the consumer from blocking indefinitely on a subsequent `moveNext()` — a defensive
995+
measure against unexpected thread interruption outside the normal close path.
987996
|===
988997
989998
These mechanisms ensure that `yield return` / `for await` code remains as simple as writing

src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1906,7 +1906,7 @@ class AsyncApiTest {
19061906

19071907
@Test
19081908
void testGroovyPromiseIsCompletedExceptionally() {
1909-
def cf = new java.util.concurrent.CompletableFuture<>()
1909+
def cf = new CompletableFuture<>()
19101910
def promise = GroovyPromise.of(cf)
19111911
assert !promise.isCompletedExceptionally()
19121912
cf.completeExceptionally(new RuntimeException())
@@ -1919,4 +1919,119 @@ class AsyncApiTest {
19191919
new GroovyPromise(null)
19201920
}
19211921
}
1922+
1923+
// ================================================================
1924+
// AsyncStreamGenerator: complete()/error() robustness under interrupt
1925+
// ================================================================
1926+
1927+
/**
1928+
* Verifies that when a producer's complete() signal is interrupted and
1929+
* the best-effort offer() fails (no consumer waiting), the generator
1930+
* force-closes so a subsequent moveNext() returns false instead of
1931+
* blocking indefinitely.
1932+
*/
1933+
@Test
1934+
void testGeneratorCompleteForceClosesOnOfferFailure() {
1935+
def gen = new AsyncStreamGenerator<Integer>()
1936+
def producerThread = Thread.currentThread()
1937+
gen.attachProducer(producerThread)
1938+
1939+
// Interrupt the current thread so that queue.put(DONE) inside
1940+
// complete() throws InterruptedException. Since no consumer is
1941+
// blocked in take(), the non-blocking offer(DONE) will also fail,
1942+
// triggering the force-close path.
1943+
producerThread.interrupt()
1944+
gen.complete()
1945+
1946+
// Clear the interrupt flag set by the force-close path
1947+
Thread.interrupted()
1948+
1949+
gen.detachProducer(producerThread)
1950+
1951+
// Consumer should see a cleanly closed stream — not block forever
1952+
def result = gen.moveNext()
1953+
assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
1954+
}
1955+
1956+
/**
1957+
* Verifies that when a producer's error() signal is interrupted and
1958+
* the best-effort offer() fails, the generator force-closes so the
1959+
* consumer does not hang.
1960+
*/
1961+
@Test
1962+
void testGeneratorErrorForceClosesOnOfferFailure() {
1963+
def gen = new AsyncStreamGenerator<Integer>()
1964+
def producerThread = Thread.currentThread()
1965+
gen.attachProducer(producerThread)
1966+
1967+
producerThread.interrupt()
1968+
gen.error(new RuntimeException("test error"))
1969+
1970+
Thread.interrupted()
1971+
gen.detachProducer(producerThread)
1972+
1973+
def result = gen.moveNext()
1974+
assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
1975+
}
1976+
1977+
/**
1978+
* Verifies that complete() is a no-op when the stream is already closed.
1979+
*/
1980+
@Test
1981+
void testGeneratorCompleteAfterClose() {
1982+
def gen = new AsyncStreamGenerator<Integer>()
1983+
gen.close()
1984+
// Should not throw or block
1985+
gen.complete()
1986+
def result = gen.moveNext()
1987+
assert !AsyncSupport.await(result)
1988+
}
1989+
1990+
/**
1991+
* Verifies that error() is a no-op when the stream is already closed.
1992+
*/
1993+
@Test
1994+
void testGeneratorErrorAfterClose() {
1995+
def gen = new AsyncStreamGenerator<Integer>()
1996+
gen.close()
1997+
// Should not throw or block
1998+
gen.error(new RuntimeException("ignored"))
1999+
def result = gen.moveNext()
2000+
assert !AsyncSupport.await(result)
2001+
}
2002+
2003+
/**
2004+
* Verifies that close() is idempotent — multiple calls do not throw or
2005+
* cause double-interrupt of threads.
2006+
*/
2007+
@Test
2008+
void testGeneratorCloseIdempotent() {
2009+
def gen = new AsyncStreamGenerator<Integer>()
2010+
gen.close()
2011+
gen.close()
2012+
gen.close()
2013+
// All should be no-ops; moveNext should still return false
2014+
assert !AsyncSupport.await(gen.moveNext())
2015+
}
2016+
2017+
/**
2018+
* Verifies that attachProducer on an already-closed stream immediately
2019+
* interrupts the producer thread, allowing the generator body to exit.
2020+
*/
2021+
@Test
2022+
void testAttachProducerOnClosedStreamInterrupts() {
2023+
def gen = new AsyncStreamGenerator<Integer>()
2024+
gen.close()
2025+
2026+
def interrupted = new AtomicBoolean(false)
2027+
def latch = new CountDownLatch(1)
2028+
def t = new Thread({
2029+
gen.attachProducer(Thread.currentThread())
2030+
interrupted.set(Thread.currentThread().isInterrupted())
2031+
latch.countDown()
2032+
})
2033+
t.start()
2034+
latch.await(5, TimeUnit.SECONDS)
2035+
assert interrupted.get() : "Producer should be interrupted when attached to a closed stream"
2036+
}
19222037
}

src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ class AsyncPatternsTest {
696696
}
697697

698698
@Test
699-
void testCancellationPattern() {
699+
void testCancellationPatternWithClosureExpression() {
700700
assertScript '''
701701
import java.util.concurrent.CancellationException
702702
import groovy.concurrent.Awaitable

0 commit comments

Comments
 (0)