Skip to content

Commit 489cac2

Browse files
committed
JDK8 docs, impl and tests improved
1 parent 5bc9442 commit 489cac2

File tree

3 files changed

+140
-44
lines changed
  • integration/kotlinx-coroutines-jdk8

3 files changed

+140
-44
lines changed

integration/kotlinx-coroutines-jdk8/pom.xml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
-->
1717

1818
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
19-
2019
<modelVersion>4.0.0</modelVersion>
2120

2221
<parent>
@@ -35,12 +34,19 @@
3534
</build>
3635

3736
<dependencies>
37+
<!-- dependency on coroutines core -->
3838
<dependency>
3939
<groupId>org.jetbrains.kotlinx</groupId>
4040
<artifactId>kotlinx-coroutines-core</artifactId>
4141
<version>${project.version}</version>
42-
<scope>compile</scope>
42+
</dependency>
43+
<!-- coroutines test framework dependency -->
44+
<dependency>
45+
<groupId>org.jetbrains.kotlinx</groupId>
46+
<artifactId>kotlinx-coroutines-core</artifactId>
47+
<version>${project.version}</version>
48+
<classifier>tests</classifier>
49+
<scope>test</scope>
4350
</dependency>
4451
</dependencies>
45-
4652
</project>

integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental.future
1919
import kotlinx.coroutines.experimental.*
2020
import java.util.concurrent.CompletableFuture
2121
import java.util.concurrent.CompletionStage
22+
import java.util.concurrent.ExecutionException
2223
import java.util.function.BiConsumer
2324
import kotlin.coroutines.experimental.Continuation
2425
import kotlin.coroutines.experimental.CoroutineContext
@@ -29,16 +30,16 @@ import kotlin.coroutines.experimental.suspendCoroutine
2930
* This coroutine builder uses [CommonPool] context by default and is conceptually similar to [CompletableFuture.supplyAsync].
3031
*
3132
* The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
32-
* If the [context] for the new coroutine is explicitly specified and does not include a coroutine interceptor,
33-
* then [CoroutineDispatcher] element.
34-
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
35-
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
36-
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
33+
* If the [context] for the new coroutine is omitted or is explicitly specified but does not include a
34+
* coroutine interceptor, then [CommonPool] is used.
35+
* See [CoroutineDispatcher] for other standard [context] implementations that are provided by `kotlinx.coroutines`.
36+
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
37+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
3738
*
3839
* By default, the coroutine is immediately scheduled for execution.
3940
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
4041
* A value of [CoroutineStart.LAZY] is not supported
41-
* (since `ListenableFuture` framework does not provide the corresponding capability) and
42+
* (since `CompletableFuture` framework does not provide the corresponding capability) and
4243
* produces [IllegalArgumentException].
4344
*
4445
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
@@ -58,7 +59,7 @@ public fun <T> future(
5859
val future = CompletableFutureCoroutine<T>(newContext + job)
5960
job.cancelFutureOnCompletion(future)
6061
future.whenComplete { _, exception -> job.cancel(exception) }
61-
start(block, receiver=future, completion=future)
62+
start(block, receiver=future, completion=future) // use the specified start strategy
6263
return future
6364
}
6465

@@ -91,7 +92,7 @@ public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
9192
* Awaits for completion of the completion stage without blocking a thread.
9293
*
9394
* This suspending function is not cancellable, because there is no way to cancel a `CompletionStage`.
94-
* Use `CompletableFuture.await()` for cancellation support.
95+
* Use `CompletableFuture.await()` for cancellable wait.
9596
*/
9697
public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont: Continuation<T> ->
9798
whenComplete(ContinuationConsumer(cont))
@@ -103,35 +104,36 @@ public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont:
103104
* This suspending function is cancellable.
104105
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
105106
* stops waiting for the future and immediately resumes with [CancellationException].
107+
*
108+
* Note, that `CompletableFuture` does not support prompt removal of installed listeners, so on cancellation of this wait
109+
* a few small objects will remain in the `CompletableFuture` stack of completion actions until the future completes.
110+
* However, the care is taken to clear the reference to the waiting coroutine itself, so that its memory can be
111+
* released even if the future never completes.
106112
*/
107113
public suspend fun <T> CompletableFuture<T>.await(): T {
108-
if (isDone) { // fast path when CompletableFuture is already done (does not suspend)
109-
// then only way to get unwrapped exception from the CompletableFuture is via whenComplete anyway
110-
var result: T? = null
111-
var exception: Throwable? = null
112-
whenComplete { r, e ->
113-
result = r
114-
exception = e
114+
// fast path when CompletableFuture is already done (does not suspend)
115+
if (isDone) {
116+
try {
117+
return get()
118+
} catch (e: ExecutionException) {
119+
throw e.cause ?: e // unwrap original cause from ExecutionException
115120
}
116-
if (exception != null) throw exception!!
117-
return result as T
118121
}
119122
// slow path -- suspend
120123
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
121124
val consumer = ContinuationConsumer(cont)
122-
val completionFuture = whenComplete(consumer)
125+
whenComplete(consumer)
123126
cont.invokeOnCompletion {
124-
completionFuture.cancel(false) // cancel future
125-
consumer.cont = null // shall clear reference to continuation, because CompletableFuture continues to keep it
127+
consumer.cont = null // shall clear reference to continuation
126128
}
127129
}
128130
}
129131

130132
private class ContinuationConsumer<T>(
131-
@JvmField var cont: Continuation<T>?
133+
@Volatile @JvmField var cont: Continuation<T>?
132134
) : BiConsumer<T?, Throwable?> {
133135
override fun accept(result: T?, exception: Throwable?) {
134-
val cont = this.cont ?: return // atomically read current value unless null, benign data race when it is set to null
136+
val cont = this.cont ?: return // atomically read current value unless null
135137
if (exception == null) // the future has been completed normally
136138
cont.resume(result as T)
137139
else // the future has completed with an exception
@@ -152,6 +154,7 @@ private class ContinuationConsumer<T>(
152154
replaceWith = ReplaceWith("asCompletableFuture()"))
153155
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> = asCompletableFuture()
154156

157+
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
155158
@Deprecated("Use the other version. This one is for binary compatibility only.", level=DeprecationLevel.HIDDEN)
156159
public fun <T> future(
157160
context: CoroutineContext = CommonPool,

integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt

Lines changed: 106 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,46 @@
1616

1717
package kotlinx.coroutines.experimental.future
1818

19-
import kotlinx.coroutines.experimental.CoroutineDispatcher
19+
import kotlinx.coroutines.experimental.*
20+
import org.hamcrest.core.IsEqual
21+
import org.junit.Assert.*
2022
import org.junit.Test
2123
import java.util.concurrent.CompletableFuture
24+
import java.util.concurrent.CompletionStage
2225
import java.util.concurrent.ExecutionException
2326
import java.util.concurrent.atomic.AtomicInteger
2427
import kotlin.coroutines.experimental.CoroutineContext
25-
import org.junit.Assert.*
26-
import java.util.concurrent.CompletionStage
2728

28-
class FutureTest {
29+
class FutureTest : TestBase() {
2930
@Test
30-
fun testSimple() {
31+
fun testSimpleAwait() {
3132
val future = future {
3233
CompletableFuture.supplyAsync {
3334
"O"
3435
}.await() + "K"
3536
}
36-
assertEquals("OK", future.get())
37+
assertThat(future.get(), IsEqual("OK"))
38+
}
39+
40+
@Test
41+
fun testCompletedFuture() {
42+
val toAwait = CompletableFuture<String>()
43+
toAwait.complete("O")
44+
val future = future {
45+
toAwait.await() + "K"
46+
}
47+
assertThat(future.get(), IsEqual("OK"))
48+
}
49+
50+
@Test
51+
fun testCompletedCompletionStage() {
52+
val completable = CompletableFuture<String>()
53+
completable.complete("O")
54+
val toAwait: CompletionStage<String> = completable
55+
val future = future {
56+
toAwait.await() + "K"
57+
}
58+
assertThat(future.get(), IsEqual("OK"))
3759
}
3860

3961
@Test
@@ -44,7 +66,7 @@ class FutureTest {
4466
}
4567
assertFalse(future.isDone)
4668
toAwait.complete("O")
47-
assertEquals("OK", future.get())
69+
assertThat(future.get(), IsEqual("OK"))
4870
}
4971

5072
@Test
@@ -56,11 +78,11 @@ class FutureTest {
5678
}
5779
assertFalse(future.isDone)
5880
completable.complete("O")
59-
assertEquals("OK", future.get())
81+
assertThat(future.get(), IsEqual("OK"))
6082
}
6183

6284
@Test
63-
fun testDoneFutureExceptionally() {
85+
fun testCompletedFutureExceptionally() {
6486
val toAwait = CompletableFuture<String>()
6587
toAwait.completeExceptionally(RuntimeException("O"))
6688
val future = future<String> {
@@ -70,11 +92,11 @@ class FutureTest {
7092
e.message!!
7193
} + "K"
7294
}
73-
assertEquals("OK", future.get())
95+
assertThat(future.get(), IsEqual("OK"))
7496
}
7597

7698
@Test
77-
fun testDoneCompletionStageExceptionally() {
99+
fun testCompletedCompletionStageExceptionally() {
78100
val completable = CompletableFuture<String>()
79101
val toAwait: CompletionStage<String> = completable
80102
completable.completeExceptionally(RuntimeException("O"))
@@ -85,11 +107,11 @@ class FutureTest {
85107
e.message!!
86108
} + "K"
87109
}
88-
assertEquals("OK", future.get())
110+
assertThat(future.get(), IsEqual("OK"))
89111
}
90112

91113
@Test
92-
fun testAwaitedFutureCompletedExceptionally() {
114+
fun testWaitForFutureWithException() {
93115
val toAwait = CompletableFuture<String>()
94116
val future = future<String> {
95117
try {
@@ -100,11 +122,11 @@ class FutureTest {
100122
}
101123
assertFalse(future.isDone)
102124
toAwait.completeExceptionally(RuntimeException("O"))
103-
assertEquals("OK", future.get())
125+
assertThat(future.get(), IsEqual("OK"))
104126
}
105127

106128
@Test
107-
fun testAwaitedCompletionStageCompletedExceptionally() {
129+
fun testWaitForCompletionStageWithException() {
108130
val completable = CompletableFuture<String>()
109131
val toAwait: CompletionStage<String> = completable
110132
val future = future<String> {
@@ -116,7 +138,7 @@ class FutureTest {
116138
}
117139
assertFalse(future.isDone)
118140
completable.completeExceptionally(RuntimeException("O"))
119-
assertEquals("OK", future.get())
141+
assertThat(future.get(), IsEqual("OK"))
120142
}
121143

122144
@Test
@@ -125,15 +147,80 @@ class FutureTest {
125147
if (CompletableFuture.supplyAsync { true }.await()) {
126148
throw IllegalStateException("OK")
127149
}
128-
CompletableFuture.supplyAsync { "fail" }.await()
150+
"fail"
129151
}
130152
try {
131153
future.get()
132154
fail("'get' should've throw an exception")
133155
} catch (e: ExecutionException) {
134156
assertTrue(e.cause is IllegalStateException)
135-
assertEquals("OK", e.cause!!.message)
157+
assertThat(e.cause!!.message, IsEqual("OK"))
158+
}
159+
}
160+
161+
@Test
162+
fun testCompletedDeferredAsCompletableFuture() = runBlocking {
163+
expect(1)
164+
val deferred = async(context, CoroutineStart.UNDISPATCHED) {
165+
expect(2) // completed right away
166+
"OK"
167+
}
168+
expect(3)
169+
val future = deferred.asCompletableFuture()
170+
assertThat(future.await(), IsEqual("OK"))
171+
finish(4)
172+
}
173+
174+
@Test
175+
fun testWaitForDeferredAsCompletableFuture() = runBlocking {
176+
expect(1)
177+
val deferred = async(context) {
178+
expect(3) // will complete later
179+
"OK"
180+
}
181+
expect(2)
182+
val future = deferred.asCompletableFuture()
183+
assertThat(future.await(), IsEqual("OK")) // await yields main thread to deferred coroutine
184+
finish(4)
185+
}
186+
187+
@Test
188+
fun testCancellableAwaitFuture() = runBlocking {
189+
expect(1)
190+
val toAwait = CompletableFuture<String>()
191+
val job = launch(context, CoroutineStart.UNDISPATCHED) {
192+
expect(2)
193+
try {
194+
toAwait.await() // suspends
195+
} catch (e: CancellationException) {
196+
expect(5) // should throw cancellation exception
197+
throw e
198+
}
199+
}
200+
expect(3)
201+
job.cancel() // cancel the job
202+
toAwait.complete("fail") // too late, the waiting job was already cancelled
203+
expect(4) // job processing of cancellation was scheduled, not executed yet
204+
yield() // yield main thread to job
205+
finish(6)
206+
}
207+
208+
@Test
209+
fun testNonCancellableAwaitCompletionStage() = runBlocking {
210+
expect(1)
211+
val completable = CompletableFuture<String>()
212+
val toAwait: CompletionStage<String> = completable
213+
val job = launch(context, CoroutineStart.UNDISPATCHED) {
214+
expect(2)
215+
assertThat(toAwait.await(), IsEqual("OK")) // suspends
216+
expect(5)
136217
}
218+
expect(3)
219+
job.cancel() // cancel the job
220+
completable.complete("OK") // ok, because await on completion stage is not cancellable
221+
expect(4) // job processing of was scheduled, not executed yet
222+
yield() // yield main thread to job
223+
finish(6)
137224
}
138225

139226
@Test
@@ -159,7 +246,7 @@ class FutureTest {
159246
}.await()
160247
result
161248
}
162-
assertEquals("OK", future.get())
249+
assertThat(future.get(), IsEqual("OK"))
163250
}
164251

165252
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {

0 commit comments

Comments
 (0)