Skip to content

Commit a047a11

Browse files
committed
EventLoop scheduled tasks impl is rewritten
1 parent 7400ff0 commit a047a11

File tree

3 files changed

+275
-37
lines changed

3 files changed

+275
-37
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
2020
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21-
import java.util.concurrent.ConcurrentSkipListMap
21+
import kotlinx.coroutines.experimental.internal.ThreadSafeHeap
22+
import kotlinx.coroutines.experimental.internal.ThreadSafeHeapNode
23+
import java.util.concurrent.Future
2224
import java.util.concurrent.TimeUnit
2325
import java.util.concurrent.atomic.AtomicLong
2426
import java.util.concurrent.locks.LockSupport
@@ -67,7 +69,7 @@ internal class EventLoopImpl(
6769
private val thread: Thread
6870
) : CoroutineDispatcher(), EventLoop, Delay {
6971
private val queue = LockFreeLinkedListHead()
70-
private val delayed = ConcurrentSkipListMap<DelayedTask, DelayedTask>()
72+
private val delayed = ThreadSafeHeap<DelayedTask>()
7173
private val nextSequence = AtomicLong()
7274
private var parentJob: Job? = null
7375

@@ -78,9 +80,10 @@ internal class EventLoopImpl(
7880

7981
override fun dispatch(context: CoroutineContext, block: Runnable) {
8082
if (scheduleQueued(QueuedRunnableTask(block))) {
83+
// todo: we should unpark only when this task became first in the queue
8184
unpark()
8285
} else {
83-
block.run()
86+
block.run() // otherwise run it right here (as if Unconfined)
8487
}
8588
}
8689

@@ -89,42 +92,56 @@ internal class EventLoopImpl(
8992
// todo: we should unpark only when this delayed task became first in the queue
9093
unpark()
9194
} else {
92-
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
95+
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit) // otherwise reschedule to other time pool
9396
}
9497
}
9598

96-
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
97-
DelayedRunnableTask(time, unit, block).also { scheduleDelayed(it) }
99+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
100+
val delayedTask = DelayedRunnableTask(time, unit, block)
101+
if (scheduleDelayed(delayedTask)) {
102+
// todo: we should unpark only when this delayed task became first in the queue
103+
unpark()
104+
return delayedTask
105+
}
106+
return DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
107+
}
98108

99109
override fun processNextEvent(): Long {
100110
if (Thread.currentThread() !== thread) return Long.MAX_VALUE
101111
// queue all delayed tasks that are due to be executed
102-
while (true) {
103-
val delayedTask = delayed.firstEntry()?.key ?: break
112+
if (!delayed.isEmpty) {
104113
val now = System.nanoTime()
105-
if (delayedTask.nanoTime - now > 0) break
106-
if (!scheduleQueued(delayedTask)) break
107-
delayed.remove(delayedTask)
114+
while (true) {
115+
val delayedTask = delayed.removeFirstIf { it.timeToExecute(now) } ?: break
116+
queue.addLast(delayedTask)
117+
}
108118
}
109119
// then process one event from queue
110120
(queue.removeFirstOrNull() as? QueuedTask)?.let { queuedTask ->
111-
queuedTask()
121+
queuedTask.run()
112122
}
113123
if (!queue.isEmpty) return 0
114-
val nextDelayedTask = delayed.firstEntry()?.key ?: return Long.MAX_VALUE
115-
return nextDelayedTask.nanoTime - System.nanoTime()
124+
val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE
125+
return (nextDelayedTask.nanoTime - System.nanoTime()).coerceAtLeast(0)
116126
}
117127

128+
private val isActive: Boolean get() = parentJob?.isCompleted != true
129+
118130
fun shutdown() {
131+
assert(!isActive)
119132
// complete processing of all queued tasks
120133
while (true) {
121134
val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
122-
queuedTask()
135+
queuedTask.run()
123136
}
124-
// cancel all delayed tasks
137+
// reschedule or execute delayed tasks
125138
while (true) {
126-
val delayedTask = delayed.pollFirstEntry()?.key ?: break
127-
delayedTask.cancel()
139+
val delayedTask = delayed.removeFirst() ?: break
140+
val now = System.nanoTime()
141+
if (delayedTask.timeToExecute(now))
142+
delayedTask.run()
143+
else
144+
delayedTask.rescheduleOnShutdown(now)
128145
}
129146
}
130147

@@ -133,35 +150,38 @@ internal class EventLoopImpl(
133150
queue.addLast(queuedTask)
134151
return true
135152
}
136-
return queue.addLastIf(queuedTask, { !parentJob!!.isCompleted })
153+
return queue.addLastIf(queuedTask) { isActive }
137154
}
138155

139156
private fun scheduleDelayed(delayedTask: DelayedTask): Boolean {
140-
delayed.put(delayedTask, delayedTask)
141-
if (parentJob?.isActive != false) return true
142-
delayedTask.dispose()
143-
return false
157+
if (parentJob == null) {
158+
delayed.addLast(delayedTask)
159+
return true
160+
}
161+
return delayed.addLastIf(delayedTask) { isActive }
144162
}
145163

146164
private fun unpark() {
147165
if (Thread.currentThread() !== thread)
148166
LockSupport.unpark(thread)
149167
}
150168

151-
private abstract class QueuedTask : LockFreeLinkedListNode(), () -> Unit
169+
private abstract class QueuedTask : LockFreeLinkedListNode(), Runnable
152170

153171
private class QueuedRunnableTask(
154172
private val block: Runnable
155173
) : QueuedTask() {
156-
override fun invoke() { block.run() }
174+
override fun run() { block.run() }
157175
override fun toString(): String = block.toString()
158176
}
159177

160178
private abstract inner class DelayedTask(
161179
time: Long, timeUnit: TimeUnit
162-
) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle {
180+
) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
181+
override var index: Int = -1
163182
@JvmField val nanoTime: Long = System.nanoTime() + timeUnit.toNanos(time)
164183
@JvmField val sequence: Long = nextSequence.getAndIncrement()
184+
private var scheduledAfterShutdown: Future<*>? = null
165185

166186
override fun compareTo(other: DelayedTask): Int {
167187
val dTime = nanoTime - other.nanoTime
@@ -171,12 +191,21 @@ internal class EventLoopImpl(
171191
return if (dSequence > 0) 1 else if (dSequence < 0) -1 else 0
172192
}
173193

174-
override final fun dispose() {
175-
delayed.remove(this)
176-
cancel()
194+
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
195+
196+
fun rescheduleOnShutdown(now: Long) = synchronized(delayed) {
197+
if (delayed.remove(this)) {
198+
assert (scheduledAfterShutdown == null)
199+
scheduledAfterShutdown = scheduledExecutor.schedule(this, nanoTime - now, TimeUnit.NANOSECONDS)
200+
}
177201
}
178202

179-
open fun cancel() {}
203+
override final fun dispose() = synchronized(delayed) {
204+
if (!delayed.remove(this)) {
205+
scheduledAfterShutdown?.cancel(false)
206+
scheduledAfterShutdown = null
207+
}
208+
}
180209

181210
override fun toString(): String = "Delayed[nanos=$nanoTime,seq=$sequence]"
182211
}
@@ -185,21 +214,16 @@ internal class EventLoopImpl(
185214
time: Long, timeUnit: TimeUnit,
186215
private val cont: CancellableContinuation<Unit>
187216
) : DelayedTask(time, timeUnit) {
188-
override fun invoke() {
217+
override fun run() {
189218
with(cont) { resumeUndispatched(Unit) }
190219
}
191-
override fun cancel() {
192-
if (!cont.isActive) return
193-
val remaining = nanoTime - System.nanoTime()
194-
scheduledExecutor.schedule(ResumeRunnable(cont), remaining, TimeUnit.NANOSECONDS)
195-
}
196220
}
197221

198222
private inner class DelayedRunnableTask(
199223
time: Long, timeUnit: TimeUnit,
200224
private val block: Runnable
201225
) : DelayedTask(time, timeUnit) {
202-
override fun invoke() { block.run() }
226+
override fun run() { block.run() }
203227
override fun toString(): String = super.toString() + block.toString()
204228
}
205229
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.internal
18+
19+
/**
20+
* @suppress **This is unstable API and it is subject to change.**
21+
*/
22+
public interface ThreadSafeHeapNode {
23+
public var index: Int
24+
}
25+
26+
/**
27+
* Synchronized binary heap.
28+
*
29+
* @suppress **This is unstable API and it is subject to change.**
30+
*/
31+
public class ThreadSafeHeap<T> where T: ThreadSafeHeapNode, T: Comparable<T> {
32+
private var a: Array<T?>? = null
33+
34+
@JvmField @PublishedApi @Volatile
35+
internal var size = 0
36+
37+
public val isEmpty: Boolean get() = size == 0
38+
39+
public fun peek(): T? = synchronized(this) { firstImpl() }
40+
41+
public fun removeFirst(): T? = synchronized(this) {
42+
if (size > 0) {
43+
removeAtImpl(0)
44+
} else
45+
null
46+
}
47+
48+
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
49+
val first = firstImpl() ?: return@synchronized null
50+
if (predicate(first)) {
51+
removeAtImpl(0)
52+
} else
53+
null
54+
}
55+
56+
public fun addLast(node: T) = synchronized(this) {
57+
addImpl(node)
58+
}
59+
60+
public fun addLastIf(node: T, cond: () -> Boolean): Boolean = synchronized(this) {
61+
if (cond()) {
62+
addImpl(node)
63+
true
64+
} else
65+
false
66+
}
67+
68+
public fun remove(node: T): Boolean = synchronized(this) {
69+
if (node.index < 0) {
70+
false
71+
} else {
72+
removeAtImpl(node.index)
73+
true
74+
}
75+
}
76+
77+
@PublishedApi
78+
internal fun firstImpl(): T? = a?.get(0)
79+
80+
@PublishedApi
81+
internal fun removeAtImpl(index: Int): T {
82+
check(size > 0)
83+
val a = this.a!!
84+
size--
85+
if (index < size) {
86+
swap(index, size)
87+
var i = index
88+
while (true) {
89+
var j = 2 * i + 1
90+
if (j >= size) break
91+
if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
92+
if (a[i]!! <= a[j]!!) break
93+
swap(i, j)
94+
i = j
95+
}
96+
}
97+
val result = a[size]!!
98+
result.index = -1
99+
a[size] = null
100+
return result
101+
}
102+
103+
@PublishedApi
104+
internal fun addImpl(node: T) {
105+
val a = realloc()
106+
var i = size++
107+
a[i] = node
108+
node.index = i
109+
while (i > 0) {
110+
val j = (i - 1) / 2
111+
if (a[j]!! <= a[i]!!) break
112+
swap(i, j)
113+
i = j
114+
}
115+
}
116+
117+
118+
@Suppress("UNCHECKED_CAST")
119+
private fun realloc(): Array<T?> {
120+
val a = this.a
121+
return when {
122+
a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it }
123+
size >= a.size -> a.copyOf(size * 2).also { this.a = it }
124+
else -> a
125+
}
126+
}
127+
128+
private fun swap(i: Int, j: Int) {
129+
val a = a!!
130+
val ni = a[j]!!
131+
val nj = a[i]!!
132+
a[i] = ni
133+
a[j] = nj
134+
ni.index = i
135+
nj.index = j
136+
}
137+
}

0 commit comments

Comments
 (0)