Skip to content

Commit 1216e91

Browse files
committed
Select statement with onSend/onReceive/onAwait clauses
1 parent ee88fbe commit 1216e91

24 files changed

+2178
-341
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl
9393
// --------------- implementation ---------------
9494

9595
private open class StandaloneCoroutine(
96-
val parentContext: CoroutineContext,
96+
override val parentContext: CoroutineContext,
9797
active: Boolean
98-
) : AbstractCoroutine<Unit>(parentContext, active) {
99-
override fun afterCompletion(state: Any?) {
98+
) : AbstractCoroutine<Unit>(active) {
99+
override fun afterCompletion(state: Any?, mode: Int) {
100100
// note the use of the parent's job context below!
101101
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
102102
}
@@ -119,13 +119,13 @@ private class InnerCoroutine<in T>(
119119
}
120120

121121
private class BlockingCoroutine<T>(
122-
context: CoroutineContext,
122+
override val parentContext: CoroutineContext,
123123
val blockedThread: Thread,
124124
val hasPrivateEventLoop: Boolean
125-
) : AbstractCoroutine<T>(context, active = true) {
126-
val eventLoop: EventLoop? = context[ContinuationInterceptor] as? EventLoop
125+
) : AbstractCoroutine<T>(active = true) {
126+
val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
127127

128-
override fun afterCompletion(state: Any?) {
128+
override fun afterCompletion(state: Any?, mode: Int) {
129129
if (Thread.currentThread() != blockedThread)
130130
LockSupport.unpark(blockedThread)
131131
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 85 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
2020
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2121
import kotlin.coroutines.experimental.Continuation
22-
import kotlin.coroutines.experimental.ContinuationInterceptor
22+
import kotlin.coroutines.experimental.CoroutineContext
2323
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2424
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2525
import kotlin.coroutines.experimental.suspendCoroutine
@@ -57,18 +57,27 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
5757
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
5858
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
5959
* [completeResume] must be invoked with it.
60+
*
61+
* When [idempotent] is not `null`, this function performs _idempotent_ operation, so that
62+
* further invocations with the same non-null reference produce the same result.
63+
*
64+
* @suppress **This is unstable API and it is subject to change.**
6065
*/
61-
public fun tryResume(value: T): Any?
66+
public fun tryResume(value: T, idempotent: Any? = null): Any?
6267

6368
/**
6469
* Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
6570
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
6671
* [completeResume] must be invoked with it.
72+
*
73+
* @suppress **This is unstable API and it is subject to change.**
6774
*/
6875
public fun tryResumeWithException(exception: Throwable): Any?
6976

7077
/**
7178
* Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
79+
*
80+
* @suppress **This is unstable API and it is subject to change.**
7281
*/
7382
public fun completeResume(token: Any)
7483

@@ -113,7 +122,7 @@ public inline suspend fun <T> suspendCancellableCoroutine(
113122
crossinline block: (CancellableContinuation<T>) -> Unit
114123
): T =
115124
suspendCoroutineOrReturn { cont ->
116-
val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
125+
val cancellable = CancellableContinuationImpl(cont, active = true)
117126
if (!holdCancellability) cancellable.initCancellability()
118127
block(cancellable)
119128
cancellable.getResult()
@@ -140,58 +149,67 @@ private class RemoveOnCancel(
140149
override fun toString() = "RemoveOnCancel[$node]"
141150
}
142151

143-
@PublishedApi
144-
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
145-
val job = cont.context[Job]
146-
// fast path when parent job is already complete (we don't even construct CancellableContinuationImpl object)
147-
if (job != null && !job.isActive) throw job.getCompletionException()
148-
return job
149-
}
150-
151152
@PublishedApi
152153
internal open class CancellableContinuationImpl<in T>(
153-
private val delegate: Continuation<T>,
154-
private val parentJob: Job?,
154+
@JvmField
155+
protected val delegate: Continuation<T>,
155156
active: Boolean
156-
) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
157+
) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
157158
@Volatile
158159
private var decision = UNDECIDED
159160

160-
private companion object {
161+
override val parentContext: CoroutineContext
162+
get() = delegate.context
163+
164+
protected companion object {
165+
@JvmStatic
161166
val DECISION: AtomicIntegerFieldUpdater<CancellableContinuationImpl<*>> =
162167
AtomicIntegerFieldUpdater.newUpdater(CancellableContinuationImpl::class.java, "decision")
163168

164169
const val UNDECIDED = 0
165170
const val SUSPENDED = 1
166171
const val RESUMED = 2
167-
const val YIELD = 3 // used by cancellable "yield"
168-
const val UNDISPATCHED = 4 // used by "undispatchedXXX"
172+
173+
const val MODE_UNDISPATCHED = 1
174+
const val MODE_DIRECT = 2
175+
176+
@Suppress("UNCHECKED_CAST")
177+
fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
169178
}
170179

171180
override fun initCancellability() {
172-
initParentJob(parentJob)
181+
initParentJob(delegate.context[Job])
173182
}
174183

175184
@PublishedApi
176185
internal fun getResult(): Any? {
177186
val decision = this.decision // volatile read
178-
when (decision) {
179-
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
180-
YIELD -> return COROUTINE_SUSPENDED
181-
}
187+
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
182188
// otherwise, afterCompletion was already invoked, and the result is in the state
183189
val state = this.state
184190
if (state is CompletedExceptionally) throw state.exception
185-
return state
191+
return getSuccessfulResult(state)
186192
}
187193

188194
override val isCancelled: Boolean get() = state is Cancelled
189195

190-
override fun tryResume(value: T): Any? {
196+
override fun tryResume(value: T, idempotent: Any?): Any? {
191197
while (true) { // lock-free loop on state
192198
val state = this.state // atomic read
193199
when (state) {
194-
is Incomplete -> if (tryUpdateState(state, value)) return state
200+
is Incomplete -> {
201+
val idempotentStart = state.idempotentStart
202+
val update: Any? = if (idempotent == null && idempotentStart == null) value else
203+
CompletedIdempotentResult(idempotentStart, idempotent, value, state)
204+
if (tryUpdateState(state, update)) return state
205+
}
206+
is CompletedIdempotentResult -> {
207+
if (state.idempotentResume === idempotent) {
208+
check(state.result === value) { "Non-idempotent resume" }
209+
return state.token
210+
} else
211+
return null
212+
}
195213
else -> return null // cannot resume -- not active anymore
196214
}
197215
}
@@ -201,56 +219,69 @@ internal open class CancellableContinuationImpl<in T>(
201219
while (true) { // lock-free loop on state
202220
val state = this.state // atomic read
203221
when (state) {
204-
is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
222+
is Incomplete -> {
223+
if (tryUpdateState(state, CompletedExceptionally(state.idempotentStart, exception))) return state
224+
}
205225
else -> return null // cannot resume -- not active anymore
206226
}
207227
}
208228
}
209229

210230
override fun completeResume(token: Any) {
211-
completeUpdateState(token, state)
231+
completeUpdateState(token, state, mode = 0)
212232
}
213233

214-
@Suppress("UNCHECKED_CAST")
215-
override fun afterCompletion(state: Any?) {
234+
override fun afterCompletion(state: Any?, mode: Int) {
216235
val decision = this.decision // volatile read
217236
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
218237
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
219-
when {
220-
decision == UNDISPATCHED -> undispatchedCompletion(state)
221-
state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
222-
decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
223-
else -> delegate.resume(state as T)
238+
if (state is CompletedExceptionally) {
239+
val exception = state.exception
240+
when (mode) {
241+
0 -> delegate.resumeWithException(exception)
242+
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
243+
MODE_DIRECT -> {
244+
if (delegate is DispatchedContinuation)
245+
delegate.continuation.resumeWithException(exception)
246+
else
247+
delegate.resumeWithException(exception)
248+
}
249+
else -> error("Invalid mode $mode")
250+
}
251+
} else {
252+
val value = getSuccessfulResult<T>(state)
253+
when (mode) {
254+
0 -> delegate.resume(value)
255+
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
256+
MODE_DIRECT -> {
257+
if (delegate is DispatchedContinuation)
258+
delegate.continuation.resume(value)
259+
else
260+
delegate.resume(value)
261+
}
262+
else -> error("Invalid mode $mode")
263+
}
224264
}
225265
}
226266

227-
@Suppress("UNCHECKED_CAST")
228-
private fun undispatchedCompletion(state: Any?) {
229-
delegate as DispatchedContinuation // type assertion -- was checked in resumeUndispatched
230-
if (state is CompletedExceptionally)
231-
delegate.resumeUndispatchedWithException(state.exception)
232-
else
233-
delegate.resumeUndispatched(state as T)
234-
}
235-
236-
// can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
237-
fun resumeYield(value: T) {
238-
if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
239-
DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
240-
resume(value)
241-
}
242-
243267
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
244268
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
245269
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
246-
DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
247-
resume(value)
270+
resume(value, MODE_UNDISPATCHED)
248271
}
249272

250273
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
251274
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
252275
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
253-
DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
254-
resumeWithException(exception)
276+
resumeWithException(exception, MODE_UNDISPATCHED)
277+
}
278+
279+
private class CompletedIdempotentResult(
280+
idempotentStart: Any?,
281+
@JvmField val idempotentResume: Any?,
282+
@JvmField val result: Any?,
283+
@JvmField val token: Incomplete
284+
) : CompletedIdempotentStart(idempotentStart) {
285+
override fun toString(): String = "CompletedIdempotentResult[$result]"
255286
}
256287
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public abstract class CoroutineDispatcher :
9494
}
9595

9696
internal class DispatchedContinuation<in T>(
97-
val dispatcher: CoroutineDispatcher,
98-
val continuation: Continuation<T>
97+
@JvmField val dispatcher: CoroutineDispatcher,
98+
@JvmField val continuation: Continuation<T>
9999
): Continuation<T> by continuation {
100100
override fun resume(value: T) {
101101
val context = continuation.context
@@ -132,20 +132,15 @@ internal class DispatchedContinuation<in T>(
132132
}
133133

134134
// used by "yield" implementation
135-
fun resumeYield(job: Job?, value: T) {
135+
internal fun dispatchYield(job: Job?, value: T) {
136136
val context = continuation.context
137-
if (dispatcher.isDispatchNeeded(context))
138-
dispatcher.dispatch(context, Runnable {
139-
withCoroutineContext(context) {
140-
if (job?.isCompleted == true)
141-
continuation.resumeWithException(job.getCompletionException())
142-
else
143-
continuation.resume(value)
144-
}
145-
})
146-
else
137+
dispatcher.dispatch(context, Runnable {
147138
withCoroutineContext(context) {
148-
continuation.resume(value)
139+
if (job != null && job.isCompleted)
140+
continuation.resumeWithException(job.getCompletionException())
141+
else
142+
continuation.resume(value)
149143
}
144+
})
150145
}
151146
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,48 @@ public interface CoroutineScope {
4949
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
5050
* It stores the result of continuation in the state of the job.
5151
*
52-
* @param context the new context for the coroutine. Use [newCoroutineContext] to create it.
5352
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
5453
* @suppress **This is unstable API and it is subject to change.**
5554
*/
5655
public abstract class AbstractCoroutine<in T>(
57-
context: CoroutineContext,
5856
active: Boolean
5957
) : JobSupport(active), Continuation<T>, CoroutineScope {
58+
// context must be Ok for unsafe publishing (it is persistent),
59+
// so we don't mark this _context variable as volatile, but leave potential benign race here
60+
private var _context: CoroutineContext? = null // created on first need
61+
6062
@Suppress("LeakingThis")
61-
override val context: CoroutineContext = context + this // merges this job into this context
63+
public final override val context: CoroutineContext
64+
get() = _context ?: createContext().also { _context = it }
65+
66+
protected abstract val parentContext: CoroutineContext
67+
68+
protected open fun createContext() = parentContext + this
69+
70+
protected open fun defaultResumeMode(): Int = 0
6271

63-
final override fun resume(value: T) {
72+
final override fun resume(value: T) = resume(value, defaultResumeMode())
73+
74+
protected fun resume(value: T, mode: Int) {
6475
while (true) { // lock-free loop on state
6576
val state = this.state // atomic read
6677
when (state) {
67-
is Incomplete -> if (updateState(state, value)) return
78+
is Incomplete -> if (updateState(state, value, mode)) return
6879
is Cancelled -> return // ignore resumes on cancelled continuation
6980
else -> throw IllegalStateException("Already resumed, but got value $value")
7081
}
7182
}
7283
}
7384

74-
final override fun resumeWithException(exception: Throwable) {
85+
final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode())
86+
87+
protected fun resumeWithException(exception: Throwable, mode: Int) {
7588
while (true) { // lock-free loop on state
7689
val state = this.state // atomic read
7790
when (state) {
78-
is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
91+
is Incomplete -> {
92+
if (updateState(state, CompletedExceptionally(state.idempotentStart, exception), mode)) return
93+
}
7994
is Cancelled -> {
8095
// ignore resumes on cancelled continuation, but handle exception if a different one is here
8196
if (exception != state.exception) handleCoroutineException(context, exception)

0 commit comments

Comments
 (0)