Skip to content

Commit 37e45cd

Browse files
committed
Implement yield() for experimental scheduler
1 parent 18a9134 commit 37e45cd

File tree

7 files changed

+41
-21
lines changed

7 files changed

+41
-21
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ public abstract class CoroutineDispatcher :
7777
*/
7878
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
7979

80+
/**
81+
* Dispatches execution of a runnable [block] onto another thread in the given [context]
82+
* with a hint for dispatcher that current dispatch is triggered by [yield] call, so execution of this
83+
* continuation may be delayed in favor of already dispatched coroutines.
84+
*
85+
* **Implementation note** though yield marker may be passed as a part of [context], this
86+
* is a separate method for performance reasons
87+
*/
88+
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
89+
8090
/**
8191
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
8292
*/

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ internal class DispatchedContinuation<in T>(
101101
val context = continuation.context
102102
_state = value
103103
resumeMode = MODE_CANCELLABLE
104-
dispatcher.dispatch(context, this)
104+
dispatcher.dispatchYield(context, this)
105105
}
106106

107107
override fun toString(): String =

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,9 @@ internal class CoroutineScheduler(
267267
private fun tryUnpark(): Boolean {
268268
while (true) {
269269
val worker = parkedWorkersStack.pop() ?: return false
270-
if (!worker.registeredInParkedQueue.value) {
270+
if (!worker.registeredInStack.value) {
271271
continue // Someone else succeeded
272-
} else if (!worker.registeredInParkedQueue.compareAndSet(true, false)) {
272+
} else if (!worker.registeredInStack.compareAndSet(true, false)) {
273273
continue // Someone else succeeded
274274
}
275275

@@ -434,7 +434,8 @@ internal class CoroutineScheduler(
434434
"retired workers = $retired, " +
435435
"finished workers = $finished, " +
436436
"running workers queues = $queueSizes, "+
437-
"global queue size = ${globalWorkQueue.size()}]"
437+
"global queue size = ${globalWorkQueue.size()}], " +
438+
"control state: ${controlState.value}"
438439
}
439440

440441
// todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
@@ -473,7 +474,7 @@ internal class CoroutineScheduler(
473474
* Worker registers itself in this queue once and will stay there until
474475
* someone will call [Queue.poll] which return it, then this flag is reset.
475476
*/
476-
val registeredInParkedQueue = atomic(false)
477+
val registeredInStack = atomic(false)
477478
var nextParkedWorker: PoolWorker? = null
478479

479480
/**
@@ -624,7 +625,7 @@ internal class CoroutineScheduler(
624625
parkTimeNs = (parkTimeNs * 3 shr 1).coerceAtMost(MAX_PARK_TIME_NS)
625626
}
626627

627-
if (registeredInParkedQueue.compareAndSet(false, true)) {
628+
if (registeredInStack.compareAndSet(false, true)) {
628629
parkedWorkersStack.push(this)
629630
}
630631

@@ -636,7 +637,7 @@ internal class CoroutineScheduler(
636637

637638
private fun blockingWorkerIdle() {
638639
tryReleaseCpu(WorkerState.PARKING)
639-
if (registeredInParkedQueue.compareAndSet(false, true)) {
640+
if (registeredInStack.compareAndSet(false, true)) {
640641
parkedWorkersStack.push(this)
641642
}
642643

@@ -671,7 +672,7 @@ internal class CoroutineScheduler(
671672
* Either thread successfully deregisters itself from queue (and then terminate) or someone else
672673
* tried to unpark it. In the latter case we should proceed as unparked worker
673674
*/
674-
if (registeredInParkedQueue.value && !registeredInParkedQueue.compareAndSet(true, false)) {
675+
if (registeredInStack.value && !registeredInStack.compareAndSet(true, false)) {
675676
return
676677
}
677678

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ import java.io.*
66
import java.util.concurrent.*
77
import kotlin.coroutines.experimental.*
88

9+
// TODO make internal after integration wih Ktor
910
class ExperimentalCoroutineDispatcher(corePoolSize: Int = Runtime.getRuntime().availableProcessors(), maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
1011

1112
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1213

13-
/**
14-
* TODO: yield doesn't work as expected
15-
*/
1614
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
1715

16+
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
17+
1818
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
1919
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
2020

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = readFromSystemProperties(
1212
"kotlinx.coroutines.scheduler.offload.threshold", 96L)
1313

1414
internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
15-
"kotlinx.coroutines.scheduler.blocking.parallelism", 16L).toInt()
15+
"kotlinx.coroutines.scheduler.blocking.parallelism", 16)
1616

1717
internal val MAX_POOL_SIZE = readFromSystemProperties(
18-
"kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128L).toInt()
18+
"kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128)
1919

2020
internal var schedulerTimeSource: TimeSource = NanoTimeSource
2121

@@ -36,7 +36,9 @@ internal object NanoTimeSource : TimeSource() {
3636
override fun nanoTime() = System.nanoTime()
3737
}
3838

39-
private fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
39+
internal fun readFromSystemProperties(propertyName: String, defaultValue: Int): Int = readFromSystemProperties(propertyName, defaultValue.toLong()).toInt()
40+
41+
internal fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
4042
val value = try {
4143
System.getProperty(propertyName)
4244
} catch (e: SecurityException) {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,19 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
2727
*/
2828
internal class WorkQueue {
2929

30-
// todo: There is non-atomicity in computing bufferSize (indices update separately).
31-
// todo: It can lead to arbitrary values of resulting bufferSize.
32-
// todo: Consider merging both indices into a single Long.
33-
// todo: Alternatively, prove that sporadic arbitrary result here is Ok (does not seems the case now)
30+
/*
31+
* We read two independent counter here.
32+
* Producer index is incremented only by owner
33+
* Consumer index is incremented both by owner and external threads
34+
*
35+
* The only harmful race is:
36+
* [T1] readProducerIndex (1) preemption(2) readConsumerIndex(5)
37+
* [T2] changeProducerIndex (3)
38+
* [T3] changeConsumerIndex (4)
39+
*
40+
* Which can lead to resulting size bigger than actual size at any moment of time.
41+
* This is in general harmless because steal will be blocked by timer
42+
*/
3443
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
3544

3645
// TODO replace with inlined array when atomicfu will support it

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.coroutines.experimental.scheduling
22

33
import kotlinx.coroutines.experimental.*
44
import org.junit.*
5-
import org.junit.Ignore
65
import org.junit.Test
76
import java.util.concurrent.atomic.*
87
import kotlin.coroutines.experimental.*
@@ -143,8 +142,7 @@ class CoroutineDispatcherTest : SchedulerTestBase() {
143142
checkPoolThreadsCreated(4)
144143
}
145144

146-
@Test(timeout = 1_000) // Failing test until yield() is not fixed
147-
@Ignore
145+
@Test(timeout = 1_000)
148146
fun testYield() = runBlocking {
149147
corePoolSize = 1
150148
maxPoolSize = 1

0 commit comments

Comments
 (0)