Skip to content

Commit daa7922

Browse files
committed
Optimize single completion listener jobs (one fewer object allocation)
1 parent 0764177 commit daa7922

File tree

3 files changed

+177
-30
lines changed

3 files changed

+177
-30
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 141 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,43 @@ public suspend fun Job.join() {
125125
*/
126126
@Suppress("LeakingThis")
127127
public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
128-
// keeps a stack of cancel listeners or a special CANCELLED, other values denote completed scope
128+
/*
129+
=== States ===
130+
name state class is Active?
131+
------ ------------ ---------
132+
EMPTY Empty : Active -- no completion listener
133+
SINGLE JobNode : Active -- a single completion listener
134+
SINGLE+ JobNode : Active -- a single completion listener + NodeList added as its next
135+
LIST NodeList : Active -- a list of listeners (promoted just once, does not got back to JobNode/Empty)
136+
FINAL_C Cancelled : !Active -- cancelled (final state)
137+
FINAL_F Failed : !Active -- failed for other reason (final state)
138+
FINAL_R <any> : !Active -- produced some result
139+
140+
=== Transitions ===
141+
142+
Active states !Active states
143+
+---------+ +----------+
144+
initial -+-> | EMPTY | ------------> | FINAL_* |
145+
| +---------+ +----------+
146+
| | ^ ^
147+
| V | |
148+
| +---------+ |
149+
| | SINGLE | --------------------+
150+
| +---------+ |
151+
| | |
152+
| V |
153+
| +---------+ |
154+
+-- | SINGLE+ | --------------------+
155+
+---------+ |
156+
| |
157+
V |
158+
+---------+ |
159+
| LIST | --------------------+
160+
+---------+
161+
*/
162+
129163
@Volatile
130-
private var state: Any? = ActiveList() // will drop the list on cancel
164+
private var state: Any? = Empty // shared object while we have no listeners
131165

132166
@Volatile
133167
private var registration: Job.Registration? = null
@@ -138,7 +172,10 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
138172
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
139173
}
140174

141-
// invoke at most once after construction after all other initialization
175+
/**
176+
* Initializes parent job.
177+
* It shall be invoked at most once after construction after all other initialization.
178+
*/
142179
public fun initParentJob(parent: Job?) {
143180
if (parent == null) return
144181
check(registration == null)
@@ -149,11 +186,16 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
149186
if (state !is Active) newRegistration.unregister()
150187
}
151188

152-
protected fun getState(): Any? = state
189+
/**
190+
* Returns current state of this job.
191+
*/
192+
internal fun getState(): Any? = state
153193

194+
/**
195+
* Tries to update current [state][getState] of this job.
196+
*/
154197
protected fun updateState(expect: Any, update: Any?): Boolean {
155-
expect as ActiveList // assert type
156-
require(update !is Active) // only active -> inactive transition is allowed
198+
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
157199
if (!STATE.compareAndSet(this, expect, update)) return false
158200
// #1. Update linked state before invoking completion handlers
159201
onStateUpdate(update)
@@ -162,36 +204,93 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
162204
// #3. Invoke completion handlers
163205
val reason = (update as? CompletedExceptionally)?.cancelReason
164206
var completionException: Throwable? = null
165-
expect.forEach<JobNode> { node ->
166-
try {
167-
node.invoke(reason)
207+
when (expect) {
208+
// SINGLE/SINGLE+ state -- one completion handler (common case)
209+
is JobNode -> try {
210+
expect.invoke(reason)
168211
} catch (ex: Throwable) {
169-
completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
212+
completionException = ex
213+
}
214+
// LIST state -- a list of completion handlers
215+
is NodeList -> expect.forEach<JobNode> { node ->
216+
try {
217+
node.invoke(reason)
218+
} catch (ex: Throwable) {
219+
completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
220+
}
221+
170222
}
223+
// otherwise -- do nothing (Empty)
224+
else -> check(expect == Empty)
171225
}
172226
// #4. Do other (overridable) processing after completion handlers
173227
completionException?.let { handleCompletionException(it) }
174228
afterCompletion(update)
175229
return true
176230
}
177231

178-
public override val isActive: Boolean get() = state is Active
232+
public final override val isActive: Boolean get() = state is Active
179233

180-
public override fun onCompletion(handler: CompletionHandler): Job.Registration {
234+
public final override fun onCompletion(handler: CompletionHandler): Job.Registration {
181235
var nodeCache: JobNode? = null
182236
while (true) { // lock-free loop on state
183237
val state = this.state
184-
if (state !is Active) {
185-
handler((state as? Cancelled)?.cancelReason)
186-
return EmptyRegistration
238+
when {
239+
// EMPTY state -- no completion handlers
240+
state === Empty -> {
241+
// try move to SINGLE state
242+
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
243+
if (STATE.compareAndSet(this, state, node)) return node
244+
}
245+
// SINGLE/SINGLE+ state -- one completion handler
246+
state is JobNode -> {
247+
// try promote it to the list (SINGLE+ state)
248+
state.addIfEmpty(NodeList())
249+
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
250+
val list = state.next() // either NodeList or somebody else won the race, updated state
251+
// just attempt converting it to list if state is still the same, then continue lock-free loop
252+
STATE.compareAndSet(this, state, list)
253+
}
254+
// LIST -- a list of completion handlers
255+
state is NodeList -> {
256+
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
257+
if (state.addLastIf(node) { this.state == state }) return node
258+
}
259+
// is not active anymore
260+
else -> {
261+
handler((state as? Cancelled)?.cancelReason)
262+
return EmptyRegistration
263+
}
264+
}
265+
}
266+
}
267+
268+
internal fun removeNode(node: JobNode) {
269+
// remove logic depends on the state of the job
270+
while (true) { // lock-free loop on job state
271+
val state = this.state
272+
when {
273+
// EMPTY state -- no completion handlers
274+
state === Empty -> return
275+
// SINGE/SINGLE+ state -- one completion handler
276+
state is JobNode -> {
277+
if (state !== this) return // a different job node --> we were already removed
278+
// try remove and revert back to empty state
279+
if (STATE.compareAndSet(this, state, Empty)) return
280+
}
281+
// LIST -- a list of completion handlers
282+
state is NodeList -> {
283+
// remove node from the list
284+
node.remove()
285+
return
286+
}
287+
// is not active anymore
288+
else -> return
187289
}
188-
val node = nodeCache ?: makeNode(handler).apply { nodeCache = this }
189-
state as ActiveList // assert type
190-
if (state.addLastIf(node) { this.state == state }) return node
191290
}
192291
}
193292

194-
public override fun cancel(reason: Throwable?): Boolean {
293+
public final override fun cancel(reason: Throwable?): Boolean {
195294
while (true) { // lock-free loop on state
196295
val state = this.state as? Active ?: return false // quit if not active anymore
197296
if (updateState(state, Cancelled(reason))) return true
@@ -219,16 +318,27 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
219318
(handler as? JobNode)?.also { require(it.job === this) }
220319
?: InvokeOnCompletion(this, handler)
221320

222-
protected interface Active
321+
/**
322+
* Marker interface for active [state][getState] of a job.
323+
*/
324+
public interface Active
223325

224-
private class ActiveList : LockFreeLinkedListHead(), Active
326+
private object Empty : Active
225327

226-
protected abstract class CompletedExceptionally {
328+
private class NodeList : LockFreeLinkedListHead(), Active
329+
330+
/**
331+
* Abstract class for a [state][getState] of a job that had completed exceptionally, including cancellation.
332+
*/
333+
public abstract class CompletedExceptionally {
227334
abstract val cancelReason: Throwable // original reason or fresh CancellationException
228335
abstract val exception: Throwable // the exception to be thrown in continuation
229336
}
230337

231-
protected class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
338+
/**
339+
* Represents a [state][getState] of a cancelled job.
340+
*/
341+
public class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
232342
@Volatile
233343
private var _cancelReason = specifiedReason // materialize CancellationException on first need
234344

@@ -246,20 +356,21 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
246356
.also { _exception = it }
247357
}
248358

249-
protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
359+
/**
360+
* Represents a [state][getState] of a failed job.
361+
*/
362+
public class Failed(override val exception: Throwable) : CompletedExceptionally() {
250363
override val cancelReason: Throwable
251364
get() = exception
252365
}
253366
}
254367

255368
internal abstract class JobNode(
256369
val job: Job
257-
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler {
258-
override fun unregister() {
259-
// this is an object-allocation optimization -- do not remove if job is not active anymore
260-
if (job.isActive) remove()
261-
}
262-
370+
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Active {
371+
// if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
372+
// directly without wrapping
373+
final override fun unregister() = (job as JobSupport).removeNode(this)
263374
override abstract fun invoke(reason: Throwable?)
264375
}
265376

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,16 @@ internal open class LockFreeLinkedListNode {
111111
}
112112
}
113113

114+
internal fun addIfEmpty(node: Node): Boolean {
115+
require(node.isFresh)
116+
PREV.lazySet(node, this)
117+
NEXT.lazySet(node, this)
118+
if (!NEXT.compareAndSet(this, this, node)) return false // this is not an empty list!
119+
// added successfully (linearized add) -- fixup the list
120+
node.finishAdd(this)
121+
return true
122+
}
123+
114124
@PublishedApi
115125
internal fun addLastCC(node: Node, condAdd: CondAdd?): Boolean {
116126
require(node.isFresh)

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DeferTest.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,30 @@ class DeferTest : TestBase() {
6060
expect(3)
6161
d.await() // will throw IOException
6262
}
63+
64+
@Test
65+
fun testDeferWithTwoWaiters() = runBlocking {
66+
expect(1)
67+
val d = defer(context) {
68+
expect(2)
69+
yield()
70+
expect(8)
71+
42
72+
}
73+
expect(3)
74+
launch(context) {
75+
expect(4)
76+
check(d.await() == 42)
77+
expect(9)
78+
}
79+
expect(5)
80+
launch(context) {
81+
expect(6)
82+
check(d.await() == 42)
83+
expect(10)
84+
}
85+
expect(7)
86+
yield() // this actually yields control to defer, which produces results and resumes both waiters (in order)
87+
finish(11)
88+
}
6389
}

0 commit comments

Comments
 (0)