Skip to content

Commit e2fca1a

Browse files
committed
Apply new convention (without restricted controllers)
1 parent b9f427d commit e2fca1a

File tree

6 files changed

+109
-92
lines changed

6 files changed

+109
-92
lines changed

kotlinx-coroutines-async-common/src/main/kotlin/runWithCurrentContinuation.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package kotlinx.coroutines
22

33
import java.util.concurrent.atomic.AtomicReference
4+
import kotlin.coroutines.*
5+
import kotlin.coroutines.SUSPENDED
6+
import kotlin.coroutines.suspendWithCurrentContinuation
47

58
suspend fun <T> runWithCurrentContinuation(
69
block: (Continuation<T>) -> Unit
@@ -25,7 +28,7 @@ private class SafeContinuation<in T>(val delegate: Continuation<T>) : Continuati
2528
}
2629

2730
fun returnResult(): Any? {
28-
if (result.get() == Undecided) result.compareAndSet(Undecided, Suspend)
31+
if (result.get() == Undecided) result.compareAndSet(Undecided, SUSPENDED)
2932
val result = result.get()
3033
if (result is Fail) throw result.e else return result
3134
}

kotlinx-coroutines-async-example-ui/src/main/kotlin/main.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import kotlinx.coroutines.asyncUI
2+
import kotlinx.coroutines.await
23
import java.awt.Insets
34
import java.util.concurrent.CompletableFuture
45
import javax.swing.*

kotlinx-coroutines-async/src/main/kotlin/async.kt

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import java.nio.channels.CompletionHandler
99
import java.util.concurrent.CompletableFuture
1010
import java.util.concurrent.TimeUnit
1111
import javax.swing.SwingUtilities
12+
import kotlin.coroutines.Continuation
13+
import kotlin.coroutines.ResumeInterceptor
14+
import kotlin.coroutines.startCoroutine
1215

1316
/**
1417
* Run asynchronous computations based on [c] coroutine parameter
@@ -26,13 +29,50 @@ import javax.swing.SwingUtilities
2629
*
2730
* @return CompletableFuture object representing result of computations
2831
*/
32+
33+
2934
fun <T> async(
3035
continuationWrapper: ContinuationWrapper? = null,
31-
coroutine c: FutureController<T>.() -> Continuation<Unit>
36+
c: suspend () -> T
3237
): CompletableFuture<T> {
33-
val controller = FutureController<T>(continuationWrapper)
34-
c(controller).resume(Unit)
35-
return controller.future
38+
val future = CompletableFuture<T>()
39+
40+
c.startCoroutine(
41+
object : Continuation<T> {
42+
override fun resumeWithException(exception: Throwable) {
43+
future.completeExceptionally(exception)
44+
}
45+
46+
override fun resume(data: T) {
47+
future.complete(data)
48+
}
49+
},
50+
51+
if (continuationWrapper != null) {
52+
object: ResumeInterceptor {
53+
override fun <P> interceptResume(data: P, continuation: Continuation<P>): Boolean {
54+
continuationWrapper {
55+
continuation.resume(data)
56+
}
57+
58+
return true
59+
}
60+
61+
override fun interceptResumeWithException(exception: Throwable, continuation: Continuation<*>): Boolean {
62+
continuationWrapper {
63+
continuation.resumeWithException(exception)
64+
}
65+
66+
return true
67+
}
68+
}
69+
}
70+
else {
71+
null
72+
}
73+
)
74+
75+
return future
3676
}
3777

3878
/**
@@ -45,27 +85,14 @@ fun <T> async(
4585
* @See async
4686
*/
4787
fun asyncUI(
48-
coroutine c: FutureController<Unit>.() -> Continuation<Unit>
88+
c: suspend () -> Unit
4989
) {
50-
if (SwingUtilities.isEventDispatchThread()) {
51-
async({ SwingUtilities.invokeLater(it) }, c)
52-
}
53-
else {
54-
SwingUtilities.invokeLater {
55-
async({ SwingUtilities.invokeLater(it) }, c)
56-
}
57-
}
90+
async({ SwingUtilities.invokeLater(it) }, c)
5891
}
5992

6093
typealias ContinuationWrapper = (() -> Unit) -> Unit
6194

62-
@AllowSuspendExtensions
63-
class FutureController<T>(
64-
val continuationWrapper: ContinuationWrapper?
65-
) {
66-
val future = CompletableFuture<T>()
67-
68-
suspend fun <V> await(f: CompletableFuture<V>): V =
95+
suspend fun <V> await(f: CompletableFuture<V>): V =
6996
runWithCurrentContinuation {
7097
f.whenComplete { value, throwable ->
7198
if (throwable == null)
@@ -74,25 +101,6 @@ class FutureController<T>(
74101
it.resumeWithException(throwable)
75102
}
76103
}
77-
78-
inline operator fun interceptResume(crossinline x: () -> Unit) {
79-
if (continuationWrapper != null) {
80-
continuationWrapper.invoke { x() }
81-
}
82-
else {
83-
x()
84-
}
85-
}
86-
87-
operator fun handleResult(value: T, c: Continuation<Nothing>) {
88-
future.complete(value)
89-
}
90-
91-
operator fun handleException(t: Throwable, c: Continuation<Nothing>) {
92-
future.completeExceptionally(t)
93-
}
94-
}
95-
96104
//
97105
// IO parts
98106
//
Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,44 @@
11
package kotlinx.coroutines
22

3+
import kotlin.coroutines.Continuation
4+
import kotlin.coroutines.SUSPENDED
5+
import kotlin.coroutines.createCoroutine
6+
import kotlin.coroutines.suspendWithCurrentContinuation
7+
38
/**
49
* Creates a Sequence object based on received coroutine [c].
510
*
611
* Each call of 'yield' suspend function within the coroutine lambda generates
712
* next element of resulting sequence.
813
*/
9-
fun <T> generate(
10-
coroutine c: GeneratorController<T>.() -> Continuation<Unit>
11-
): Sequence<T> =
12-
object : Sequence<T> {
13-
override fun iterator(): Iterator<T> {
14-
val iterator = GeneratorController<T>()
15-
iterator.setNextStep(c(iterator))
16-
return iterator
17-
}
14+
interface Generator<in T> {
15+
suspend fun yield(value: T)
16+
}
17+
18+
fun <T> generate(block: suspend Generator<T>.() -> Unit): Sequence<T> = GeneratedSequence(block)
19+
20+
private class GeneratedSequence<out T>(private val block: suspend Generator<T>.() -> Unit) : Sequence<T> {
21+
override fun iterator(): Iterator<T> = GeneratedIterator(block)
22+
}
23+
24+
private class GeneratedIterator<T>(block: suspend Generator<T>.() -> Unit) : AbstractIterator<T>(), Generator<T> {
25+
private var nextStep: Continuation<Unit> = block.createCoroutine(this, object : Continuation<Unit> {
26+
override fun resume(data: Unit) {
27+
done()
1828
}
1929

20-
class GeneratorController<T> internal constructor() : AbstractIterator<T>() {
21-
private lateinit var nextStep: Continuation<Unit>
30+
override fun resumeWithException(exception: Throwable) {
31+
throw exception
32+
}
33+
})
2234

2335
override fun computeNext() {
2436
nextStep.resume(Unit)
2537
}
26-
27-
internal fun setNextStep(step: Continuation<Unit>) {
28-
nextStep = step
29-
}
30-
31-
suspend fun yield(value: T) = suspendWithCurrentContinuation<Unit> { c ->
38+
suspend override fun yield(value: T) = suspendWithCurrentContinuation<Unit> { c ->
3239
setNext(value)
33-
setNextStep(c)
40+
nextStep = c
3441

35-
Suspend
42+
SUSPENDED
3643
}
37-
38-
operator fun handleResult(result: Unit, c: Continuation<Nothing>) {
39-
done()
40-
}
41-
}
44+
}

kotlinx-coroutines-rx-example/src/main/kotlin/main.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kotlinx.coroutines.example
22

33
import kotlinx.coroutines.asyncRx
4+
import kotlinx.coroutines.awaitSingle
45
import retrofit2.Retrofit
56
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
67
import retrofit2.converter.gson.GsonConverterFactory

kotlinx-coroutines-rx/src/main/kotlin/asyncRx.kt

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package kotlinx.coroutines
22

33
import rx.Observable
44
import rx.subjects.AsyncSubject
5+
import kotlin.coroutines.Continuation
6+
import kotlin.coroutines.startCoroutine
57

68
/**
79
* Run asynchronous computations based on [c] coroutine parameter
@@ -16,40 +18,39 @@ import rx.subjects.AsyncSubject
1618
* @return Observable with single value containing expression returned from coroutine
1719
*/
1820
fun <T> asyncRx(
19-
coroutine c: RxController<T>.() -> Continuation<Unit>
21+
c: suspend () -> T
2022
): Observable<T> {
21-
val controller = RxController<T>()
22-
c(controller).resume(Unit)
23-
24-
return controller.result
23+
val result: AsyncSubject<T> = AsyncSubject.create<T>()
24+
25+
c.startCoroutine(
26+
object: Continuation<T> {
27+
override fun resumeWithException(exception: Throwable) {
28+
result.onError(exception)
29+
}
30+
31+
override fun resume(data: T) {
32+
result.onNext(data)
33+
result.onCompleted()
34+
}
35+
}
36+
)
37+
38+
return result
2539
}
2640

27-
@AllowSuspendExtensions
28-
class RxController<T> internal constructor() {
29-
internal val result: AsyncSubject<T> = AsyncSubject.create<T>()
30-
31-
suspend fun <V> Observable<V>.awaitFirst() = first().awaitOne()
3241

33-
suspend fun <V> Observable<V>.awaitLast() = last().awaitOne()
42+
suspend fun <V> Observable<V>.awaitFirst(): V = first().awaitOne()
3443

35-
suspend fun <V> Observable<V>.awaitSingle() = single().awaitOne()
44+
suspend fun <V> Observable<V>.awaitLast(): V = last().awaitOne()
3645

37-
private suspend fun <V> Observable<V>.awaitOne() = runWithCurrentContinuation<V> { x ->
38-
subscribe(x::resume, x::resumeWithException)
39-
}
46+
suspend fun <V> Observable<V>.awaitSingle(): V = single().awaitOne()
4047

41-
suspend fun <V> Observable<V>.applyForEachAndAwait(
42-
block: (V) -> Unit
43-
) = runWithCurrentContinuation<Unit> { x->
44-
this.subscribe(block, x::resumeWithException, { x.resume(Unit) })
45-
}
46-
47-
operator fun handleResult(v: T, x: Continuation<Nothing>) {
48-
result.onNext(v)
49-
result.onCompleted()
50-
}
48+
private suspend fun <V> Observable<V>.awaitOne(): V = runWithCurrentContinuation<V> { x ->
49+
subscribe(x::resume, x::resumeWithException)
50+
}
5151

52-
operator fun handleException(t: Throwable, x: Continuation<Nothing>) {
53-
result.onError(t)
54-
}
52+
suspend fun <V> Observable<V>.applyForEachAndAwait(
53+
block: (V) -> Unit
54+
) = runWithCurrentContinuation<Unit> { x->
55+
this.subscribe(block, x::resumeWithException, { x.resume(Unit) })
5556
}

0 commit comments

Comments
 (0)