Skip to content

Commit e90cdb0

Browse files
qwwdfsadelizarov
authored andcommitted
Execute EventLoop#invokeOnTimeout in DefaultDispatcher to allow busy-wait loops
Fixes #479
1 parent aca2a2d commit e90cdb0

File tree

3 files changed

+40
-15
lines changed

3 files changed

+40
-15
lines changed

core/kotlinx-coroutines-core/src/DefaultExecutor.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
3838
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
3939
}
4040

41+
/**
42+
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
43+
* ```
44+
* runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
45+
* ```
46+
*
47+
* Livelock is possible only if runBlocking is called on [DefaultDispatcher], but it's not exposed as public API
48+
*/
49+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
50+
DelayedRunnableTask(time, unit, block).also { schedule(it) }
51+
4152
override fun run() {
4253
timeSource.registerTimeLoopThread()
4354
try {

core/kotlinx-coroutines-core/src/EventLoop.kt

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.experimental.internal.*
99
import kotlinx.coroutines.experimental.timeunit.*
1010
import java.util.concurrent.locks.*
1111
import kotlin.coroutines.experimental.*
12-
import kotlin.jvm.*
1312

1413
/**
1514
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -111,9 +110,6 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
111110
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
112111
schedule(DelayedResumeTask(time, unit, continuation))
113112

114-
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
115-
DelayedRunnableTask(time, unit, block).also { schedule(it) }
116-
117113
override fun processNextEvent(): Long {
118114
if (!isCorrectThread()) return Long.MAX_VALUE
119115
// queue all delayed tasks that are due to be executed
@@ -141,8 +137,9 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
141137
if (enqueueImpl(task)) {
142138
// todo: we should unpark only when this delayed task became first in the queue
143139
unpark()
144-
} else
140+
} else {
145141
DefaultExecutor.execute(task)
142+
}
146143
}
147144

148145
@Suppress("UNCHECKED_CAST")
@@ -266,23 +263,26 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
266263

267264
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
268265

269-
fun rescheduleOnShutdown() = synchronized(this) {
266+
@Synchronized
267+
fun rescheduleOnShutdown() {
270268
if (state != DELAYED) return
271269
if (_delayed.value!!.remove(this)) {
272270
state = RESCHEDULED
273271
DefaultExecutor.schedule(this)
274-
} else
272+
} else {
275273
state = REMOVED
274+
}
276275
}
277276

278-
final override fun dispose() = synchronized(this) {
279-
when (state) {
280-
DELAYED -> _delayed.value?.remove(this)
281-
RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
282-
else -> return
283-
}
284-
state = REMOVED
277+
@Synchronized
278+
final override fun dispose() {
279+
when (state) {
280+
DELAYED -> _delayed.value?.remove(this)
281+
RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
282+
else -> return
285283
}
284+
state = REMOVED
285+
}
286286

287287
override fun toString(): String = "Delayed[nanos=$nanoTime]"
288288
}
@@ -302,7 +302,8 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
302302
}
303303
}
304304

305-
private inner class DelayedRunnableTask(
305+
// Cannot be moved to DefaultExecutor due to BE bug
306+
internal inner class DelayedRunnableTask(
306307
time: Long, timeUnit: TimeUnit,
307308
private val block: Runnable
308309
) : DelayedTask(time, timeUnit) {

core/kotlinx-coroutines-core/test/RunBlockingTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,19 @@ import kotlin.coroutines.experimental.*
88
import kotlin.test.*
99

1010
class RunBlockingTest : TestBase() {
11+
12+
@Test
13+
fun testWithTimeoutBusyWait() = runBlocking {
14+
val value = withTimeoutOrNull(10) {
15+
while (isActive) {
16+
// Busy wait
17+
}
18+
"value"
19+
}
20+
21+
assertEquals("value", value)
22+
}
23+
1124
@Test
1225
fun testPrivateEventLoop() {
1326
expect(1)

0 commit comments

Comments
 (0)