@@ -20,49 +20,73 @@ import kotlin.random.*
20
20
*
21
21
* Current scheduler implementation has two optimization targets:
22
22
* * Efficiency in the face of communication patterns (e.g., actors communicating via channel)
23
- * * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool
23
+ * * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool.
24
24
*
25
25
* ### Structural overview
26
26
*
27
- * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
28
- * to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
29
- * has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
30
- * Work-stealing is implemented on top of that queues to provide even load distribution and illusion of centralized run queue.
27
+ * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
28
+ * [maxPoolSize] lazily created threads to execute blocking tasks.
29
+ * Every worker has a local queue in addition to a global scheduler queue
30
+ * and the global queue has priority over local queue to avoid starvation of externally-submitted
31
+ * (e.g. from Android UI thread) tasks.
32
+ * Work-stealing is implemented on top of these queues to provide
33
+ * even load distribution and illusion of centralized run queue.
31
34
*
32
35
* ### Scheduling policy
33
36
*
34
37
* When a coroutine is dispatched from within a scheduler worker, it's placed into the head of worker run queue.
35
- * If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy,
38
+ * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
36
39
* it effectively couples communicating coroutines into one and eliminates scheduling latency
37
- * that arises from placing task to the end of the queue.
38
- * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack.
40
+ * that arises from placing tasks to the end of the queue.
41
+ * Placing the former head to the tail is necessary to provide semi-FIFO order, otherwise, the queue degenerates into a stack.
39
42
* When a coroutine is dispatched from an external thread, it's put into the global queue.
43
+ * The original idea with a single-slot LIFO buffer comes from Golang runtime scheduler by D. Vyukov.
44
+ * It was proven to be "fair enough", performant and generally well accepted and initially was a significant inspiration
45
+ * source for the coroutine scheduler.
40
46
*
41
47
* ### Work stealing and affinity
42
48
*
43
- * To provide even tasks distribution worker tries to steal tasks from other workers queues before parking when his local queue is empty.
44
- * A non-standard solution is implemented to provide tasks affinity: task from FIFO buffer may be stolen only if it is stale enough
45
- * (based on the value of [WORK_STEALING_TIME_RESOLUTION_NS]).
46
- * For this purpose monotonic global clock ([System.nanoTime]) is used and every task has associated with it submission time.
47
- * This approach shows outstanding results when coroutines are cooperative, but as downside scheduler now depends on high-resolution global clock
49
+ * To provide even task distribution, a worker tries to steal tasks from other workers' queues
50
+ * before parking when its local queue is empty.
51
+ * A non-standard solution is implemented to provide tasks affinity: a task from FIFO buffer may be stolen
52
+ * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
53
+ * For this purpose, a monotonic global clock is used, and every task has its submission time associated with it.
54
+ * This approach shows outstanding results when coroutines are cooperative,
55
+ * but as a downside, the scheduler now depends on a high-resolution global clock,
48
56
* which may limit scalability on NUMA machines. Tasks from LIFO buffer can be stolen on a regular basis.
49
57
*
50
58
* ### Thread management
51
- * One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees similar
52
- * to the regular centralized executors. The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
53
- * The former field incorporates the amount of created threads, CPU-tokens and blocking tasks that require a thread compensation,
59
+ * One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees
60
+ * similar to the regular centralized executors.
61
+ * The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
62
+ * The former field incorporates the amount of created threads, CPU-tokens and blocking tasks
63
+ * that require a thread compensation,
54
64
* while the latter represents intrusive versioned Treiber stack of idle workers.
55
- * When a worker cannot find any work, he first adds itself to the stack, then re-scans the queue (to avoid missing signal)
56
- * and then attempts to park itself (there is additional layer of signalling against unnecessary park/unpark).
57
- * If worker finds a task that it cannot yet steal due to timer constraints, it stores this fact in its state
65
+ * When a worker cannot find any work, it first adds itself to the stack,
66
+ * then re-scans the queue to avoid missing signals and then attempts to park
67
+ * with additional rendezvous against unnecessary parking.
68
+ * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
58
69
* (to be uncounted when additional work is signalled) and parks for such duration.
59
70
*
60
- * When a new task arrives to the scheduler (whether it's local or global queue), either an idle worker is being signalled, or
61
- * a new worker is attempted to be created (only [corePoolSize] workers can be created for regular CPU tasks).
71
+ * When a new task arrives in the scheduler (whether to a local or the global queue),
72
+ * either an idle worker is being signalled, or a new worker is attempted to be created.
73
+ * Only [corePoolSize] workers can be created for regular CPU tasks.
62
74
*
63
- * ### Dynamic resizing and support of blocking tasks
75
+ * ### Support for blocking tasks
76
+ * The scheduler also supports the notion of [blocking][TaskMode.PROBABLY_BLOCKING] tasks.
77
+ * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
78
+ * addition to core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
79
+ * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
80
+ * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
81
+ * When a worker encounters a blocking task, it basically hands off its permit to another thread (not directly though) to
82
+ * keep invariant "scheduler always has at least min(pending CPU tasks, core pool size)
83
+ * and at most core pool size threads to execute CPU tasks".
84
+ * To avoid overprovision, workers without CPU permit are allowed to scan [globalBlockingQueue]
85
+ * and steal **only** blocking tasks from other workers.
64
86
*
65
- * TODO
87
+ * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
88
+ * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
89
+ * [LimitingDispatcher] that does control concurrency level by its own mechanism.
66
90
*/
67
91
@Suppress("NOTHING_TO_INLINE")
68
92
internal class CoroutineScheduler (
@@ -469,7 +493,7 @@ internal class CoroutineScheduler(
469
493
*/
470
494
if (worker.state === WorkerState.TERMINATED) return task
471
495
// Do not add CPU tasks in local queue if we are not able to execute it
472
- if (task.mode == TaskMode .NON_BLOCKING && worker.isBlocking ) {
496
+ if (task.mode === TaskMode.NON_BLOCKING && worker.state === WorkerState.BLOCKING) {
473
497
return task
474
498
}
475
499
worker.mayHaveLocalTasks = true
@@ -487,7 +511,6 @@ internal class CoroutineScheduler(
487
511
* E.g. for [1b, 1b, 2c, 1d] means that pool has
488
512
* two blocking workers with queue size 1, one worker with CPU permit and queue size 1
489
513
* and one dormant (executing his local queue before parking) worker with queue size 1.
490
- * TODO revisit
491
514
*/
492
515
override fun toString (): String {
493
516
var parkedWorkers = 0
@@ -530,10 +553,10 @@ internal class CoroutineScheduler(
530
553
" running workers queues = $queueSizes , " +
531
554
" global CPU queue size = ${globalCpuQueue.size} , " +
532
555
" global blocking queue size = ${globalBlockingQueue.size} , " +
533
- " Control State Workers {" +
534
- " created = ${createdWorkers(state)} , " +
535
- " blocking = ${blockingTasks(state)} , " +
536
- " CPU acquired = ${corePoolSize - availableCpuPermits(state)} " +
556
+ " Control State {" +
557
+ " created workers = ${createdWorkers(state)} , " +
558
+ " blocking tasks = ${blockingTasks(state)} , " +
559
+ " CPUs acquired = ${corePoolSize - availableCpuPermits(state)} " +
537
560
" }]"
538
561
}
539
562
@@ -574,9 +597,8 @@ internal class CoroutineScheduler(
574
597
* Worker state. **Updated only by this worker thread**.
575
598
* By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
576
599
*/
577
- @Volatile
600
+ @JvmField
578
601
var state = WorkerState .DORMANT
579
- val isBlocking: Boolean get() = state == WorkerState .BLOCKING
580
602
581
603
/* *
582
604
* Small state machine for termination.
@@ -634,15 +656,13 @@ internal class CoroutineScheduler(
634
656
* Tries to acquire CPU token if worker doesn't have one
635
657
* @return whether worker acquired (or already had) CPU token
636
658
*/
637
- private fun tryAcquireCpuPermit (): Boolean {
638
- return when {
639
- state == WorkerState .CPU_ACQUIRED -> true
640
- this @CoroutineScheduler.tryAcquireCpuPermit() -> {
641
- state = WorkerState .CPU_ACQUIRED
642
- true
643
- }
644
- else -> false
659
+ private fun tryAcquireCpuPermit (): Boolean = when {
660
+ state == WorkerState .CPU_ACQUIRED -> true
661
+ this @CoroutineScheduler.tryAcquireCpuPermit() -> {
662
+ state = WorkerState .CPU_ACQUIRED
663
+ true
645
664
}
665
+ else -> false
646
666
}
647
667
648
668
/* *
@@ -711,22 +731,21 @@ internal class CoroutineScheduler(
711
731
private fun tryPark () {
712
732
if (! inStack()) {
713
733
parkingState.value = PARKING_ALLOWED
714
- }
715
- if (parkedWorkersStackPush(this )) {
734
+ parkedWorkersStackPush(this )
716
735
return
717
- } else {
718
- assert { localQueue.size == 0 }
719
- // Failed to get a parking permit => we are not in the stack
720
- while (inStack()) {
721
- if (isTerminated || state == WorkerState . TERMINATED ) break
722
- if (parkingState.value != PARKED && ! parkingState.compareAndSet( PARKING_ALLOWED , PARKED )) {
723
- return
724
- }
725
- tryReleaseCpu( WorkerState . PARKING )
726
- interrupted() // Cleanup interruptions
727
- if (inStack()) {
728
- park()
729
- }
736
+
737
+ }
738
+ assert { localQueue.size == 0 }
739
+ // Failed to get a parking permit => we are not in the stack
740
+ while (inStack()) {
741
+ if (isTerminated || state == WorkerState . TERMINATED ) break
742
+ if (parkingState.value != PARKED && ! parkingState.compareAndSet( PARKING_ALLOWED , PARKED )) {
743
+ return
744
+ }
745
+ tryReleaseCpu( WorkerState . PARKING )
746
+ interrupted() // Cleanup interruptions
747
+ if (inStack()) {
748
+ park()
730
749
}
731
750
}
732
751
}
0 commit comments