Skip to content

Commit 58a7add

Browse files
committed
Fixed initialization of Job with parent (initParentJob), fixed handling on uncaught exceptions in standalone coroutines
1 parent 53a0a40 commit 58a7add

File tree

8 files changed

+235
-51
lines changed

8 files changed

+235
-51
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ import kotlin.coroutines.CoroutineContext
99
* It stores the result of continuation in the state of the job.
1010
*/
1111
@Suppress("LeakingThis")
12-
public abstract class AbstractCoroutine<in T>(
13-
parentContext: CoroutineContext
14-
) : JobSupport(parentContext[Job]), Continuation<T> {
15-
override val context: CoroutineContext = parentContext + this // mixes this job into this context
12+
public abstract class AbstractCoroutine<in T>(parentContext: CoroutineContext) : JobSupport(), Continuation<T> {
13+
override val context: CoroutineContext = parentContext + this // merges this job into this context
1614

1715
final override fun resume(value: T) {
1816
while (true) { // lock-free loop on state

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,19 @@ public fun <T> runBlocking(context: CoroutineContext, block: suspend () -> T): T
6060
private class StandaloneCoroutine(
6161
val parentContext: CoroutineContext
6262
) : AbstractCoroutine<Unit>(parentContext) {
63+
init { initParentJob(parentContext[Job]) }
64+
6365
override fun afterCompletion(state: Any?) {
6466
// note the use of the parent context below!
65-
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
67+
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
6668
}
6769
}
6870

6971
private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCoroutine<T>(parentContext) {
7072
val blockedThread: Thread = Thread.currentThread()
7173

74+
init { initParentJob(parentContext[Job]) }
75+
7276
override fun afterCompletion(state: Any?) {
7377
LockSupport.unpark(blockedThread)
7478
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,35 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job
1717

1818
/**
1919
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
20-
* the [block].
20+
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
2121
*/
2222
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
23-
suspendCoroutineOrReturn { c ->
24-
val safe = SafeCancellableContinuation(c)
23+
suspendCoroutineOrReturn { cont ->
24+
val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
2525
block(safe)
2626
safe.getResult()
2727
}
2828

2929
// --------------- implementation details ---------------
3030

31+
@PublishedApi
32+
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
33+
val job = cont.context[Job]
34+
// fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
35+
job?.isActive?.let { if (!it) throw CancellationException() }
36+
return job
37+
}
38+
3139
@PublishedApi
3240
internal class SafeCancellableContinuation<in T>(
33-
private val delegate: Continuation<T>
41+
private val delegate: Continuation<T>,
42+
parentJob: Job?
3443
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
3544
// only updated from the thread that invoked suspendCancellableCoroutine
3645
private var suspendedThread: Thread? = Thread.currentThread()
3746

47+
init { initParentJob(parentJob) }
48+
3849
fun getResult(): Any? {
3950
if (suspendedThread != null) {
4051
suspendedThread = null

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,21 @@ import kotlin.coroutines.CoroutineContext
77
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
88
* It tries to handle uncaught exception in the following way:
99
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
10+
* * Otherwise, if exception is [CancellationException] then it is ignored
11+
* (because that is the supposed mechanism to cancel the running coroutine)
1012
* * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
1113
* returns `true` (it was still active), then the exception is considered to be handled.
12-
* * Otherwise, if exception is [CancellationException] then it is ignored.
1314
* * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
1415
*/
1516
fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
1617
context[CoroutineExceptionHandler]?.let {
1718
it.handleException(context, exception)
1819
return
1920
}
20-
// quit if successfully pushed exception as cancellation cancelReason
21-
if (context[Job]?.cancel(exception) ?: false) return
2221
// ignore CancellationException (they are normal means to terminate a coroutine)
2322
if (exception is CancellationException) return
23+
// quit if successfully pushed exception as cancellation reason
24+
if (context[Job]?.cancel(exception) ?: false) return
2425
// otherwise just use thread's handler
2526
val currentThread = Thread.currentThread()
2627
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public fun <T> defer(context: CoroutineContext, block: suspend () -> T) : Deferr
3939
private class DeferredCoroutine<T>(
4040
parentContext: CoroutineContext
4141
) : AbstractCoroutine<T>(parentContext), Deferred<T> {
42+
init { initParentJob(parentContext[Job]) }
43+
4244
@Suppress("UNCHECKED_CAST")
4345
suspend override fun await(): T {
4446
// quick check if already complete (avoid extra object creation)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.coroutines.experimental
22

33
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
44
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
5-
import java.util.concurrent.CancellationException
65
import java.util.concurrent.Future
76
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
87
import kotlin.coroutines.AbstractCoroutineContextElement
@@ -28,7 +27,7 @@ public interface Job : CoroutineContext.Element {
2827
/**
2928
* Creates new job object. It is optionally a child of a [parent] job.
3029
*/
31-
public operator fun invoke(parent: Job? = null): Job = JobSupport(parent)
30+
public operator fun invoke(parent: Job? = null): Job = JobImpl(parent)
3231
}
3332

3433
/**
@@ -67,9 +66,12 @@ public interface Job : CoroutineContext.Element {
6766
}
6867
}
6968

70-
typealias CompletionHandler = (Throwable?) -> Unit
69+
public typealias CompletionHandler = (Throwable?) -> Unit
7170

72-
typealias CancellationException = CancellationException
71+
/**
72+
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
73+
*/
74+
public typealias CancellationException = java.util.concurrent.CancellationException
7375

7476
/**
7577
* Unregisters a specified [registration] when this job is complete.
@@ -118,30 +120,39 @@ public suspend fun Job.join() {
118120
* state and mare store addition state information for completed jobs, like their result values.
119121
*/
120122
@Suppress("LeakingThis")
121-
public open class JobSupport(
122-
parent: Job? = null
123-
) : AbstractCoroutineContextElement(Job), Job {
123+
public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
124124
// keeps a stack of cancel listeners or a special CANCELLED, other values denote completed scope
125125
@Volatile
126126
private var state: Any? = ActiveList() // will drop the list on cancel
127127

128-
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
129-
private val registration: Job.Registration? = parent?.onCompletion(CancelOnCompletion(parent, this))
128+
@Volatile
129+
private var registration: Job.Registration? = null
130130

131131
protected companion object {
132132
@JvmStatic
133133
private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
134134
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
135135
}
136136

137+
// invoke at most once after construction after all other initialization
138+
protected fun initParentJob(parent: Job?) {
139+
if (parent == null) return
140+
check(registration == null)
141+
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
142+
val newRegistration = parent.onCompletion(CancelOnCompletion(parent, this))
143+
registration = newRegistration
144+
// now check our state _after_ registering (see updateState order of actions)
145+
if (state !is Active) newRegistration.unregister()
146+
}
147+
137148
protected fun getState(): Any? = state
138149

139150
protected fun updateState(expect: Any, update: Any?): Boolean {
140151
expect as ActiveList // assert type
141152
require(update !is Active) // only active -> inactive transition is allowed
142153
if (!STATE.compareAndSet(this, expect, update)) return false
143154
// #1. Unregister from parent job
144-
registration?.unregister()
155+
registration?.unregister() // volatile read registration _after_ state was updated
145156
// #2 Invoke completion handlers
146157
val reason = (update as? CompletedExceptionally)?.cancelReason
147158
var completionException: Throwable? = null
@@ -202,21 +213,26 @@ public open class JobSupport(
202213
private class ActiveList : LockFreeLinkedListHead(), Active
203214

204215
protected abstract class CompletedExceptionally {
205-
abstract val cancelReason: Throwable?
206-
abstract val exception: Throwable
216+
abstract val cancelReason: Throwable // original reason or fresh CancellationException
217+
abstract val exception: Throwable // the exception to be thrown in continuation
207218
}
208219

209-
protected class Cancelled(override val cancelReason: Throwable?) : CompletedExceptionally() {
220+
protected class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
221+
@Volatile
222+
private var _cancelReason = specifiedReason // materialize CancellationException on first need
223+
224+
override val cancelReason: Throwable get() =
225+
_cancelReason ?: // atomic read volatile var or else create new
226+
CancellationException().also { _cancelReason = it }
227+
210228
@Volatile
211229
private var _exception: Throwable? = null // convert reason to CancellationException on first need
230+
212231
override val exception: Throwable get() =
213-
_exception ?: // atomic read volatile var or else
214-
run {
215-
val result = cancelReason as? CancellationException ?:
216-
CancellationException().apply { if (cancelReason != null) initCause(cancelReason) }
217-
_exception = result
218-
result
219-
}
232+
_exception ?: // atomic read volatile var or else build new
233+
(cancelReason as? CancellationException ?:
234+
CancellationException(cancelReason.message).apply { initCause(cancelReason) })
235+
.also { _exception = it }
220236
}
221237

222238
protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
@@ -288,3 +304,7 @@ private class RemoveOnCompletion(
288304
override fun invoke(reason: Throwable?) { node.remove() }
289305
override fun toString() = "RemoveOnCompletion[$node]"
290306
}
307+
308+
private class JobImpl(parent: Job? = null) : JobSupport() {
309+
init { initParentJob(parent) }
310+
}

0 commit comments

Comments
 (0)