Skip to content

Commit f157cec

Browse files
committed
Job machinery improvements:
* Code style & documentation * Use identity hashset for exception aggregation
1 parent 33e7ce4 commit f157cec

File tree

6 files changed

+79
-37
lines changed

6 files changed

+79
-37
lines changed

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -88,35 +88,35 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
8888
8989
+ Job object is created
9090
## NEW: state == EMPTY_ACTIVE | is InactiveNodeList
91-
+ initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
92-
~ waits for start
93-
>> start / join / await invoked
91+
+ initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
92+
~ waits for start
93+
>> start / join / await invoked
9494
## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
95-
+ onStartInternal / onStart (lazy coroutine is started)
96-
~ active coroutine is working
97-
>> childFailed / fail invoked
95+
+ onStartInternal / onStart (lazy coroutine is started)
96+
~ active coroutine is working (or scheduled to execution)
97+
>> childFailed / fail invoked
9898
## FAILING: state is Finishing, state.rootCause != null
99-
------ failing listeners are not admitted anymore, invokeOnCompletion(onFailing=true) returns NonDisposableHandle
100-
------ new children get immediately cancelled, but are still admitted to the list
101-
+ onFailing
102-
+ notifyFailing (invoke all failing listeners -- cancel all children, suspended functions resume with exception)
103-
+ failParent (rootCause of failure is communicated to the parent, parent starts failing, too)
104-
~ waits for completion of coroutine body
105-
>> makeCompleting / makeCompletingOnce invoked
99+
------ failing listeners are not admitted anymore, invokeOnCompletion(onFailing=true) returns NonDisposableHandle
100+
------ new children get immediately cancelled, but are still admitted to the list
101+
+ onFailing
102+
+ notifyFailing (invoke all failing listeners -- cancel all children, suspended functions resume with exception)
103+
+ failParent (rootCause of failure is communicated to the parent, parent starts failing, too)
104+
~ waits for completion of coroutine body
105+
>> makeCompleting / makeCompletingOnce invoked
106106
## COMPLETING: state is Finishing, state.isCompleting == true
107-
------ new children are not admitted anymore, attachChild returns NonDisposableHandle
108-
~ waits for children
109-
>> last child completes
110-
- computes the final exception
107+
------ new children are not admitted anymore, attachChild returns NonDisposableHandle
108+
~ waits for children
109+
>> last child completes
110+
- computes the final exception
111111
## SEALED: state is Finishing, state.isSealed == true
112-
------ cancel/childFailed returns false (cannot handle exceptions anymore)
113-
+ failParent (final exception is communicated to the parent, parent incorporates it)
114-
+ handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
115-
## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
116-
------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
117-
+ parentHandle.dispose
118-
+ notifyCompletion (invoke all completion listeners)
119-
+ onCompletionInternal / onCompleted / onCompletedExceptionally
112+
------ cancel/childFailed returns false (cannot handle exceptions anymore)
113+
+ failParent (final exception is communicated to the parent, parent incorporates it)
114+
+ handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
115+
## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
116+
------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
117+
+ parentHandle.dispose
118+
+ notifyCompletion (invoke all completion listeners)
119+
+ onCompletionInternal / onCompleted / onCompletedExceptionally
120120
121121
---------------------------------------------------------------------------------
122122
*/
@@ -220,11 +220,10 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
220220
// failed job final state
221221
else -> CompletedExceptionally(finalException)
222222
}
223-
// Now handle exception
224-
if (finalException != null) {
225-
if (!failParent(finalException)) {
226-
handleJobException(finalException)
227-
}
223+
224+
// Now handle exception if parent can't handle it
225+
if (finalException != null && !failParent(finalException)) {
226+
handleJobException(finalException)
228227
}
229228
// Then CAS to completed state -> it must succeed
230229
require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
@@ -271,8 +270,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
271270

272271
private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
273272
if (exceptions.size <= 1) return false // nothing more to do here
274-
// TODO it should be identity set and optimized for small footprints
275-
val seenExceptions = HashSet<Throwable>(exceptions.size)
273+
val seenExceptions = identitySet<Throwable>(exceptions.size)
276274
var suppressed = false
277275
for (i in 1 until exceptions.size) {
278276
val unwrapped = unwrap(exceptions[i])
@@ -673,8 +671,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
673671
loopOnState { state ->
674672
when (state) {
675673
is Finishing -> { // already finishing -- collect exceptions
676-
var notifyRootCause: Throwable? = null
677-
synchronized(state) {
674+
val notifyRootCause = synchronized(state) {
678675
if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
679676
// add exception, do nothing is parent is cancelling child that is already failing
680677
val wasFailing = state.isFailing // will notify if was not failing
@@ -686,7 +683,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
686683
// mark as cancelling if cancel was requested
687684
if (cancel) state.isCancelling = true
688685
// take cause for notification is was not failing before
689-
notifyRootCause = state.rootCause.takeIf { !wasFailing }
686+
state.rootCause.takeIf { !wasFailing }
690687
}
691688
notifyRootCause?.let { notifyFailing(state.list, it) }
692689
return true
@@ -774,8 +771,12 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
774771
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
775772
if (state !is Incomplete)
776773
return COMPLETING_ALREADY_COMPLETING
777-
// FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
778-
// Failures always have to go through Finishing state to serialize exception handling
774+
/*
775+
* FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
776+
* Failures always have to go through Finishing state to serialize exception handling.
777+
* Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
778+
* which may miss unhandled exception.
779+
*/
779780
if ((state is Empty || state is JobNode<*>) && state !is ChildJob && proposedUpdate !is CompletedExceptionally) {
780781
if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
781782
return COMPLETING_COMPLETED

common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ internal expect class ReentrantLock() {
2020
}
2121

2222
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
23+
24+
internal expect fun <E> identitySet(expectedSize: Int): MutableSet<E>

core/kotlinx-coroutines-core/src/internal/Concurrent.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental.internal
66

7+
import java.util.*
78
import java.util.concurrent.*
89
import kotlin.concurrent.withLock as withLockJvm
910

@@ -13,3 +14,5 @@ internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayL
1314
internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
1415

1516
internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
17+
18+
internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap(expectedSize))

core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,36 @@ class JobExceptionHandlingTest : TestBase() {
270270
assertTrue(suppressed[0] is IOException)
271271
assertTrue(suppressed[1] is IllegalArgumentException)
272272
}
273+
274+
@Test
275+
fun testBadException() = runTest(unhandled = listOf({e -> e is BadException})) {
276+
val job = launch(Job()) {
277+
expect(2)
278+
launch {
279+
expect(3)
280+
throw BadException()
281+
}
282+
283+
launch(start = CoroutineStart.ATOMIC) {
284+
expect(4)
285+
throw BadException()
286+
}
287+
288+
yield()
289+
BadException()
290+
}
291+
292+
expect(1)
293+
yield()
294+
yield()
295+
expect(5)
296+
job.join()
297+
finish(6)
298+
}
299+
300+
private class BadException : Exception() {
301+
override fun hashCode(): Int {
302+
throw AssertionError()
303+
}
304+
}
273305
}

js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ internal class NoOpLock {
1414
}
1515

1616
internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteList()
17+
18+
internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet(expectedSize)

native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ internal class NoOpLock {
1414
}
1515

1616
internal actual fun <E> subscriberList(): MutableList<E> = CopyOnWriteList<E>()
17+
18+
internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet()

0 commit comments

Comments
 (0)