Skip to content

Commit 0939f23

Browse files
author
Sergey Mashkov
committed
Make mutable continuation internally accessible
1 parent 4ee9cfb commit 0939f23

File tree

2 files changed

+148
-73
lines changed

2 files changed

+148
-73
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/InlineRendezvousSwap.kt

Lines changed: 2 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package kotlinx.coroutines.experimental.io.internal
22

3-
import kotlinx.atomicfu.*
43
import kotlinx.coroutines.experimental.*
5-
import java.util.concurrent.CancellationException
6-
import kotlin.coroutines.experimental.*
74
import kotlin.coroutines.experimental.intrinsics.*
85

96
/**
@@ -13,8 +10,8 @@ import kotlin.coroutines.experimental.intrinsics.*
1310
* - no poll/offer
1411
*/
1512
internal class InlineRendezvousSwap<T : Any> {
16-
private val senderCont = DelegatedContinuation<Unit>()
17-
private val receiverCont = DelegatedContinuation<T>()
13+
private val senderCont = MutableDelegateContinuation<Unit>()
14+
private val receiverCont = MutableDelegateContinuation<T>()
1815

1916
@Suppress("NOTHING_TO_INLINE")
2017
suspend inline fun send(e: T) = suspendCoroutineOrReturn<Unit> { c ->
@@ -42,74 +39,6 @@ internal class InlineRendezvousSwap<T : Any> {
4239
result
4340
}
4441

45-
internal class DelegatedContinuation<T : Any> : Continuation<T> {
46-
private val delegate = atomic<Continuation<T>?>(null)
47-
private var result: Any? = null
48-
private var e: Throwable? = null
49-
50-
private fun reset() {
51-
delegate.getAndSet(null)?.takeUnless { it === this }?.resumeWithException(Cancellation)
52-
result = null
53-
e = null
54-
}
55-
56-
fun swap(actual: Continuation<T>): Any? {
57-
require(actual !== this)
58-
59-
delegate.update { c ->
60-
when {
61-
c === this -> return result().also { reset() }
62-
else -> actual
63-
}
64-
}
65-
66-
return COROUTINE_SUSPENDED
67-
}
68-
69-
private fun result(): Any? {
70-
e?.let { reset(); throw it }
71-
return result
72-
}
73-
74-
override val context: CoroutineContext
75-
get() = delegate.value?.takeUnless { it === this }?.context ?: EmptyCoroutineContext
76-
77-
override fun resumeWithException(exception: Throwable) {
78-
e = exception
79-
val c = delegate.getAndUpdate { c ->
80-
when {
81-
c === this -> return
82-
c == null -> this
83-
else -> null
84-
}
85-
}
86-
87-
if (c != null) {
88-
reset()
89-
c.resumeWithException(exception)
90-
}
91-
}
92-
93-
override fun resume(value: T) {
94-
result = value
95-
val c = delegate.getAndUpdate { c ->
96-
when {
97-
c === this -> return
98-
c == null -> this
99-
else -> null
100-
}
101-
}
102-
103-
if (c != null) {
104-
reset()
105-
c.resume(value)
106-
}
107-
}
108-
}
109-
110-
companion object {
111-
private val Cancellation = CancellationException()
112-
}
11342
}
11443

11544
fun main(args: Array<String>) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package kotlinx.coroutines.experimental.io.internal
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.experimental.*
5+
import kotlin.coroutines.experimental.*
6+
import kotlin.coroutines.experimental.intrinsics.*
7+
8+
/**
9+
* Semi-cancellable reusable continuation. Unlike regular continuation this implementation has limitations:
10+
* - could be resumed only once per swap, undefined behaviour otherwise
11+
* - [T] should be neither [Throwable] nor [Continuation]
12+
* - value shouldn't be null
13+
*/
14+
internal class MutableDelegateContinuation<T : Any> : Continuation<T> {
15+
private val state = atomic<Any?>(null)
16+
private val handler = atomic<JobRelation?>(null)
17+
18+
fun swap(actual: Continuation<T>): Any {
19+
loop@while (true) {
20+
val before = state.value
21+
22+
when (before) {
23+
null -> {
24+
if (!state.compareAndSet(null, actual)) continue@loop
25+
parent(actual.context)
26+
return COROUTINE_SUSPENDED
27+
}
28+
else -> {
29+
if (!state.compareAndSet(before, null)) continue@loop
30+
if (before is Throwable) throw before
31+
@Suppress("UNCHECKED_CAST")
32+
return before as T
33+
}
34+
}
35+
}
36+
}
37+
38+
fun close() {
39+
resumeWithException(Cancellation)
40+
handler.getAndSet(null)?.dispose()
41+
}
42+
43+
private fun parent(context: CoroutineContext) {
44+
val job = context[Job]
45+
if (handler.value?.job === job) return
46+
47+
if (job == null) {
48+
handler.getAndSet(null)?.dispose()
49+
} else {
50+
val handler = JobRelation(job)
51+
val old = this.handler.getAndUpdate { j ->
52+
when {
53+
j == null -> handler
54+
j.job === job -> return
55+
else -> handler
56+
}
57+
}
58+
old?.dispose()
59+
}
60+
}
61+
62+
override val context: CoroutineContext
63+
get() = (state.value as? Continuation<*>)?.context ?: EmptyCoroutineContext
64+
65+
override fun resume(value: T) {
66+
loop@while(true) {
67+
val before = state.value
68+
69+
when (before) {
70+
null -> {
71+
if (!state.compareAndSet(null, value)) continue@loop
72+
return
73+
}
74+
is Continuation<*> -> {
75+
if (!state.compareAndSet(before, null)) continue@loop
76+
@Suppress("UNCHECKED_CAST")
77+
val cont = before as Continuation<T>
78+
return cont.resume(value)
79+
}
80+
else -> return
81+
}
82+
}
83+
}
84+
85+
override fun resumeWithException(exception: Throwable) {
86+
loop@while(true) {
87+
val before = state.value
88+
89+
when (before) {
90+
null -> {
91+
if (!state.compareAndSet(null, exception)) continue@loop
92+
return
93+
}
94+
is Continuation<*> -> {
95+
if (!state.compareAndSet(before, null)) continue@loop
96+
@Suppress("UNCHECKED_CAST")
97+
val cont = before as Continuation<T>
98+
return cont.resumeWithException(exception)
99+
}
100+
else -> return
101+
}
102+
}
103+
}
104+
105+
private fun resumeWithExceptionContinuationOnly(job: Job, exception: Throwable) {
106+
var c: Continuation<*>? = null
107+
108+
state.update {
109+
if (it !is Continuation<*>) return
110+
if (it.context[Job] !== job) return
111+
c = it
112+
null
113+
}
114+
115+
c!!.resumeWithException(exception)
116+
}
117+
118+
private inner class JobRelation(val job: Job) : CompletionHandler, DisposableHandle {
119+
private var handler: DisposableHandle = NonDisposableHandle
120+
121+
init {
122+
val h = job.invokeOnCompletion(onCancelling = true, handler = this)
123+
if (job.isActive) {
124+
handler = h
125+
}
126+
}
127+
128+
override fun invoke(cause: Throwable?) {
129+
this@MutableDelegateContinuation.handler.compareAndSet(this, null)
130+
dispose()
131+
132+
if (cause != null) {
133+
resumeWithExceptionContinuationOnly(job, cause)
134+
}
135+
}
136+
137+
override fun dispose() {
138+
handler.dispose()
139+
handler = NonDisposableHandle
140+
}
141+
}
142+
143+
private companion object {
144+
val Cancellation = CancellationException("Continuation terminated")
145+
}
146+
}

0 commit comments

Comments
 (0)