Skip to content

Commit 6304ef7

Browse files
committed
Use new convention for suspend function
See https://github.com/Kotlin/kotlin-coroutines/issues/71
1 parent 0b6d19a commit 6304ef7

File tree

11 files changed

+167
-83
lines changed

11 files changed

+167
-83
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.jetbrains.kotlinx</groupId>
8+
<artifactId>kotlinx-coroutines</artifactId>
9+
<version>0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>kotlinx-coroutines-async-common</artifactId>
13+
<packaging>jar</packaging>
14+
15+
<name>Common parts for async libraries</name>
16+
17+
<properties>
18+
<kotlin.version>1.1-SNAPSHOT</kotlin.version>
19+
</properties>
20+
21+
<build>
22+
<sourceDirectory>src/main/kotlin</sourceDirectory>
23+
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
24+
</build>
25+
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.apache.commons</groupId>
29+
<artifactId>commons-io</artifactId>
30+
<version>1.3.2</version>
31+
<scope>test</scope>
32+
</dependency>
33+
</dependencies>
34+
35+
</project>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package kotlinx.coroutines
2+
3+
import java.util.concurrent.atomic.AtomicReference
4+
5+
suspend fun <T> runWithCurrentContinuation(
6+
block: (Continuation<T>) -> Unit
7+
): T = suspendWithCurrentContinuation { continuation ->
8+
val safe = SafeContinuation<T>(continuation)
9+
block(safe)
10+
return@suspendWithCurrentContinuation safe.returnResult()
11+
}
12+
13+
private class SafeContinuation<in T>(val delegate: Continuation<T>) : Continuation<T> {
14+
// consensus on result with asynchronous calls to continuation
15+
val result = AtomicReference<Any?>(Undecided)
16+
17+
override fun resume(data: T) {
18+
if (result.compareAndSet(Undecided, data)) return
19+
delegate.resume(data)
20+
}
21+
22+
override fun resumeWithException(exception: Throwable) {
23+
if (result.compareAndSet(Undecided, Fail(exception))) return
24+
delegate.resumeWithException(exception)
25+
}
26+
27+
fun returnResult(): Any? {
28+
if (result.get() == Undecided) result.compareAndSet(Undecided, Suspend)
29+
val result = result.get()
30+
if (result is Fail) throw result.e else return result
31+
}
32+
}
33+
34+
private object Undecided
35+
private class Fail(val e: Throwable)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package kotlinx.coroutines
2+
3+
class RunWithCCTest {
4+
// TODO
5+
}

kotlinx-coroutines-async/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@
3030
<version>1.3.2</version>
3131
<scope>test</scope>
3232
</dependency>
33+
34+
<dependency>
35+
<groupId>org.jetbrains.kotlinx</groupId>
36+
<artifactId>kotlinx-coroutines-async-common</artifactId>
37+
<version>${version}</version>
38+
<scope>compile</scope>
39+
</dependency>
3340
</dependencies>
3441

3542
</project>

kotlinx-coroutines-async/src/main/kotlin/async.kt

Lines changed: 65 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,27 @@ typealias ContinuationWrapper = (() -> Unit) -> Unit
6161

6262
@AllowSuspendExtensions
6363
class FutureController<T>(
64-
private val continuationWrapper: ContinuationWrapper?
64+
val continuationWrapper: ContinuationWrapper?
6565
) {
6666
val future = CompletableFuture<T>()
6767

68-
suspend fun <V> await(f: CompletableFuture<V>, machine: Continuation<V>) {
69-
f.whenComplete { value, throwable ->
70-
wrapContinuationIfNeeded {
68+
suspend fun <V> await(f: CompletableFuture<V>): V =
69+
runWithCurrentContinuation {
70+
f.whenComplete { value, throwable ->
7171
if (throwable == null)
72-
machine.resume(value)
72+
it.resume(value)
7373
else
74-
machine.resumeWithException(throwable)
74+
it.resumeWithException(throwable)
7575
}
7676
}
77-
}
7877

79-
private fun wrapContinuationIfNeeded(block: () -> Unit) {
80-
continuationWrapper?.invoke(block) ?: block()
78+
inline operator fun interceptResume(crossinline x: () -> Unit) {
79+
if (continuationWrapper != null) {
80+
continuationWrapper.invoke { x() }
81+
}
82+
else {
83+
x()
84+
}
8185
}
8286

8387
operator fun handleResult(value: T, c: Continuation<Nothing>) {
@@ -87,74 +91,68 @@ class FutureController<T>(
8791
operator fun handleException(t: Throwable, c: Continuation<Nothing>) {
8892
future.completeExceptionally(t)
8993
}
94+
}
9095

91-
//
92-
// IO parts
93-
//
94-
suspend fun AsynchronousFileChannel.aRead(
95-
buf: ByteBuffer,
96-
position: Long,
97-
c: Continuation<Int>
98-
) {
99-
this.read(buf, position, null, AsyncIOHandler(c))
100-
}
96+
//
97+
// IO parts
98+
//
99+
suspend fun AsynchronousFileChannel.aRead(
100+
buf: ByteBuffer,
101+
position: Long
102+
) = runWithCurrentContinuation<Int> { c ->
103+
this.read(buf, position, null, AsyncIOHandler(c))
104+
}
101105

102-
suspend fun AsynchronousFileChannel.aWrite(
103-
buf: ByteBuffer,
104-
position: Long,
105-
c: Continuation<Int>
106-
) {
107-
this.write(buf, position, null, AsyncIOHandler(c))
108-
}
106+
suspend fun AsynchronousFileChannel.aWrite(
107+
buf: ByteBuffer,
108+
position: Long
109+
) = runWithCurrentContinuation<Int> { c ->
110+
this.write(buf, position, null, AsyncIOHandler(c))
111+
}
109112

110-
suspend fun AsynchronousServerSocketChannel.aAccept(
111-
c: Continuation<AsynchronousSocketChannel>
112-
) {
113-
this.accept(null, AsyncIOHandler(c))
114-
}
113+
suspend fun AsynchronousServerSocketChannel.aAccept() =
114+
runWithCurrentContinuation<AsynchronousSocketChannel> { c ->
115+
this.accept(null, AsyncIOHandler(c))
116+
}
115117

116-
suspend fun AsynchronousSocketChannel.aConnect(
117-
socketAddress: SocketAddress,
118-
c: Continuation<Unit>
119-
) {
120-
this.connect(socketAddress, null, AsyncVoidIOHandler(c))
121-
}
118+
suspend fun AsynchronousSocketChannel.aConnect(
119+
socketAddress: SocketAddress
120+
) = runWithCurrentContinuation<Unit> { c ->
121+
this.connect(socketAddress, null, AsyncVoidIOHandler(c))
122+
}
122123

123-
suspend fun AsynchronousSocketChannel.aRead(
124-
buf: ByteBuffer,
125-
timeout: Long = 0L,
126-
timeUnit: TimeUnit = TimeUnit.MILLISECONDS,
127-
c: Continuation<Int>
128-
) {
129-
this.read(buf, timeout, timeUnit, null, AsyncIOHandler(c))
130-
}
124+
suspend fun AsynchronousSocketChannel.aRead(
125+
buf: ByteBuffer,
126+
timeout: Long = 0L,
127+
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
128+
) = runWithCurrentContinuation<Int> { c ->
129+
this.read(buf, timeout, timeUnit, null, AsyncIOHandler(c))
130+
}
131131

132-
suspend fun AsynchronousSocketChannel.aWrite(
133-
buf: ByteBuffer,
134-
timeout: Long = 0L,
135-
timeUnit: TimeUnit = TimeUnit.MILLISECONDS,
136-
c: Continuation<Int>
137-
) {
138-
this.write(buf, timeout, timeUnit, null, AsyncIOHandler(c))
139-
}
132+
suspend fun AsynchronousSocketChannel.aWrite(
133+
buf: ByteBuffer,
134+
timeout: Long = 0L,
135+
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
136+
) = runWithCurrentContinuation<Int> { c ->
137+
this.write(buf, timeout, timeUnit, null, AsyncIOHandler(c))
138+
}
140139

141-
private class AsyncIOHandler<E>(val c: Continuation<E>) : CompletionHandler<E, Nothing?> {
142-
override fun completed(result: E, attachment: Nothing?) {
143-
c.resume(result)
144-
}
140+
private class AsyncIOHandler<E>(val c: Continuation<E>) : CompletionHandler<E, Nothing?> {
141+
override fun completed(result: E, attachment: Nothing?) {
142+
c.resume(result)
143+
}
145144

146-
override fun failed(exc: Throwable, attachment: Nothing?) {
147-
c.resumeWithException(exc)
148-
}
145+
override fun failed(exc: Throwable, attachment: Nothing?) {
146+
c.resumeWithException(exc)
149147
}
148+
}
150149

151-
private class AsyncVoidIOHandler(val c: Continuation<Unit>) : CompletionHandler<Void?, Nothing?> {
152-
override fun completed(result: Void?, attachment: Nothing?) {
153-
c.resume(Unit)
154-
}
150+
private class AsyncVoidIOHandler(val c: Continuation<Unit>) : CompletionHandler<Void?, Nothing?> {
151+
override fun completed(result: Void?, attachment: Nothing?) {
152+
c.resume(Unit)
153+
}
155154

156-
override fun failed(exc: Throwable, attachment: Nothing?) {
157-
c.resumeWithException(exc)
158-
}
155+
override fun failed(exc: Throwable, attachment: Nothing?) {
156+
c.resumeWithException(exc)
159157
}
160158
}

kotlinx-coroutines-async/src/test/kotlin/AsyncIOTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines.asyncIO
22

3+
import kotlinx.coroutines.*
34
import kotlinx.coroutines.async
45
import org.apache.commons.io.FileUtils
56
import org.junit.Rule
@@ -27,7 +28,7 @@ class AsyncIOTest {
2728

2829
FileUtils.writeStringToFile(
2930
inputFile,
30-
(1..100000).map { it.toString() }.joinToString(""))
31+
(1..100000).map(Int::toString).joinToString(""))
3132

3233
val input = AsynchronousFileChannel.open(inputFile.toPath())
3334
val output =

kotlinx-coroutines-async/src/test/kotlin/AsyncTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class AsyncTest {
7878
it()
7979
depth.andDecrement
8080
}) {
81-
assertEquals(0, depth.get(), "Part before first suspension should not be wrapped")
81+
assertEquals(1, depth.get(), "Part before first suspension must be wrapped")
8282

8383
val result =
8484
await(CompletableFuture.supplyAsync {

kotlinx-coroutines-generate/src/main/kotlin/generate.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ class GeneratorController<T> internal constructor() : AbstractIterator<T>() {
2828
nextStep = step
2929
}
3030

31-
suspend fun yield(value: T, c: Continuation<Unit>) {
31+
suspend fun yield(value: T) = suspendWithCurrentContinuation<Unit> { c ->
3232
setNext(value)
3333
setNextStep(c)
34+
35+
Suspend
3436
}
3537

3638
operator fun handleResult(result: Unit, c: Continuation<Nothing>) {

kotlinx-coroutines-rx/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
<artifactId>rxjava</artifactId>
3030
<version>1.1.5</version>
3131
</dependency>
32+
33+
<dependency>
34+
<groupId>org.jetbrains.kotlinx</groupId>
35+
<artifactId>kotlinx-coroutines-async-common</artifactId>
36+
<version>${version}</version>
37+
<scope>compile</scope>
38+
</dependency>
3239
</dependencies>
3340

3441
</project>

kotlinx-coroutines-rx/src/main/kotlin/asyncRx.kt

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,19 @@ fun <T> asyncRx(
2828
class RxController<T> internal constructor() {
2929
internal val result: AsyncSubject<T> = AsyncSubject.create<T>()
3030

31-
suspend fun <V> Observable<V>.awaitFirst(x: Continuation<V>) {
32-
this.first().subscribeWithContinuation(x)
33-
}
31+
suspend fun <V> Observable<V>.awaitFirst() = first().awaitOne()
3432

35-
suspend fun <V> Observable<V>.awaitLast(x: Continuation<V>) {
36-
this.last().subscribeWithContinuation(x)
37-
}
33+
suspend fun <V> Observable<V>.awaitLast() = last().awaitOne()
3834

39-
suspend fun <V> Observable<V>.awaitSingle(x: Continuation<V>) {
40-
this.single().subscribeWithContinuation(x)
41-
}
35+
suspend fun <V> Observable<V>.awaitSingle() = single().awaitOne()
4236

43-
private fun <V> Observable<V>.subscribeWithContinuation(x: Continuation<V>) {
37+
private suspend fun <V> Observable<V>.awaitOne() = runWithCurrentContinuation<V> { x ->
4438
subscribe(x::resume, x::resumeWithException)
4539
}
4640

4741
suspend fun <V> Observable<V>.applyForEachAndAwait(
48-
block: (V) -> Unit,
49-
x: Continuation<Unit>
50-
) {
42+
block: (V) -> Unit
43+
) = runWithCurrentContinuation<Unit> { x->
5144
this.subscribe(block, x::resumeWithException, { x.resume(Unit) })
5245
}
5346

0 commit comments

Comments
 (0)