Skip to content

Commit 4518e05

Browse files
committed
EventLoop implements Delay
* Predictable single-threaded scheduling in runBlocking * withTimeout used from runBlocking will release memory (important for tests)
1 parent 72e0195 commit 4518e05

File tree

3 files changed

+181
-43
lines changed

3 files changed

+181
-43
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend CoroutineSc
8181
@Throws(InterruptedException::class)
8282
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
8383
val currentThread = Thread.currentThread()
84-
val privateEventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
85-
val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
86-
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
84+
val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
85+
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
86+
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
8787
coroutine.initParentJob(context[Job])
88-
privateEventLoop?.initParentJob(coroutine)
88+
eventLoop?.initParentJob(coroutine)
8989
block.startCoroutine(coroutine, coroutine)
9090
return coroutine.joinBlocking()
9191
}
@@ -104,7 +104,7 @@ private open class StandaloneCoroutine(
104104

105105
private class LazyStandaloneCoroutine(
106106
parentContext: CoroutineContext,
107-
val block: suspend CoroutineScope.() -> Unit
107+
private val block: suspend CoroutineScope.() -> Unit
108108
) : StandaloneCoroutine(parentContext, active = false) {
109109
override fun onStart() {
110110
block.startCoroutine(this, this)
@@ -120,27 +120,31 @@ private class InnerCoroutine<in T>(
120120

121121
private class BlockingCoroutine<T>(
122122
override val parentContext: CoroutineContext,
123-
val blockedThread: Thread,
124-
val hasPrivateEventLoop: Boolean
123+
private val blockedThread: Thread,
124+
private val privateEventLoop: Boolean
125125
) : AbstractCoroutine<T>(active = true) {
126126
val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
127127

128+
init {
129+
if (privateEventLoop) require(eventLoop is EventLoopImpl)
130+
}
131+
128132
override fun afterCompletion(state: Any?, mode: Int) {
129133
if (Thread.currentThread() != blockedThread)
130134
LockSupport.unpark(blockedThread)
131135
}
132136

133137
@Suppress("UNCHECKED_CAST")
134138
fun joinBlocking(): T {
135-
while (isActive) {
139+
while (true) {
136140
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
137-
if (eventLoop == null || !eventLoop.processNextEvent())
138-
LockSupport.park(this)
139-
}
140-
// process remaining events (that could have been added after last processNextEvent and before cancel
141-
if (hasPrivateEventLoop) {
142-
while (eventLoop!!.processNextEvent()) { /* just spin */ }
141+
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
142+
// note: process next even may look unpark flag, so check !isActive before parking
143+
if (!isActive) break
144+
LockSupport.parkNanos(this, parkNanos)
143145
}
146+
// process queued events (that could have been added after last processNextEvent and before cancel
147+
if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
144148
// now return result
145149
val state = this.state
146150
(state as? CompletedExceptionally)?.let { throw it.exception }

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 130 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,29 @@ package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
2020
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21+
import java.util.concurrent.ConcurrentSkipListMap
22+
import java.util.concurrent.TimeUnit
23+
import java.util.concurrent.atomic.AtomicLong
2124
import java.util.concurrent.locks.LockSupport
2225
import kotlin.coroutines.experimental.CoroutineContext
2326

2427
/**
2528
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
26-
* be asked to process next event from their event queue. It is used by [runBlocking] to
29+
* be asked to process next event from their event queue.
30+
*
31+
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
2732
* continue processing events when invoked from the event dispatch thread.
2833
*/
2934
public interface EventLoop {
3035
/**
31-
* Processes next event in this event loop and returns `true` or returns `false` if there are
32-
* no events to process or when invoked from the wrong thread.
36+
* Processes next event in this event loop.
37+
*
38+
* The result of this function is to be interpreted like this:
39+
* * `<= 0` -- there are potentially more events for immediate processing;
40+
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
41+
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
3342
*/
34-
public fun processNextEvent(): Boolean
43+
public fun processNextEvent(): Long
3544

3645
public companion object Factory {
3746
/**
@@ -43,7 +52,7 @@ public interface EventLoop {
4352
* ```
4453
* while (needsToBeRunning) {
4554
* if (Thread.interrupted()) break // or handle somehow
46-
* if (!eventLoop.processNextEvent()) LockSupport.park() // event loop will unpark
55+
* LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
4756
* }
4857
* ```
4958
*/
@@ -55,48 +64,140 @@ public interface EventLoop {
5564
}
5665

5766
internal class EventLoopImpl(
58-
val thread: Thread
59-
) : CoroutineDispatcher(), EventLoop {
60-
val queue = LockFreeLinkedListHead()
61-
var parentJob: Job? = null
67+
private val thread: Thread
68+
) : CoroutineDispatcher(), EventLoop, Delay {
69+
private val queue = LockFreeLinkedListHead()
70+
private val delayed = ConcurrentSkipListMap<DelayedTask, DelayedTask>()
71+
private val nextSequence = AtomicLong()
72+
private var parentJob: Job? = null
6273

6374
fun initParentJob(coroutine: Job) {
6475
require(this.parentJob == null)
6576
this.parentJob = coroutine
6677
}
6778

6879
override fun dispatch(context: CoroutineContext, block: Runnable) {
69-
schedule(Dispatch(block))
80+
if (scheduleQueued(QueuedRunnableTask(block))) {
81+
unpark()
82+
} else {
83+
block.run()
84+
}
7085
}
7186

72-
fun schedule(node: Node): Boolean {
73-
val added = if (parentJob == null) {
74-
queue.addLast(node)
75-
true
76-
} else
77-
queue.addLastIf(node) { !parentJob!!.isCompleted }
78-
if (added) {
79-
if (Thread.currentThread() !== thread)
80-
LockSupport.unpark(thread)
87+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
88+
if (scheduleDelayed(DelayedResumeTask(time, unit, continuation))) {
89+
// todo: we should unpark only when this delayed task became first in the queue
90+
unpark()
8191
} else {
82-
node.run()
92+
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
8393
}
84-
return added
8594
}
8695

87-
override fun processNextEvent(): Boolean {
88-
if (Thread.currentThread() !== thread) return false
89-
(queue.removeFirstOrNull() as? Runnable)?.apply {
90-
run()
96+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
97+
DelayedRunnableTask(time, unit, block).also { scheduleDelayed(it) }
98+
99+
override fun processNextEvent(): Long {
100+
if (Thread.currentThread() !== thread) return Long.MAX_VALUE
101+
// queue all delayed tasks that are due to be executed
102+
while (true) {
103+
val delayedTask = delayed.firstEntry()?.key ?: break
104+
val now = System.nanoTime()
105+
if (delayedTask.nanoTime - now > 0) break
106+
if (!scheduleQueued(delayedTask)) break
107+
delayed.remove(delayedTask)
108+
}
109+
// then process one event from queue
110+
(queue.removeFirstOrNull() as? QueuedTask)?.let { queuedTask ->
111+
queuedTask()
112+
}
113+
if (!queue.isEmpty) return 0
114+
val nextDelayedTask = delayed.firstEntry()?.key ?: return Long.MAX_VALUE
115+
return nextDelayedTask.nanoTime - System.nanoTime()
116+
}
117+
118+
fun shutdown() {
119+
// complete processing of all queued tasks
120+
while (true) {
121+
val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
122+
queuedTask()
123+
}
124+
// cancel all delayed tasks
125+
while (true) {
126+
val delayedTask = delayed.pollFirstEntry()?.key ?: break
127+
delayedTask.cancel()
128+
}
129+
}
130+
131+
override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
132+
133+
private fun scheduleQueued(queuedTask: QueuedTask): Boolean {
134+
if (parentJob == null) {
135+
queue.addLast(queuedTask)
91136
return true
92137
}
138+
return queue.addLastIf(queuedTask, { !parentJob!!.isCompleted })
139+
}
140+
141+
private fun scheduleDelayed(delayedTask: DelayedTask): Boolean {
142+
delayed.put(delayedTask, delayedTask)
143+
if (parentJob?.isActive != false) return true
144+
delayedTask.dispose()
93145
return false
94146
}
95147

96-
abstract class Node : LockFreeLinkedListNode(), Runnable
148+
private fun unpark() {
149+
if (Thread.currentThread() !== thread)
150+
LockSupport.unpark(thread)
151+
}
97152

98-
class Dispatch(block: Runnable) : Node(), Runnable by block
153+
private abstract class QueuedTask : LockFreeLinkedListNode(), () -> Unit
99154

100-
override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
101-
}
155+
private class QueuedRunnableTask(
156+
private val block: Runnable
157+
) : QueuedTask() {
158+
override fun invoke() { block.run() }
159+
}
160+
161+
private abstract inner class DelayedTask(
162+
time: Long, timeUnit: TimeUnit
163+
) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle {
164+
@JvmField val nanoTime: Long = System.nanoTime() + timeUnit.toNanos(time)
165+
@JvmField val sequence: Long = nextSequence.getAndIncrement()
166+
167+
override fun compareTo(other: DelayedTask): Int {
168+
val dTime = nanoTime - other.nanoTime
169+
if (dTime > 0) return 1
170+
if (dTime < 0) return -1
171+
val dSequence = sequence - other.sequence
172+
return if (dSequence > 0) 1 else if (dSequence < 0) -1 else 0
173+
}
174+
175+
override final fun dispose() {
176+
delayed.remove(this)
177+
cancel()
178+
}
102179

180+
open fun cancel() {}
181+
}
182+
183+
private inner class DelayedResumeTask(
184+
time: Long, timeUnit: TimeUnit,
185+
private val cont: CancellableContinuation<Unit>
186+
) : DelayedTask(time, timeUnit) {
187+
override fun invoke() {
188+
with(cont) { resumeUndispatched(Unit) }
189+
}
190+
override fun cancel() {
191+
if (!cont.isActive) return
192+
val remaining = nanoTime - System.nanoTime()
193+
scheduledExecutor.schedule(ResumeRunnable(cont), remaining, TimeUnit.NANOSECONDS)
194+
}
195+
}
196+
197+
private inner class DelayedRunnableTask(
198+
time: Long, timeUnit: TimeUnit,
199+
private val block: Runnable
200+
) : DelayedTask(time, timeUnit) {
201+
override fun invoke() { block.run() }
202+
}
203+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.junit.Test
20+
21+
class RunBlockingTest {
22+
/**
23+
* Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
24+
*/
25+
@Test(expected = CancellationException::class)
26+
fun testYieldBlockingWithTimeout() = runBlocking {
27+
withTimeout(100) {
28+
while (true) {
29+
yield()
30+
}
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)