Skip to content

Commit 222f3f2

Browse files
committed
Removed suspendCoroutineOrReturn use in preparation for Kotlin 1.3
Replaced with suspendCoroutineUninterceptedOrReturn
1 parent d1c8a3c commit 222f3f2

File tree

12 files changed

+79
-57
lines changed

12 files changed

+79
-57
lines changed

common/kotlinx-coroutines-core-common/src/Builders.common.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,27 +106,27 @@ public suspend fun <T> withContext(
106106
context: CoroutineContext,
107107
start: CoroutineStart = CoroutineStart.DEFAULT,
108108
block: suspend () -> T
109-
): T = suspendCoroutineOrReturn sc@ { cont ->
110-
val oldContext = cont.context
109+
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
110+
val oldContext = uCont.context
111111
// fast path #1 if there is no change in the actual context:
112112
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
113-
return@sc block.startCoroutineUninterceptedOrReturn(cont)
113+
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
114114
// compute new context
115115
val newContext = oldContext + context
116116
// fast path #2 if the result is actually the same
117117
if (newContext === oldContext)
118-
return@sc block.startCoroutineUninterceptedOrReturn(cont)
118+
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
119119
// fast path #3 if the new dispatcher is the same as the old one.
120120
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
121121
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
122-
val newContinuation = RunContinuationDirect(newContext, cont)
122+
val newContinuation = RunContinuationDirect(newContext, uCont)
123123
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
124124
}
125125
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
126126
require(!start.isLazy) { "$start start is not supported" }
127127
val completion = RunCompletion(
128128
context = newContext,
129-
delegate = cont,
129+
delegate = uCont.intercepted(), // delegate to continuation intercepted with old dispatcher on completion
130130
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
131131
)
132132
completion.initParentJobInternal(newContext[Job]) // attach to job

common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public suspend inline fun <T> suspendCancellableCoroutine(
172172
holdCancellability: Boolean = false,
173173
crossinline block: (CancellableContinuation<T>) -> Unit
174174
): T =
175-
suspendCoroutineOrReturn { cont ->
176-
val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_CANCELLABLE)
175+
suspendCoroutineUninterceptedOrReturn { uCont ->
176+
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
177177
if (!holdCancellability) cancellable.initCancellability()
178178
block(cancellable)
179179
cancellable.getResult()
@@ -191,8 +191,8 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
191191
holdCancellability: Boolean = false,
192192
crossinline block: (CancellableContinuation<T>) -> Unit
193193
): T =
194-
suspendCoroutineOrReturn { cont ->
195-
val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_ATOMIC_DEFAULT)
194+
suspendCoroutineUninterceptedOrReturn { uCont ->
195+
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
196196
if (!holdCancellability) cancellable.initCancellability()
197197
block(cancellable)
198198
cancellable.getResult()

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,8 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
402402

403403
public final override suspend fun join() {
404404
if (!joinInternal()) { // fast-path no wait
405-
return suspendCoroutineOrReturn { cont ->
406-
cont.context.checkCompletion()
407-
Unit // do not suspend
408-
}
405+
coroutineContext.checkCompletion()
406+
return // do not suspend
409407
}
410408
return joinSuspend() // slow-path wait
411409
}

common/kotlinx-coroutines-core-common/src/ResumeMode.kt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
package kotlinx.coroutines.experimental
66

7-
import kotlin.coroutines.experimental.Continuation
7+
import kotlin.coroutines.experimental.*
8+
import kotlin.coroutines.experimental.intrinsics.*
89

910
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
1011
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
@@ -36,3 +37,25 @@ internal fun <T> Continuation<T>.resumeWithExceptionMode(exception: Throwable, m
3637
else -> error("Invalid mode $mode")
3738
}
3839
}
40+
41+
internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
42+
when (mode) {
43+
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
44+
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
45+
MODE_DIRECT -> resume(value)
46+
MODE_UNDISPATCHED -> resume(value)
47+
MODE_IGNORE -> {}
48+
else -> error("Invalid mode $mode")
49+
}
50+
}
51+
52+
internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception: Throwable, mode: Int) {
53+
when (mode) {
54+
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
55+
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
56+
MODE_DIRECT -> resumeWithException(exception)
57+
MODE_UNDISPATCHED -> resumeWithException(exception)
58+
MODE_IGNORE -> {}
59+
else -> error("Invalid mode $mode")
60+
}
61+
}

common/kotlinx-coroutines-core-common/src/Scheduled.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -
5151
*/
5252
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T {
5353
if (time <= 0L) throw CancellationException("Timed out immediately")
54-
return suspendCoroutineOrReturn { cont: Continuation<T> ->
55-
setupTimeout(TimeoutCoroutine(time, unit, cont), block)
54+
return suspendCoroutineUninterceptedOrReturn { uCont ->
55+
setupTimeout(TimeoutCoroutine(time, unit, uCont), block)
5656
}
5757
}
5858

@@ -61,7 +61,7 @@ private fun <U, T: U> setupTimeout(
6161
block: suspend CoroutineScope.() -> T
6262
): Any? {
6363
// schedule cancellation of this coroutine on time
64-
val cont = coroutine.cont
64+
val cont = coroutine.uCont
6565
val context = cont.context
6666
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine.unit, coroutine))
6767
// restart block using new coroutine with new job,
@@ -79,8 +79,8 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
7979
private open class TimeoutCoroutine<U, in T: U>(
8080
@JvmField val time: Long,
8181
@JvmField val unit: TimeUnit,
82-
@JvmField val cont: Continuation<U>
83-
) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
82+
@JvmField val uCont: Continuation<U> // unintercepted continuation
83+
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T> {
8484
override val defaultResumeMode: Int get() = MODE_DIRECT
8585

8686
@Suppress("LeakingThis")
@@ -91,9 +91,9 @@ private open class TimeoutCoroutine<U, in T: U>(
9191
@Suppress("UNCHECKED_CAST")
9292
internal override fun onCompletionInternal(state: Any?, mode: Int) {
9393
if (state is CompletedExceptionally)
94-
cont.resumeWithExceptionMode(state.cause, mode)
94+
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
9595
else
96-
cont.resumeMode(state as T, mode)
96+
uCont.resumeUninterceptedMode(state as T, mode)
9797
}
9898

9999
override fun nameString(): String =
@@ -140,8 +140,8 @@ public suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScop
140140
*/
141141
public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T? {
142142
if (time <= 0L) return null
143-
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
144-
setupTimeout(TimeoutOrNullCoroutine(time, unit, cont), block)
143+
return suspendCoroutineUninterceptedOrReturn { uCont ->
144+
setupTimeout(TimeoutOrNullCoroutine(time, unit, uCont), block)
145145
}
146146
}
147147

@@ -155,17 +155,17 @@ public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.M
155155
private class TimeoutOrNullCoroutine<T>(
156156
time: Long,
157157
unit: TimeUnit,
158-
cont: Continuation<T?>
159-
) : TimeoutCoroutine<T?, T>(time, unit, cont) {
158+
uCont: Continuation<T?> // unintercepted continuation
159+
) : TimeoutCoroutine<T?, T>(time, unit, uCont) {
160160
@Suppress("UNCHECKED_CAST")
161161
internal override fun onCompletionInternal(state: Any?, mode: Int) {
162162
if (state is CompletedExceptionally) {
163163
val exception = state.cause
164164
if (exception is TimeoutCancellationException && exception.coroutine === this)
165-
cont.resumeMode(null, mode) else
166-
cont.resumeWithExceptionMode(exception, mode)
165+
uCont.resumeUninterceptedMode(null, mode) else
166+
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
167167
} else
168-
cont.resumeMode(state as T, mode)
168+
uCont.resumeUninterceptedMode(state as T, mode)
169169
}
170170
}
171171

common/kotlinx-coroutines-core-common/src/Yield.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import kotlin.coroutines.experimental.intrinsics.*
1515
* If the [Job] of the current coroutine is cancelled or completed when this suspending function is invoked or while
1616
* this function is waiting for dispatching, it resumes with [CancellationException].
1717
*/
18-
public suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
19-
val context = cont.context
18+
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
19+
val context = uCont.context
2020
context.checkCompletion()
21-
if (cont !is DispatchedContinuation<Unit>) return@sc Unit
21+
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
2222
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
2323
cont.dispatchYield(Unit)
2424
COROUTINE_SUSPENDED

common/kotlinx-coroutines-core-common/src/selects/Select.kt

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,8 @@ public interface SelectInstance<in R> {
169169
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
170170
*/
171171
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
172-
// TODO suspendCoroutineUnintercepted
173-
suspendCoroutineOrReturn { cont ->
174-
val scope = SelectBuilderImpl(cont)
172+
suspendCoroutineUninterceptedOrReturn { uCont ->
173+
val scope = SelectBuilderImpl(uCont)
175174
try {
176175
builder(scope)
177176
} catch (e: Throwable) {
@@ -187,7 +186,7 @@ private val RESUMED: Any = Symbol("RESUMED")
187186

188187
@PublishedApi
189188
internal class SelectBuilderImpl<in R>(
190-
private val delegate: Continuation<R>
189+
private val uCont: Continuation<R> // unintercepted delegate continuation
191190
) : LockFreeLinkedListHead(), SelectBuilder<R>,
192191
SelectInstance<R>, Continuation<R> {
193192
// selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
@@ -216,7 +215,7 @@ internal class SelectBuilderImpl<in R>(
216215
+-------------------+
217216
*/
218217

219-
override val context: CoroutineContext get() = delegate.context
218+
override val context: CoroutineContext get() = uCont.context
220219

221220
override val completion: Continuation<R> get() = this
222221

@@ -239,21 +238,21 @@ internal class SelectBuilderImpl<in R>(
239238
// Resumes in MODE_DIRECT
240239
override fun resume(value: R) {
241240
doResume({ value }) {
242-
delegate.resumeDirect(value)
241+
uCont.resume(value)
243242
}
244243
}
245244

246245
// Resumes in MODE_DIRECT
247246
override fun resumeWithException(exception: Throwable) {
248247
doResume({ Fail(exception) }) {
249-
delegate.resumeDirectWithException(exception)
248+
uCont.resumeWithException(exception)
250249
}
251250
}
252251

253252
// Resumes in MODE_CANCELLABLE
254253
override fun resumeSelectCancellableWithException(exception: Throwable) {
255254
doResume({ Fail(exception) }) {
256-
delegate.resumeCancellableWithException(exception)
255+
uCont.intercepted().resumeCancellableWithException(exception)
257256
}
258257
}
259258

common/kotlinx-coroutines-core-common/src/selects/SelectUnbiased.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import kotlin.coroutines.experimental.intrinsics.*
1919
* See [select] function description for all the other details.
2020
*/
2121
public suspend inline fun <R> selectUnbiased(crossinline builder: SelectBuilder<R>.() -> Unit): R =
22-
suspendCoroutineOrReturn { cont ->
23-
val scope = UnbiasedSelectBuilderImpl(cont)
22+
suspendCoroutineUninterceptedOrReturn { uCont ->
23+
val scope = UnbiasedSelectBuilderImpl(uCont)
2424
try {
2525
builder(scope)
2626
} catch (e: Throwable) {
@@ -31,9 +31,9 @@ public suspend inline fun <R> selectUnbiased(crossinline builder: SelectBuilder<
3131

3232

3333
@PublishedApi
34-
internal class UnbiasedSelectBuilderImpl<in R>(cont: Continuation<R>) :
34+
internal class UnbiasedSelectBuilderImpl<in R>(uCont: Continuation<R>) :
3535
SelectBuilder<R> {
36-
val instance = SelectBuilderImpl(cont)
36+
val instance = SelectBuilderImpl(uCont)
3737
val clauses = arrayListOf<() -> Unit>()
3838

3939
@PublishedApi

core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import org.junit.runners.*
1212
import kotlin.test.*
1313

1414
@RunWith(Parameterized::class)
15-
class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
15+
class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
1616
companion object {
1717
@Parameterized.Parameters(name = "{0}")
1818
@JvmStatic

core/kotlinx-coroutines-io/src/ByteBufferChannel.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,10 +2216,10 @@ internal class ByteBufferChannel(
22162216
private suspend fun readSuspendImpl(size: Int): Boolean {
22172217
if (!readSuspendPredicate(size)) return true
22182218

2219-
return suspendCoroutineOrReturn { raw ->
2219+
return suspendCoroutineUninterceptedOrReturn { uCont ->
22202220
val c = readSuspendContinuationCache
22212221
suspensionForSize(size, c)
2222-
c.swap(raw)
2222+
c.swap(uCont.intercepted())
22232223
}
22242224
}
22252225

@@ -2269,13 +2269,15 @@ internal class ByteBufferChannel(
22692269

22702270
writeSuspensionSize = size
22712271
if (attachedJob != null) {
2272-
return suspendCoroutineOrReturn(writeSuspension)
2272+
return suspendCoroutineUninterceptedOrReturn { uCont ->
2273+
writeSuspension(uCont.intercepted())
2274+
}
22732275
}
22742276

2275-
return suspendCoroutineOrReturn { raw ->
2277+
return suspendCoroutineUninterceptedOrReturn { uCont ->
22762278
val c = writeSuspendContinuationCache
22772279
writeSuspension(c)
2278-
c.swap(raw)
2280+
c.swap(uCont.intercepted())
22792281
}
22802282
}
22812283

0 commit comments

Comments
 (0)