Skip to content

Commit 216e4d3

Browse files
elizarovqwwdfsad
authored andcommitted
Removed wrapper from LimitingBlockingDispatcher by introducing TaskContext
1 parent c5bc9fe commit 216e4d3

File tree

5 files changed

+61
-49
lines changed

5 files changed

+61
-49
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -322,14 +322,14 @@ internal class CoroutineScheduler(
322322
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
323323
*
324324
* @param block runnable to be dispatched
325-
* @param mode mode of given [block] which is used as a hint to a dynamic resizing mechanism
325+
* @param taskContext concurrency context of given [block]
326326
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
327327
*/
328-
fun dispatch(block: Runnable, mode: TaskMode = TaskMode.NON_BLOCKING, fair: Boolean = false) {
328+
fun dispatch(block: Runnable, taskContext: TaskContext? = null, fair: Boolean = false) {
329329
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
330-
val task = Task(block, schedulerTimeSource.nanoTime(), mode)
330+
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
331331
// try to submit the task to the local queue and act depending on the result
332-
when (submitToLocalQueue(task, mode, fair)) {
332+
when (submitToLocalQueue(task, fair)) {
333333
ADDED -> return
334334
NOT_ADDED -> {
335335
globalQueue.addLast(task) // offload task to local queue
@@ -452,11 +452,11 @@ internal class CoroutineScheduler(
452452
/**
453453
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
454454
*/
455-
private fun submitToLocalQueue(task: Task, mode: TaskMode, fair: Boolean): Int {
455+
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
456456
val worker = Thread.currentThread() as? Worker ?: return NOT_ADDED
457457
var result = ADDED
458458

459-
if (mode == TaskMode.NON_BLOCKING) {
459+
if (task.mode == TaskMode.NON_BLOCKING) {
460460
/*
461461
* If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
462462
* 1) Blocking worker is finishing its block and resumes non-blocking continuation
@@ -685,23 +685,23 @@ internal class CoroutineScheduler(
685685
wasIdle = false
686686
}
687687
beforeTask(task)
688-
runSafely(task.block)
688+
runSafely(task)
689689
afterTask(task)
690690
}
691691
}
692692
tryReleaseCpu(WorkerState.TERMINATED)
693693
}
694694

695-
private fun runSafely(block: Runnable) {
695+
private fun runSafely(task: Task) {
696696
try {
697-
block.run()
697+
task.run()
698698
} catch (t: Throwable) {
699699
uncaughtExceptionHandler.uncaughtException(this, t)
700700
}
701701
}
702702

703-
private fun beforeTask(job: Task) {
704-
if (job.mode != TaskMode.NON_BLOCKING) {
703+
private fun beforeTask(task: Task) {
704+
if (task.mode != TaskMode.NON_BLOCKING) {
705705
/*
706706
* We should release CPU *before* checking for CPU starvation,
707707
* otherwise requestCpuWorker() will not count current thread as blocking
@@ -720,7 +720,7 @@ internal class CoroutineScheduler(
720720
return
721721
}
722722
val now = schedulerTimeSource.nanoTime()
723-
if (now - job.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
723+
if (now - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
724724
now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5
725725
) {
726726
lastExhaustionTime = now
@@ -729,8 +729,8 @@ internal class CoroutineScheduler(
729729
}
730730

731731

732-
private fun afterTask(job: Task) {
733-
if (job.mode != TaskMode.NON_BLOCKING) {
732+
private fun afterTask(task: Task) {
733+
if (task.mode != TaskMode.NON_BLOCKING) {
734734
decrementBlockingWorkers()
735735
assert(state == WorkerState.BLOCKING) { "Expected BLOCKING state, but has $state" }
736736
state = WorkerState.RETIRING

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ class ExperimentalCoroutineDispatcher(
1616
) : CoroutineDispatcher(), Delay, Closeable {
1717
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1818

19-
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
19+
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
20+
coroutineScheduler.dispatch(block)
2021

21-
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
22+
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
23+
coroutineScheduler.dispatch(block, fair = true)
2224

2325
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
2426
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
@@ -38,32 +40,33 @@ class ExperimentalCoroutineDispatcher(
3840
*/
3941
fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
4042
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
41-
return LimitingBlockingDispatcher(parallelism, TaskMode.PROBABLY_BLOCKING, this)
43+
return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
4244
}
4345

44-
internal fun dispatchBlocking(block: Runnable, context: TaskMode, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair)
46+
internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
47+
coroutineScheduler.dispatch(block, context, fair)
4548
}
4649

4750
private class LimitingBlockingDispatcher(
51+
val dispatcher: ExperimentalCoroutineDispatcher,
4852
val parallelism: Int,
49-
val taskContext: TaskMode,
50-
val dispatcher: ExperimentalCoroutineDispatcher
51-
) : CoroutineDispatcher(), Delay {
53+
override val taskMode: TaskMode
54+
) : CoroutineDispatcher(), Delay, TaskContext {
5255

5356
private val queue = ConcurrentLinkedQueue<Runnable>()
5457
private val inFlightTasks = atomic(0)
5558

5659
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
5760

5861
private fun dispatch(block: Runnable, fair: Boolean) {
59-
var taskToSchedule = wrap(block)
62+
var taskToSchedule = block
6063
while (true) {
6164
// Commit in-flight tasks slot
6265
val inFlight = inFlightTasks.incrementAndGet()
6366

6467
// Fast path, if parallelism limit is not reached, dispatch task and return
6568
if (inFlight <= parallelism) {
66-
dispatcher.dispatchBlocking(taskToSchedule, taskContext, fair)
69+
dispatcher.dispatchWithContext(taskToSchedule, this, fair)
6770
return
6871
}
6972

@@ -97,10 +100,6 @@ private class LimitingBlockingDispatcher(
97100
return "${super.toString()}[dispatcher = $dispatcher]"
98101
}
99102

100-
private fun wrap(block: Runnable): Runnable {
101-
return block as? WrappedTask ?: WrappedTask(block)
102-
}
103-
104103
/**
105104
* Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
106105
*
@@ -114,11 +113,11 @@ private class LimitingBlockingDispatcher(
114113
* ```
115114
* it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
116115
*/
117-
private fun afterTask() {
116+
override fun afterTask() {
118117
var next = queue.poll()
119118
// If we have pending tasks in current blocking context, dispatch first
120119
if (next != null) {
121-
dispatcher.dispatchBlocking(next, taskContext, true)
120+
dispatcher.dispatchWithContext(next, this, true)
122121
return
123122
}
124123
inFlightTasks.decrementAndGet()
@@ -138,16 +137,6 @@ private class LimitingBlockingDispatcher(
138137
dispatch(next, true)
139138
}
140139

141-
private inner class WrappedTask(val runnable: Runnable) : Runnable {
142-
override fun run() {
143-
try {
144-
runnable.run()
145-
} finally {
146-
afterTask()
147-
}
148-
}
149-
}
150-
151140
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
152141
dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
153142
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,32 @@ internal enum class TaskMode {
4343
PROBABLY_BLOCKING,
4444
}
4545

46+
internal interface TaskContext {
47+
val taskMode: TaskMode
48+
fun afterTask()
49+
}
50+
4651
internal class Task(
47-
val block: Runnable,
48-
val submissionTime: Long,
49-
val mode: TaskMode
50-
) : LockFreeMPMCQueueNode<Task>() {
52+
@JvmField val block: Runnable,
53+
@JvmField val submissionTime: Long,
54+
@JvmField val taskContext: TaskContext?
55+
) : Runnable, LockFreeMPMCQueueNode<Task>() {
56+
val mode: TaskMode get() = taskContext?.taskMode ?: TaskMode.NON_BLOCKING
57+
58+
override fun run() {
59+
if (taskContext == null) {
60+
block.run()
61+
} else {
62+
try {
63+
block.run()
64+
} finally {
65+
taskContext.afterTask()
66+
}
67+
}
68+
}
69+
5170
override fun toString(): String =
52-
"Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $mode]"
71+
"Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
5372
}
5473

5574
// Open for tests

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ class CoroutineSchedulerTest : TestBase() {
99
@Test
1010
fun testModesExternalSubmission() { // Smoke
1111
CoroutineScheduler(1, 1).use {
12-
for (value in TaskMode.values()) {
12+
for (mode in TaskMode.values()) {
1313
val latch = CountDownLatch(1)
1414
it.dispatch(Runnable {
1515
latch.countDown()
16-
}, mode = value)
16+
}, TaskContextImpl(mode))
1717

1818
latch.await()
1919
}
@@ -25,10 +25,10 @@ class CoroutineSchedulerTest : TestBase() {
2525
CoroutineScheduler(2, 2).use {
2626
val latch = CountDownLatch(1)
2727
it.dispatch(Runnable {
28-
for (value in TaskMode.values()) {
28+
for (mode in TaskMode.values()) {
2929
it.dispatch(Runnable {
3030
latch.countDown()
31-
}, mode = value)
31+
}, TaskContextImpl(mode))
3232
}
3333
})
3434

@@ -126,4 +126,8 @@ class CoroutineSchedulerTest : TestBase() {
126126
check(ratio >= 0.9)
127127
}
128128
}
129+
130+
private class TaskContextImpl(override val taskMode: TaskMode) : TaskContext {
131+
override fun afterTask() {}
132+
}
129133
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ internal fun GlobalQueue.asTimeList(): List<Long> {
126126
return result
127127
}
128128

129-
internal fun task(n: Long) = Task(Runnable {}, n, TaskMode.NON_BLOCKING)
129+
internal fun task(n: Long) = Task(Runnable {}, n, null)
130130

131131
internal fun WorkQueue.drain(): List<Long> {
132132
var task: Task? = poll()

0 commit comments

Comments
 (0)