Skip to content

Commit 8b38fa2

Browse files
committed
Coroutines now wait for their children
JobSuport.attachChild is introduced; Job "Completing" state is introduced; withTimeout is a proper coroutine; Better diagnostics in cancellation and unexpected exception messages; Fixed cancellable suspending function to throw CancellationException; Job.getCompletionException renamed to Job.getCancellationException; Introduced Deferred.getCompletionExceptionOrNull Updated docs for Job & Deferred to explain parent/child; Deprecate and hide legacy Job.invokeOnCompletion signatures; Updated guide for parent-child relations and related stuff
1 parent 339ccf3 commit 8b38fa2

File tree

54 files changed

+1341
-539
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1341
-539
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import kotlin.coroutines.experimental.Continuation
2020
import kotlin.coroutines.experimental.CoroutineContext
2121

2222
/**
23-
* Abstract class to simplify writing of coroutine completion objects that
24-
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
25-
* It stores the result of continuation in the state of the job.
23+
* Abstract class for coroutines.
24+
*
25+
* * Coroutines implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
26+
* * Coroutine stores the result of continuation in the state of the job.
27+
* * Coroutine waits for children coroutines to finish before completing.
28+
* * Coroutines are cancelled through an intermediate _cancelling_ state.
2629
*
2730
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
2831
* @suppress **This is unstable API and it is subject to change.**
@@ -35,35 +38,26 @@ public abstract class AbstractCoroutine<in T>(
3538
public final override val context: CoroutineContext = parentContext + this
3639
public final override val coroutineContext: CoroutineContext get() = context
3740

41+
// all coroutines are cancelled through an intermediate cancelling state
3842
final override val hasCancellingState: Boolean get() = true
3943

44+
protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
45+
4046
final override fun resume(value: T) {
41-
loopOnState { state ->
42-
when (state) {
43-
is Incomplete -> if (updateState(state, value, MODE_ATOMIC_DEFAULT)) return
44-
is Cancelled -> return // ignore resumes on cancelled continuation
45-
else -> error("Already resumed, but got value $value")
46-
}
47-
}
47+
makeCompleting(value, defaultResumeMode)
4848
}
4949

5050
final override fun resumeWithException(exception: Throwable) {
51-
loopOnState { state ->
52-
when (state) {
53-
is Incomplete -> {
54-
if (updateState(state, CompletedExceptionally(exception), MODE_ATOMIC_DEFAULT)) return
55-
}
56-
is Cancelled -> {
57-
// ignore resumes on cancelled continuation, but handle exception if a different one is here
58-
if (exception !== state.exception) handleCoroutineException(context, exception)
59-
return
60-
}
61-
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
62-
}
63-
}
51+
makeCompleting(CompletedExceptionally(exception), defaultResumeMode)
6452
}
6553

6654
final override fun handleException(exception: Throwable) {
6755
handleCoroutineException(parentContext, exception)
6856
}
69-
}
57+
58+
override fun nameString(): String {
59+
val coroutineName = context.coroutineName ?: return super.nameString()
60+
return "\"$coroutineName\":${super.nameString()}"
61+
}
62+
}
63+

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,10 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T
141141
@Throws(InterruptedException::class)
142142
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
143143
val currentThread = Thread.currentThread()
144-
val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
144+
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
145145
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
146146
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
147147
coroutine.initParentJob(context[Job])
148-
eventLoop?.initParentJob(coroutine)
149148
block.startCoroutine(coroutine, coroutine)
150149
return coroutine.joinBlocking()
151150
}
@@ -156,9 +155,9 @@ private open class StandaloneCoroutine(
156155
private val parentContext: CoroutineContext,
157156
active: Boolean
158157
) : AbstractCoroutine<Unit>(parentContext, active) {
159-
override fun afterCompletion(state: Any?, mode: Int) {
158+
override fun onCancellation(exceptionally: CompletedExceptionally?) {
160159
// note the use of the parent's job context below!
161-
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
160+
if (exceptionally != null) handleCoroutineException(parentContext, exceptionally.exception)
162161
}
163162
}
164163

@@ -209,10 +208,14 @@ private class BlockingCoroutine<T>(
209208
private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
210209

211210
init {
212-
if (privateEventLoop) require(eventLoop is EventLoopImpl)
211+
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
213212
}
214213

215214
override fun afterCompletion(state: Any?, mode: Int) {
215+
// signal termination to event loop (don't accept more tasks)
216+
if (privateEventLoop)
217+
(eventLoop as BlockingEventLoop).isCompleted = true
218+
// wake up blocked thread
216219
if (Thread.currentThread() != blockedThread)
217220
LockSupport.unpark(blockedThread)
218221
}
@@ -228,11 +231,12 @@ private class BlockingCoroutine<T>(
228231
timeSource.parkNanos(this, parkNanos)
229232
}
230233
// process queued events (that could have been added after last processNextEvent and before cancel
231-
if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
234+
if (privateEventLoop) (eventLoop as BlockingEventLoop).shutdown()
232235
timeSource.unregisterTimeLoopThread()
233236
// now return result
234237
val state = this.state
235238
(state as? CompletedExceptionally)?.let { throw it.exception }
236239
return state as T
237240
}
241+
238242
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public inline suspend fun <T> suspendAtomicCancellableCoroutine(
162162
* @suppress **This is unstable API and it is subject to change.**
163163
*/
164164
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): DisposableHandle =
165-
invokeOnCompletion(RemoveOnCancel(this, node))
165+
invokeOnCompletion(handler = RemoveOnCancel(this, node))
166166

167167
// --------------- implementation details ---------------
168168

@@ -245,7 +245,7 @@ internal class CancellableContinuationImpl<in T>(
245245
}
246246

247247
override fun completeResume(token: Any) {
248-
completeUpdateState(token, state, resumeMode)
248+
completeUpdateState(token as Incomplete, state, resumeMode)
249249
}
250250

251251
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
@@ -258,7 +258,8 @@ internal class CancellableContinuationImpl<in T>(
258258
resumeWithExceptionImpl(exception, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
259259
}
260260

261-
override fun toString(): String = super.toString() + "[${delegate.toDebugString()}]"
261+
override fun nameString(): String =
262+
"CancellableContinuation(${delegate.toDebugString()})"
262263
}
263264

264265
private class CompletedIdempotentResult(

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,33 +89,40 @@ public fun newCoroutineContext(context: CoroutineContext): CoroutineContext =
8989
* Executes a block using a given coroutine context.
9090
*/
9191
internal inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
92-
val oldName = updateContext(context)
92+
val oldName = context.updateThreadContext()
9393
try {
9494
return block()
9595
} finally {
96-
restoreContext(oldName)
96+
restoreThreadContext(oldName)
9797
}
9898
}
9999

100100
@PublishedApi
101-
internal fun updateContext(context: CoroutineContext): String? {
101+
internal fun CoroutineContext.updateThreadContext(): String? {
102102
if (!DEBUG) return null
103-
val newId = context[CoroutineId] ?: return null
103+
val coroutineId = this[CoroutineId] ?: return null
104+
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
104105
val currentThread = Thread.currentThread()
105106
val oldName = currentThread.name
106-
val coroutineName = context[CoroutineName]?.name ?: "coroutine"
107107
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
108108
append(oldName)
109109
append(" @")
110110
append(coroutineName)
111111
append('#')
112-
append(newId.id)
112+
append(coroutineId.id)
113113
}
114114
return oldName
115115
}
116116

117+
internal val CoroutineContext.coroutineName: String? get() {
118+
if (!DEBUG) return null
119+
val coroutineId = this[CoroutineId] ?: return null
120+
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
121+
return "$coroutineName#${coroutineId.id}"
122+
}
123+
117124
@PublishedApi
118-
internal fun restoreContext(oldName: String?) {
125+
internal fun restoreThreadContext(oldName: String?) {
119126
if (oldName != null) Thread.currentThread().name = oldName
120127
}
121128

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ internal class DispatchTask<in T>(
115115
val job = if (cancellable) context[Job] else null
116116
withCoroutineContext(context) {
117117
when {
118-
job != null && !job.isActive -> continuation.resumeWithException(job.getCompletionException())
118+
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
119119
exception -> continuation.resumeWithException(value as Throwable)
120120
else -> continuation.resume(value as T)
121121
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ import kotlin.coroutines.experimental.CoroutineContext
2222

2323
/**
2424
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
25+
*
2526
* It tries to handle uncaught exception in the following way:
2627
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
2728
* * Otherwise, if exception is [CancellationException] then it is ignored
2829
* (because that is the supposed mechanism to cancel the running coroutine)
29-
* * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
30-
* returns `true` (it was still active), then the exception is considered to be handled.
31-
* * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
30+
* * Otherwise:
31+
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
32+
* * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
3233
*/
3334
fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
3435
context[CoroutineExceptionHandler]?.let {
@@ -37,15 +38,23 @@ fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
3738
}
3839
// ignore CancellationException (they are normal means to terminate a coroutine)
3940
if (exception is CancellationException) return
40-
// quit if successfully pushed exception as cancellation reason
41-
if (context[Job]?.cancel(exception) ?: false) return
42-
// otherwise just use thread's handler
41+
// try cancel job in the context
42+
context[Job]?.cancel(exception)
43+
// use thread's handler
4344
val currentThread = Thread.currentThread()
4445
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
4546
}
4647

4748
/**
4849
* An optional element on the coroutine context to handle uncaught exceptions.
50+
*
51+
* By default, when no handler is installed, uncaught exception are handled in the following way:
52+
* * If exception is [CancellationException] then it is ignored
53+
* (because that is the supposed mechanism to cancel the running coroutine)
54+
* * Otherwise:
55+
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
56+
* * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
57+
*
4958
* See [handleCoroutineException].
5059
*/
5160
public interface CoroutineExceptionHandler : CoroutineContext.Element {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import kotlin.coroutines.experimental.CoroutineContext
3232
* | --------------------------------------- | ---------- | ------------- | -------------------------- | ------------- |
3333
* | _New_ (optional initial state) | `false` | `false` | `false` | `false` |
3434
* | _Active_ (default initial state) | `true` | `false` | `false` | `false` |
35+
* | _Completing_ (optional transient state) | `true` | `false` | `false` | `false` |
3536
* | _Cancelling_ (optional transient state) | `false` | `false` | `false` | `true` |
3637
* | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
3738
* | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
@@ -46,18 +47,19 @@ import kotlin.coroutines.experimental.CoroutineContext
4647
* _cancelling_ state immediately. A simple implementation of deferred -- [CompletableDeferred],
4748
* that is not backed by a coroutine, does not have a _cancelling_ state, but becomes _cancelled_
4849
* on [cancel] immediately. Coroutines, on the other hand, become _cancelled_ only when they finish
49-
* executing their code.
50+
* executing their code and after all their [children][attachChild] complete.
5051
*
5152
* ```
52-
* +-----+ start +--------+ complete +-----------+
53-
* | New | ---------------> | Active | ---------+-> | Resolved |
54-
* +-----+ +--------+ | |(completed)|
55-
* | | | +-----------+
56-
* | cancel | cancel |
57-
* V V | +-----------+
58-
* +-----------+ finish +------------+ +-> | Failed |
59-
* | Cancelled | <--------- | Cancelling | |(completed)|
60-
* |(completed)| +------------+ +-----------+
53+
* wait children
54+
* +-----+ start +--------+ complete +-------------+ finish +-----------+
55+
* | New | ---------------> | Active | ----------> | Completing | ---+-> | Resolved |
56+
* +-----+ +--------+ +-------------+ | |(completed)|
57+
* | | | | +-----------+
58+
* | cancel | cancel | cancel |
59+
* V V | | +-----------+
60+
* +-----------+ finish +------------+ | +-> | Failed |
61+
* | Cancelled | <--------- | Cancelling | <---------------+ |(completed)|
62+
* |(completed)| +------------+ +-----------+
6163
* +-----------+
6264
* ```
6365
*
@@ -68,7 +70,9 @@ import kotlin.coroutines.experimental.CoroutineContext
6870
* or the cancellation cause inside the coroutine.
6971
*
7072
* A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
71-
* cancelled or completes.
73+
* cancelled or completes. Parent waits for all its [children][attachChild] to complete in _completing_ or
74+
* _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
75+
* deferred is still active, while internally it is waiting for its children.
7276
*
7377
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
7478
* be safely invoked from concurrent coroutines without external synchronization.
@@ -108,10 +112,20 @@ public interface Deferred<out T> : Job {
108112
* [completed exceptionally][isCompletedExceptionally].
109113
*
110114
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
111-
* the value is already complete.
115+
* the value is already complete. See also [getCompletionExceptionOrNull].
112116
*/
113117
public fun getCompleted(): T
114118

119+
/**
120+
* Returns *completion exception* result if this deferred [completed exceptionally][isCompletedExceptionally],
121+
* `null` if it is completed normally, or throws [IllegalStateException] if this deferred value has not
122+
* [completed][isCompleted] yet.
123+
*
124+
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
125+
* the value is already complete. See also [getCompleted].
126+
*/
127+
public fun getCompletionExceptionOrNull(): Throwable?
128+
115129
/**
116130
* @suppress **Deprecated**: Use `isActive`.
117131
*/

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -247,20 +247,11 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
247247
}
248248
}
249249

250-
internal class EventLoopImpl(
250+
internal abstract class ThreadEventLoop(
251251
private val thread: Thread
252252
) : EventLoopBase() {
253-
private var parentJob: Job? = null
254-
255-
override val canComplete: Boolean get() = parentJob != null
256-
override val isCompleted: Boolean get() = parentJob?.isCompleted == true
257253
override fun isCorrectThread(): Boolean = Thread.currentThread() === thread
258254

259-
fun initParentJob(coroutine: Job) {
260-
require(this.parentJob == null)
261-
this.parentJob = coroutine
262-
}
263-
264255
override fun unpark() {
265256
if (Thread.currentThread() !== thread)
266257
timeSource.unpark(thread)
@@ -274,5 +265,23 @@ internal class EventLoopImpl(
274265
// reschedule the rest of delayed tasks
275266
rescheduleAllDelayed()
276267
}
268+
269+
}
270+
271+
private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
272+
private var parentJob: Job? = null
273+
274+
override val canComplete: Boolean get() = parentJob != null
275+
override val isCompleted: Boolean get() = parentJob?.isCompleted == true
276+
277+
fun initParentJob(parentJob: Job) {
278+
require(this.parentJob == null)
279+
this.parentJob = parentJob
280+
}
277281
}
278282

283+
internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
284+
override val canComplete: Boolean get() = true
285+
@Volatile
286+
public override var isCompleted: Boolean = false
287+
}

0 commit comments

Comments
 (0)