
Commit 9892dc2

Remove created threads restriction from BlockingCoroutineDispatcherThreadLimitStressTest (#4061)
* Better documentation
* Remove incorrect assertion from BlockingCoroutineDispatcherThreadLimitStressTest

Fixes #3712
1 parent 29e8213 commit 9892dc2

File tree: 3 files changed, +48 -46 lines


kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 35 additions & 30 deletions
@@ -10,30 +10,33 @@ import kotlin.jvm.internal.Ref.ObjectRef
 import kotlin.math.*
 
 /**
- * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutines
- * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
+ * Coroutine scheduler (pool of shared threads) with a primary target to distribute dispatched coroutines
+ * over worker threads, including both CPU-intensive and potentially blocking tasks, in the most efficient manner.
  *
- * Current scheduler implementation has two optimization targets:
- * - Efficiency in the face of communication patterns (e.g. actors communicating via channel)
- * - Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool.
+ * The current scheduler implementation has two optimization targets:
+ * - Efficiency in the face of communication patterns (e.g. actors communicating via channel).
+ * - Dynamic thread state and resizing to schedule blocking calls without re-dispatching a coroutine to a separate "blocking" thread pool.
  *
  * ### Structural overview
  *
- * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
- * [maxPoolSize] lazily created threads to execute blocking tasks.
- * Every worker has a local queue in addition to a global scheduler queue
- * and the global queue has priority over local queue to avoid starvation of externally-submitted
- * (e.g. from Android UI thread) tasks.
- * Work-stealing is implemented on top of that queues to provide
- * even load distribution and illusion of centralized run queue.
+ * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
+ * [maxPoolSize] lazily created threads to execute blocking tasks.
+ * The scheduler has two global queues -- one for CPU tasks and one for blocking tasks.
+ * These queues are used for tasks that are submitted externally (from threads not belonging to the scheduler)
+ * and as overflow buffers for thread-local queues.
+ *
+ * Every worker has a local queue in addition to the global scheduler queues.
+ * The queue to pick the task from is selected randomly to avoid starvation of both local-queue and
+ * global-queue submitted tasks.
+ * Work-stealing is implemented on top of these queues to provide even load distribution and an illusion of a centralized run queue.
  *
  * ### Scheduling policy
  *
  * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of worker run queue.
  * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
  * it effectively couples communicating coroutines into one and eliminates scheduling latency
  * that arises from placing tasks to the end of the queue.
- * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise, queue degenerates to stack.
+ * Placing the former head to the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates to a stack.
  * When a coroutine is dispatched from an external thread, it's put into the global queue.
  * The original idea with a single-slot LIFO buffer comes from Golang runtime scheduler by D. Vyukov.
  * It was proven to be "fair enough", performant and generally well accepted and initially was a significant inspiration
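To make the scheduling policy described in this hunk concrete, here is a minimal, deliberately single-threaded Kotlin sketch of the "single-slot LIFO buffer plus FIFO buffer" idea. The class and member names are illustrative assumptions only; the real WorkQueue is lock-free and substantially more involved.

import java.util.ArrayDeque

// Conceptual sketch only: not thread-safe and not the scheduler's actual WorkQueue.
class ConceptualWorkQueue<T : Any> {
    private var lastScheduled: T? = null   // single-slot LIFO buffer
    private val buffer = ArrayDeque<T>()   // FIFO part of the local queue

    // Dispatch from inside a worker: the new task takes the LIFO slot and the
    // previous head moves to the tail, preserving semi-FIFO order overall.
    fun addFromWorker(task: T) {
        val previous = lastScheduled
        lastScheduled = task
        if (previous != null) buffer.addLast(previous)
    }

    // The owning worker polls its LIFO slot first, then the FIFO buffer.
    fun poll(): T? {
        val head = lastScheduled
        if (head != null) {
            lastScheduled = null
            return head
        }
        return buffer.pollFirst()
    }
}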
@@ -45,39 +48,41 @@ import kotlin.math.*
  * before parking when his local queue is empty.
  * A non-standard solution is implemented to provide tasks affinity: a task from FIFO buffer may be stolen
  * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
- * For this purpose, monotonic global clock is used, and every task has associated with its submission time.
+ * For this purpose, a monotonic global clock is used, and every task has a submission time associated with it.
  * This approach shows outstanding results when coroutines are cooperative,
- * but as downside scheduler now depends on a high-resolution global clock,
- * which may limit scalability on NUMA machines. Tasks from LIFO buffer can be stolen on a regular basis.
+ * but as a downside, the scheduler now depends on a high-resolution global clock,
+ * which may limit scalability on NUMA machines.
  *
  * ### Thread management
- * One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees
+ *
+ * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
  * similar to the regular centralized executors.
  * The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
- * The former field incorporates the amount of created threads, CPU-tokens and blocking tasks
- * that require a thread compensation,
- * while the latter represents intrusive versioned Treiber stack of idle workers.
- * When a worker cannot find any work, they first add themselves to the stack,
+ * The former field incorporates the number of created threads, CPU-tokens and blocking tasks
+ * that require thread compensation,
+ * while the latter represents an intrusive versioned Treiber stack of idle workers.
+ * When a worker cannot find any work, it first adds itself to the stack,
  * then re-scans the queue to avoid missing signals and then attempts to park
- * with additional rendezvous against unnecessary parking.
+ * with an additional rendezvous against unnecessary parking.
  * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
  * (to be uncounted when additional work is signalled) and parks for such duration.
  *
- * When a new task arrives in the scheduler (whether it is local or global queue),
+ * When a new task arrives at the scheduler (whether to a local or a global queue),
  * either an idle worker is being signalled, or a new worker is attempted to be created.
  * (Only [corePoolSize] workers can be created for regular CPU tasks)
  *
  * ### Support for blocking tasks
+ *
  * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
- * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
- * addition to core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
- * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
- * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
- * When worker encounters blocking tasks, it basically hands off its permit to another thread (not directly though) to
- * keep invariant "scheduler always has at least min(pending CPU tasks, core pool size)
+ * When executing or enqueuing blocking tasks, the scheduler notifies or creates an additional worker in
+ * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
+ * available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
+ * "CPU permits" -- [corePoolSize] special tokens that allow an arbitrary worker to execute and steal CPU-bound tasks.
+ * When a worker encounters a blocking task, it releases its permit to the scheduler to
+ * keep an invariant "scheduler always has at least min(pending CPU tasks, core pool size)
  * and at most core pool size threads to execute CPU tasks".
  * To avoid overprovision, workers without CPU permit are allowed to scan [globalBlockingQueue]
- * and steal **only** blocking tasks from other workers.
+ * and steal **only** blocking tasks from other workers, which imposes non-trivial complexity on the queue management.
  *
  * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
  * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
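The "CPU permits" invariant above can be pictured with a deliberately simplified sketch. The real scheduler packs permits into the controlState field rather than using a Semaphore, so the class below is only an assumption-based illustration of the hand-off, not the actual implementation.

import java.util.concurrent.Semaphore

// Illustration only: corePoolSize tokens gate CPU work; a worker gives its token
// back before running a blocking task so another thread can keep serving CPU tasks.
class CpuPermitsSketch(corePoolSize: Int) {
    private val permits = Semaphore(corePoolSize)

    // A worker may execute or steal CPU-bound tasks only while holding a permit.
    fun tryAcquireCpuPermit(): Boolean = permits.tryAcquire()

    // Called before a worker starts a blocking task, freeing capacity for CPU work.
    fun releaseCpuPermit() = permits.release()
}

In the scheduler itself this hand-off is what keeps at least min(pending CPU tasks, core pool size) and at most core pool size threads on CPU work, while blocking tasks may grow the pool up to [maxPoolSize].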

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherThreadLimitStressTest.kt

Lines changed: 7 additions & 16 deletions
@@ -14,7 +14,7 @@ class BlockingCoroutineDispatcherThreadLimitStressTest : SchedulerTestBase() {
         corePoolSize = CORES_COUNT
     }
 
-    private val observedConcurrency = ConcurrentHashMap<Int, Boolean>()
+    private val observedParallelism = ConcurrentHashMap<Int, Boolean>().keySet(true)
     private val concurrentWorkers = AtomicInteger(0)
 
     @Test
@@ -27,41 +27,32 @@ class BlockingCoroutineDispatcherThreadLimitStressTest : SchedulerTestBase() {
                 async(limitingDispatcher) {
                     try {
                         val currentlyExecuting = concurrentWorkers.incrementAndGet()
-                        observedConcurrency[currentlyExecuting] = true
-                        assertTrue(currentlyExecuting <= CORES_COUNT)
+                        observedParallelism.add(currentlyExecuting)
                     } finally {
                         concurrentWorkers.decrementAndGet()
                     }
                 }
             }
-            tasks.forEach { it.await() }
-            for (i in CORES_COUNT + 1..CORES_COUNT * 2) {
-                require(i !in observedConcurrency.keys) { "Unexpected state: $observedConcurrency" }
-            }
-            checkPoolThreadsCreated(0..CORES_COUNT + 1)
+            tasks.awaitAll()
+            assertEquals(1, observedParallelism.single(), "Expected parallelism should be 1, had $observedParallelism")
         }
     }
 
     @Test
-    @Ignore
     fun testLimitParallelism() = runBlocking {
         val limitingDispatcher = blockingDispatcher(CORES_COUNT)
         val iterations = 50_000 * stressTestMultiplier
         val tasks = (1..iterations).map {
             async(limitingDispatcher) {
                 try {
                     val currentlyExecuting = concurrentWorkers.incrementAndGet()
-                    observedConcurrency[currentlyExecuting] = true
-                    assertTrue(currentlyExecuting <= CORES_COUNT)
+                    observedParallelism.add(currentlyExecuting)
                 } finally {
                     concurrentWorkers.decrementAndGet()
                 }
             }
         }
-        tasks.forEach { it.await() }
-        for (i in CORES_COUNT + 1..CORES_COUNT * 2) {
-            require(i !in observedConcurrency.keys) { "Unexpected state: $observedConcurrency" }
-        }
-        checkPoolThreadsCreated(CORES_COUNT..CORES_COUNT * 3)
+        tasks.awaitAll()
+        assertTrue(observedParallelism.max() <= CORES_COUNT, "Unexpected state: $observedParallelism")
     }
 }
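The assertion pattern the test now relies on -- record every observed concurrency level in a concurrent set, wait with awaitAll, and check only the observed parallelism rather than the number of created threads -- can be reproduced against the public API alone. The dispatcher, the limit of 4, and the iteration count below are illustrative assumptions, not values from the test.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val limit = 4
    val limited = Dispatchers.Default.limitedParallelism(limit)
    val observedParallelism = ConcurrentHashMap.newKeySet<Int>()
    val activeWorkers = AtomicInteger(0)

    val tasks = (1..10_000).map {
        async(limited) {
            try {
                // Record how many tasks were running at this instant.
                observedParallelism.add(activeWorkers.incrementAndGet())
            } finally {
                activeWorkers.decrementAndGet()
            }
        }
    }
    tasks.awaitAll()
    check((observedParallelism.maxOrNull() ?: 0) <= limit) {
        "Unexpected parallelism: $observedParallelism"
    }
}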

kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt

Lines changed: 6 additions & 0 deletions
@@ -95,6 +95,12 @@ abstract class SchedulerTestBase : TestBase() {
         }
     }
 
+    /**
+     * Implementation note:
+     * Our [Dispatchers.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher
+     * on top of an unbounded scheduler. We want to test this scenario, but on top of a non-singleton
+     * scheduler so we can control the number of threads, thus this method.
+     */
     internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): CoroutineDispatcher {
         return object : CoroutineDispatcher() {
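Since the new comment in SchedulerTestBase.kt notes that Dispatchers.IO is a limitedParallelism view on top of the unbounded scheduler, a short sketch of that public API may help. The limit of 64, the property name, and the function are illustrative assumptions.

import kotlinx.coroutines.*

// A user-side parallelism-limited view carved out of the same unbounded
// scheduler that backs Dispatchers.IO; it does not starve other IO users.
@OptIn(ExperimentalCoroutinesApi::class)
val databaseDispatcher = Dispatchers.IO.limitedParallelism(64)

suspend fun runBlockingQuery(): String = withContext(databaseDispatcher) {
    // A synchronous/blocking call would go here; at most 64 such calls
    // run in parallel on this view.
    "result"
}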