Skip to content

Commit 6427e0e

Browse files
authored
Reduce contention on worker's lock in CoroutineScheduler (#3686)
The previous implementation was prone to non-trivial contention that caused EDT freezes and, potentially, Android ANRs. Two root causes were identified: 1) The Thread() constructor has non-trivial complexity along with JVM upcalls and is significantly slower than any other regular allocation. 2) Thread.start() is in itself a JVM upcall that ends up on global JVM lock[s]. Thread.start() is now invoked after the lock is released to reduce contention. The first root cause is not addressed, because optimistic thread allocation may lead to potential CPU waste due to how optimistic the lock-less detection is, and because Thread.start() is an order of magnitude slower anyway. Fixes #3652
1 parent 9244752 commit 6427e0e

File tree

9 files changed

+43
-25
lines changed

9 files changed

+43
-25
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
208208
assert { state.isCompleting } // consistency check -- must be marked as completing
209209
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
210210
// Create the final exception and seal the state so that no more exceptions can be added
211-
var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
211+
val wasCancelling: Boolean
212212
val finalException = synchronized(state) {
213213
wasCancelling = state.isCancelling
214214
val exceptions = state.sealLocked(proposedException)

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,8 @@ private class StateFlowImpl<T>(
319319
updateState(expect ?: NULL, update ?: NULL)
320320

321321
private fun updateState(expectedState: Any?, newState: Any): Boolean {
322-
var curSequence = 0
323-
var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
322+
var curSequence: Int
323+
var curSlots: Array<StateFlowSlot?>? // benign race, we will not use it
324324
synchronized(this) {
325325
val oldState = _state.value
326326
if (expectedState != null && oldState != expectedState) return false // CAS support

kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
4141
@Suppress("UNCHECKED_CAST")
4242
protected fun allocateSlot(): S {
4343
// Actually create slot under lock
44-
var subscriptionCount: SubscriptionCountStateFlow? = null
44+
val subscriptionCount: SubscriptionCountStateFlow?
4545
val slot = synchronized(this) {
4646
val slots = when (val curSlots = slots) {
4747
null -> createSlotArray(2).also { slots = it }
@@ -72,7 +72,7 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
7272
@Suppress("UNCHECKED_CAST")
7373
protected fun freeSlot(slot: S) {
7474
// Release slot under lock
75-
var subscriptionCount: SubscriptionCountStateFlow? = null
75+
val subscriptionCount: SubscriptionCountStateFlow?
7676
val resumes = synchronized(this) {
7777
nCollectors--
7878
subscriptionCount = _subscriptionCount // retrieve under lock if initialized

kotlinx-coroutines-core/common/src/internal/Synchronized.common.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.internal
66

77
import kotlinx.coroutines.*
8+
import kotlin.contracts.*
89

910
/**
1011
* @suppress **This is an internal API and should not be used from general code.**
@@ -16,4 +17,16 @@ public expect open class SynchronizedObject() // marker abstract class
1617
* @suppress **This is an internal API and should not be used from general code.**
1718
*/
1819
@InternalCoroutinesApi
19-
public expect inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T
20+
public expect inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T
21+
22+
/**
23+
* @suppress **This is an internal API and should not be used from general code.**
24+
*/
25+
@OptIn(ExperimentalContracts::class)
26+
@InternalCoroutinesApi
27+
public inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T {
28+
contract {
29+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
30+
}
31+
return synchronizedImpl(lock, block)
32+
}

kotlinx-coroutines-core/js/src/internal/Synchronized.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,4 @@ public actual typealias SynchronizedObject = Any
1616
* @suppress **This is an internal API and should not be used from general code.**
1717
*/
1818
@InternalCoroutinesApi
19-
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
20-
block()
19+
public actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = block()

kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ internal class ResizableAtomicArray<T>(initialLength: Int) {
2828
val curLen = curArray.length()
2929
if (index < curLen) {
3030
curArray[index] = value
31-
} else {
32-
val newArray = AtomicReferenceArray<T>((index + 1).coerceAtLeast(2 * curLen))
33-
for (i in 0 until curLen) newArray[i] = curArray[i]
34-
newArray[index] = value
35-
array = newArray // copy done
31+
return
3632
}
33+
// It would be nice to copy array in batch instead of 1 by 1, but it seems like Java has no API for that
34+
val newArray = AtomicReferenceArray<T>((index + 1).coerceAtLeast(2 * curLen))
35+
for (i in 0 until curLen) newArray[i] = curArray[i]
36+
newArray[index] = value
37+
array = newArray // copy done
3738
}
3839
}

kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ public actual typealias SynchronizedObject = Any
1616
* @suppress **This is an internal API and should not be used from general code.**
1717
*/
1818
@InternalCoroutinesApi
19-
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
19+
public actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T =
2020
kotlin.synchronized(lock, block)

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,21 @@ internal class CoroutineScheduler(
252252

253253
/**
254254
* State of worker threads.
255-
* [workers] is array of lazily created workers up to [maxPoolSize] workers.
255+
* [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
256256
* [createdWorkers] is the count of already created workers (a worker with an index less than [createdWorkers] exists).
257-
* [blockingTasks] is count of pending (either in the queue or being executed) tasks
257+
* [blockingTasks] is count of pending (either in the queue or being executed) blocking tasks.
258+
*
259+
* Workers array is also used as a lock for workers' creation and termination sequence.
258260
*
259261
* **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so
260262
* workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination
261-
* works properly
263+
* works properly.
264+
*
265+
* Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
266+
* applications.
262267
*/
263268
@JvmField
264-
val workers = ResizableAtomicArray<Worker>(corePoolSize + 1)
269+
val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)
265270

266271
/**
267272
* The `Long` value describing the state of workers in this pool.
@@ -464,12 +469,13 @@ internal class CoroutineScheduler(
464469
}
465470
}
466471

467-
/*
472+
/**
468473
* Returns the number of CPU workers after this function (including new worker) or
469474
* 0 if no worker was created.
470475
*/
471476
private fun createNewWorker(): Int {
472-
synchronized(workers) {
477+
val worker: Worker
478+
return synchronized(workers) {
473479
// Make sure we're not trying to resurrect terminated scheduler
474480
if (isTerminated) return -1
475481
val state = controlState.value
@@ -487,12 +493,11 @@ internal class CoroutineScheduler(
487493
* 2) Make it observable by increment created workers count
488494
* 3) Only then start the worker, otherwise it may miss its own creation
489495
*/
490-
val worker = Worker(newIndex)
496+
worker = Worker(newIndex)
491497
workers.setSynchronized(newIndex, worker)
492498
require(newIndex == incrementCreatedWorkers())
493-
worker.start()
494-
return cpuWorkers + 1
495-
}
499+
cpuWorkers + 1
500+
}.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
496501
}
497502

498503
/**

kotlinx-coroutines-core/native/src/internal/Synchronized.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ public actual typealias SynchronizedObject = kotlinx.atomicfu.locks.Synchronized
1717
* @suppress **This is an internal API and should not be used from general code.**
1818
*/
1919
@InternalCoroutinesApi
20-
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block)
20+
public actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block)

0 commit comments

Comments
 (0)