Skip to content

Commit ca9d5be

Browse files
committed
Fix withTimeout/OrNull bug with spurious concurrency on cancellation
1 parent 6e5f8a6 commit ca9d5be

File tree

10 files changed

+214
-76
lines changed

10 files changed

+214
-76
lines changed

coroutines-guide.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,13 @@ It produces the following output:
542542
I'm sleeping 0 ...
543543
I'm sleeping 1 ...
544544
I'm sleeping 2 ...
545-
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
545+
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutException: Timed out waiting for 1300 MILLISECONDS
546546
```
547547

548548
<!--- TEST STARTS_WITH -->
549549

550-
We have not seen the [CancellationException] stack trace printed on the console before. That is because
550+
The `TimeoutException` that is thrown by [withTimeout] is a private subclass of [CancellationException].
551+
We have not seen its stack trace printed on the console before. That is because
551552
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
552553
However, in this example we have used `withTimeout` right inside the `main` function.
553554

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,25 +241,15 @@ internal open class CancellableContinuationImpl<in T>(
241241
when (mode) {
242242
MODE_DISPATCHED -> delegate.resumeWithException(exception)
243243
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
244-
MODE_DIRECT -> {
245-
if (delegate is DispatchedContinuation)
246-
delegate.continuation.resumeWithException(exception)
247-
else
248-
delegate.resumeWithException(exception)
249-
}
244+
MODE_DIRECT -> delegate.resumeDirectWithException(exception)
250245
else -> error("Invalid mode $mode")
251246
}
252247
} else {
253248
val value = getSuccessfulResult<T>(state)
254249
when (mode) {
255250
MODE_DISPATCHED -> delegate.resume(value)
256251
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
257-
MODE_DIRECT -> {
258-
if (delegate is DispatchedContinuation)
259-
delegate.continuation.resume(value)
260-
else
261-
delegate.resume(value)
262-
}
252+
MODE_DIRECT -> delegate.resumeDirect(value)
263253
else -> error("Invalid mode $mode")
264254
}
265255
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,13 @@ internal class DispatchedContinuation<in T>(
153153
})
154154
}
155155
}
156+
157+
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
158+
is DispatchedContinuation -> continuation.resume(value)
159+
else -> resume(value)
160+
}
161+
162+
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
163+
is DispatchedContinuation -> continuation.resumeWithException(exception)
164+
else -> resumeWithException(exception)
165+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,10 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
353353
}
354354

355355
internal open fun onParentCompletion(cause: Throwable?) {
356-
cancel()
356+
// if parent was completed with CancellationException then use it as the cause of our cancellation, too.
357+
// however, we shall not use application specific exceptions here. So if parent crashes due to IOException,
358+
// we cannot and should not cancel the child with IOException
359+
cancel(cause as? CancellationException)
357360
}
358361

359362
/**

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2019
import kotlinx.coroutines.experimental.selects.SelectBuilder
2120
import kotlinx.coroutines.experimental.selects.select
2221
import java.util.concurrent.ScheduledExecutorService
2322
import java.util.concurrent.ScheduledThreadPoolExecutor
2423
import java.util.concurrent.TimeUnit
2524
import kotlin.coroutines.experimental.Continuation
2625
import kotlin.coroutines.experimental.ContinuationInterceptor
26+
import kotlin.coroutines.experimental.CoroutineContext
27+
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2728
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2829

2930
private val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
@@ -68,9 +69,10 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
6869
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
6970
* [CancellationException] if timeout was exceeded.
7071
*
71-
* The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
72-
* exception inside of it, too. However, even the code in the block suppresses the exception,
73-
* this `withTimeout` function invocation still throws [CancellationException].
72+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
73+
* cancellable suspending function inside the block throws [CancellationException], so normally that exception,
74+
* if uncaught, also gets thrown by `withTimeout` as a result.
75+
* However, the code in the block can suppresses [CancellationException].
7476
*
7577
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
7678
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -84,27 +86,40 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
8486
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
8587
require(time >= 0) { "Timeout time $time cannot be negative" }
8688
if (time <= 0L) throw CancellationException("Timed out immediately")
87-
return suspendCoroutineOrReturn sc@ { delegate: Continuation<T> ->
88-
// schedule cancellation of this continuation on time
89-
val cont = TimeoutExceptionContinuation(time, unit, delegate)
90-
val delay = cont.context[ContinuationInterceptor] as? Delay
89+
return suspendCoroutineOrReturn { cont: Continuation<T> ->
90+
val context = cont.context
91+
val coroutine = TimeoutExceptionCoroutine(time, unit, cont)
92+
val delay = context[ContinuationInterceptor] as? Delay
93+
// schedule cancellation of this coroutine on time
9194
if (delay != null)
92-
cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
93-
cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
94-
// restart block using cancellable context of this continuation,
95+
coroutine.disposeOnCompletion(delay.invokeOnTimeout(time, unit, coroutine)) else
96+
coroutine.cancelFutureOnCompletion(scheduledExecutor.schedule(coroutine, time, unit))
97+
coroutine.initParentJob(context[Job])
98+
// restart block using new coroutine with new job,
9599
// however start it as undispatched coroutine, because we are already in the proper context
96-
block.startCoroutineUndispatched(cont)
97-
cont.getResult()
100+
block.startCoroutineUninterceptedOrReturn(coroutine)
98101
}
99102
}
100103

104+
private class TimeoutExceptionCoroutine<in T>(
105+
private val time: Long,
106+
private val unit: TimeUnit,
107+
private val cont: Continuation<T>
108+
) : JobSupport(active = true), Runnable, Continuation<T> {
109+
override val context: CoroutineContext = cont.context + this // mix in this Job into the context
110+
override fun run() { cancel(TimeoutException(time, unit)) }
111+
override fun resume(value: T) { cont.resumeDirect(value) }
112+
override fun resumeWithException(exception: Throwable) { cont.resumeDirectWithException(exception) }
113+
}
114+
101115
/**
102116
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
103117
* `null` if timeout was exceeded.
104118
*
105-
* The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
106-
* exception inside of it. However, even the code in the block does not catch the cancellation exception,
107-
* this `withTimeoutOrNull` function invocation still returns `null` on timeout.
119+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
120+
* cancellable suspending function inside the block throws [CancellationException]. Normally that exception,
121+
* if uncaught by the block, gets converted into the `null` result of `withTimeoutOrNull`.
122+
* However, the code in the block can suppresses [CancellationException].
108123
*
109124
* The sibling function that throws exception on timeout is [withTimeout].
110125
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -118,33 +133,39 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
118133
public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? {
119134
require(time >= 0) { "Timeout time $time cannot be negative" }
120135
if (time <= 0L) return null
121-
return suspendCoroutineOrReturn sc@ { delegate: Continuation<T?> ->
122-
// schedule cancellation of this continuation on time
123-
val cont = TimeoutNullContinuation<T>(delegate)
124-
val delay = cont.context[ContinuationInterceptor] as? Delay
136+
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
137+
val context = cont.context
138+
val coroutine = TimeoutNullCoroutine(time, unit, cont)
139+
val delay = context[ContinuationInterceptor] as? Delay
140+
// schedule cancellation of this coroutine on time
125141
if (delay != null)
126-
cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
127-
cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
128-
// restart block using cancellable context of this continuation,
142+
coroutine.disposeOnCompletion(delay.invokeOnTimeout(time, unit, coroutine)) else
143+
coroutine.cancelFutureOnCompletion(scheduledExecutor.schedule(coroutine, time, unit))
144+
coroutine.initParentJob(context[Job])
145+
// restart block using new coroutine with new job,
129146
// however start it as undispatched coroutine, because we are already in the proper context
130-
block.startCoroutineUndispatched(cont)
131-
cont.getResult()
147+
try {
148+
block.startCoroutineUninterceptedOrReturn(coroutine)
149+
} catch (e: TimeoutException) {
150+
null // replace inner timeout exception with null result
151+
}
132152
}
133153
}
134154

135-
private class TimeoutExceptionContinuation<in T>(
155+
private class TimeoutNullCoroutine<in T>(
136156
private val time: Long,
137157
private val unit: TimeUnit,
138-
delegate: Continuation<T>
139-
) : CancellableContinuationImpl<T>(delegate, active = true), Runnable {
140-
override val defaultResumeMode get() = MODE_DIRECT
141-
override fun run() { cancel(CancellationException("Timed out waiting for $time $unit")) }
158+
private val cont: Continuation<T?>
159+
) : JobSupport(active = true), Runnable, Continuation<T> {
160+
override val context: CoroutineContext = cont.context + this // mix in this Job into the context
161+
override fun run() { cancel(TimeoutException(time, unit)) }
162+
override fun resume(value: T) { cont.resumeDirect(value) }
163+
override fun resumeWithException(exception: Throwable) {
164+
// suppress inner timeout exception and replace it with null
165+
if (exception is TimeoutException)
166+
cont.resumeDirect(null) else
167+
cont.resumeDirectWithException(exception)
168+
}
142169
}
143170

144-
private class TimeoutNullContinuation<in T>(
145-
delegate: Continuation<T?>
146-
) : CancellableContinuationImpl<T?>(delegate, active = true), Runnable {
147-
override val defaultResumeMode get() = MODE_DIRECT
148-
override val ignoreRepeatedResume: Boolean get() = true
149-
override fun run() { resume(null, mode = 0) /* dispatch resume */ }
150-
}
171+
private class TimeoutException(time: Long, unit: TimeUnit) : CancellationException("Timed out waiting for $time $unit")

kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class GuideTest {
120120
"I'm sleeping 0 ...",
121121
"I'm sleeping 1 ...",
122122
"I'm sleeping 2 ...",
123-
"Exception in thread \"main\" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS"
123+
"Exception in thread \"main\" kotlinx.coroutines.experimental.TimeoutException: Timed out waiting for 1300 MILLISECONDS"
124124
)
125125
}
126126

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.hamcrest.core.IsEqual
2020
import org.hamcrest.core.IsNull
2121
import org.junit.Assert.assertThat
2222
import org.junit.Test
23+
import java.io.IOException
2324

2425
class WithTimeoutOrNullTest : TestBase() {
2526
/**
@@ -47,6 +48,51 @@ class WithTimeoutOrNullTest : TestBase() {
4748
finish(8)
4849
}
4950

51+
@Test
52+
fun testNullOnTimeout() = runBlocking {
53+
expect(1)
54+
val result = withTimeoutOrNull(100) {
55+
expect(2)
56+
delay(1000)
57+
expectUnreached()
58+
"OK"
59+
}
60+
assertThat(result, IsNull())
61+
finish(3)
62+
}
63+
64+
@Test
65+
fun testSuppressException() = runBlocking {
66+
expect(1)
67+
val result = withTimeoutOrNull(100) {
68+
expect(2)
69+
try {
70+
delay(1000)
71+
} catch (e: CancellationException) {
72+
expect(3)
73+
}
74+
"OK"
75+
}
76+
assertThat(result, IsEqual("OK"))
77+
finish(4)
78+
}
79+
80+
@Test(expected = IOException::class)
81+
fun testReplaceException() = runBlocking {
82+
expect(1)
83+
withTimeoutOrNull(100) {
84+
expect(2)
85+
try {
86+
delay(1000)
87+
} catch (e: CancellationException) {
88+
finish(3)
89+
throw IOException(e)
90+
}
91+
"OK"
92+
}
93+
expectUnreached()
94+
}
95+
5096
/**
5197
* Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
5298
*/

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullThreadDispatchTest.kt

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import org.hamcrest.core.IsEqual
2020
import org.hamcrest.core.IsNull
2121
import org.junit.After
2222
import org.junit.Assert
23-
import org.junit.Assert.assertThat
2423
import org.junit.Test
2524
import java.util.concurrent.ExecutorService
2625
import java.util.concurrent.Executors
2726
import java.util.concurrent.ThreadFactory
27+
import java.util.concurrent.atomic.AtomicInteger
2828
import kotlin.coroutines.experimental.CoroutineContext
2929

3030
class WithTimeoutOrNullThreadDispatchTest : TestBase() {
@@ -51,16 +51,25 @@ class WithTimeoutOrNullThreadDispatchTest : TestBase() {
5151
}
5252
}
5353

54+
5455
@Test
5556
fun testCancellationDispatchCustomNoDelay() {
57+
// it also checks that there is at most once scheduled request in flight (no spurious concurrency)
58+
var error: String? = null
5659
checkCancellationDispatch {
5760
executor = Executors.newSingleThreadExecutor(it)
61+
val scheduled = AtomicInteger(0)
5862
object : CoroutineDispatcher() {
5963
override fun dispatch(context: CoroutineContext, block: Runnable) {
60-
executor!!.execute(block)
64+
if (scheduled.incrementAndGet() > 1) error = "Two requests are scheduled concurrently"
65+
executor!!.execute {
66+
scheduled.decrementAndGet()
67+
block.run()
68+
}
6169
}
6270
}
6371
}
72+
error?.let { error(it) }
6473
}
6574

6675
private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
@@ -70,22 +79,21 @@ class WithTimeoutOrNullThreadDispatchTest : TestBase() {
7079
run(dispatcher) {
7180
expect(2)
7281
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
73-
val result =
74-
withTimeoutOrNull(100) {
75-
try {
76-
expect(3)
77-
delay(1000)
78-
expectUnreached()
79-
} catch (e: CancellationException) {
80-
expect(4)
81-
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
82-
}
83-
expect(5)
84-
"FAIL"
82+
val result = withTimeoutOrNull(100) {
83+
try {
84+
expect(3)
85+
delay(1000)
86+
expectUnreached()
87+
} catch (e: CancellationException) {
88+
expect(4)
89+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
90+
throw e // rethrow
8591
}
86-
assertThat(result, IsNull())
87-
expect(6)
92+
}
93+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
94+
Assert.assertThat(result, IsNull())
95+
expect(5)
8896
}
89-
finish(7)
97+
finish(6)
9098
}
9199
}

0 commit comments

Comments
 (0)