Skip to content

Commit 98a9705

Browse files
elizarovqwwdfsad
authored andcommitted
Move event loop infrastructure to common code
This makes code of EventLoop (including its task queue impl) shared between JVM and Native making support and further fixes easier. It it not actually used in JS and DCE should remove it.
1 parent cb7f37b commit 98a9705

18 files changed

+581
-1141
lines changed

kotlinx-coroutines-core/common/src/Annotations.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,5 @@ public annotation class ObsoleteCoroutinesApi
5252
*/
5353
@Retention(value = AnnotationRetention.BINARY)
5454
@Experimental(level = Experimental.Level.ERROR)
55+
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.TYPEALIAS, AnnotationTarget.PROPERTY)
5556
public annotation class InternalCoroutinesApi

kotlinx-coroutines-core/common/src/EventLoop.common.kt

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.internal.*
9+
import kotlin.coroutines.*
10+
import kotlin.jvm.*
811

912
/**
1013
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -133,5 +136,309 @@ internal object ThreadLocalEventLoop {
133136
}
134137
}
135138

139+
@SharedImmutable
140+
private val DISPOSED_TASK = Symbol("REMOVED_TASK")
141+
142+
// results for scheduleImpl
143+
private const val SCHEDULE_OK = 0
144+
private const val SCHEDULE_COMPLETED = 1
145+
private const val SCHEDULE_DISPOSED = 2
146+
147+
private const val MS_TO_NS = 1_000_000L
148+
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
149+
150+
internal fun delayToNanos(timeMillis: Long): Long = when {
151+
timeMillis <= 0 -> 0L
152+
timeMillis >= MAX_MS -> Long.MAX_VALUE
153+
else -> timeMillis * MS_TO_NS
154+
}
155+
156+
internal fun delayNanosToMillis(timeNanos: Long): Long =
157+
timeNanos / MS_TO_NS
158+
159+
@SharedImmutable
160+
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
161+
162+
private typealias Queue<T> = LockFreeTaskQueueCore<T>
163+
164+
internal expect abstract class EventLoopImplPlatform() : EventLoop {
165+
protected fun unpark()
166+
}
167+
168+
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
169+
// null | CLOSED_EMPTY | task | Queue<Runnable>
170+
private val _queue = atomic<Any?>(null)
171+
172+
// Allocated only only once
173+
private val _delayed = atomic<ThreadSafeHeap<DelayedTask>?>(null)
174+
175+
@Volatile
176+
private var isCompleted = false
177+
178+
override val isEmpty: Boolean get() {
179+
if (!isUnconfinedQueueEmpty) return false
180+
val delayed = _delayed.value
181+
if (delayed != null && !delayed.isEmpty) return false
182+
val queue = _queue.value
183+
return when (queue) {
184+
null -> true
185+
is Queue<*> -> queue.isEmpty
186+
else -> queue === CLOSED_EMPTY
187+
}
188+
}
189+
190+
protected override val nextTime: Long
191+
get() {
192+
if (super.nextTime == 0L) return 0L
193+
val queue = _queue.value
194+
when {
195+
queue === null -> {} // empty queue -- proceed
196+
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
197+
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
198+
else -> return 0 // non-empty queue
199+
}
200+
val delayed = _delayed.value ?: return Long.MAX_VALUE
201+
val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE
202+
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
203+
}
204+
205+
override fun shutdown() {
206+
// Clean up thread-local reference here -- this event loop is shutting down
207+
ThreadLocalEventLoop.resetEventLoop()
208+
// We should signal that this event loop should not accept any more tasks
209+
// and process queued events (that could have been added after last processNextEvent)
210+
isCompleted = true
211+
closeQueue()
212+
// complete processing of all queued tasks
213+
while (processNextEvent() <= 0) { /* spin */ }
214+
// reschedule the rest of delayed tasks
215+
rescheduleAllDelayed()
216+
}
217+
218+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
219+
schedule(DelayedResumeTask(timeMillis, continuation))
220+
221+
override fun processNextEvent(): Long {
222+
// unconfined events take priority
223+
if (processUnconfinedEvent()) return nextTime
224+
// queue all delayed tasks that are due to be executed
225+
val delayed = _delayed.value
226+
if (delayed != null && !delayed.isEmpty) {
227+
val now = nanoTime()
228+
while (true) {
229+
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
230+
// to make sure that 'isEmpty' and `nextTime` that check both of them
231+
// do not transiently report that both delayed and queue are empty during move
232+
delayed.removeFirstIf {
233+
if (it.timeToExecute(now)) {
234+
enqueueImpl(it)
235+
} else
236+
false
237+
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
238+
}
239+
}
240+
// then process one event from queue
241+
dequeue()?.run()
242+
return nextTime
243+
}
244+
245+
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
246+
247+
public fun enqueue(task: Runnable) {
248+
if (enqueueImpl(task)) {
249+
// todo: we should unpark only when this delayed task became first in the queue
250+
unpark()
251+
} else {
252+
DefaultExecutor.enqueue(task)
253+
}
254+
}
255+
256+
@Suppress("UNCHECKED_CAST")
257+
private fun enqueueImpl(task: Runnable): Boolean {
258+
_queue.loop { queue ->
259+
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
260+
when (queue) {
261+
null -> if (_queue.compareAndSet(null, task)) return true
262+
is Queue<*> -> {
263+
when ((queue as Queue<Runnable>).addLast(task)) {
264+
Queue.ADD_SUCCESS -> return true
265+
Queue.ADD_CLOSED -> return false
266+
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
267+
}
268+
}
269+
else -> when {
270+
queue === CLOSED_EMPTY -> return false
271+
else -> {
272+
// update to full-blown queue to add one more
273+
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
274+
newQueue.addLast(queue as Runnable)
275+
newQueue.addLast(task)
276+
if (_queue.compareAndSet(queue, newQueue)) return true
277+
}
278+
}
279+
}
280+
}
281+
}
282+
283+
@Suppress("UNCHECKED_CAST")
284+
private fun dequeue(): Runnable? {
285+
_queue.loop { queue ->
286+
when (queue) {
287+
null -> return null
288+
is Queue<*> -> {
289+
val result = (queue as Queue<Runnable>).removeFirstOrNull()
290+
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
291+
_queue.compareAndSet(queue, queue.next())
292+
}
293+
else -> when {
294+
queue === CLOSED_EMPTY -> return null
295+
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
296+
}
297+
}
298+
}
299+
}
300+
301+
private fun closeQueue() {
302+
assert { isCompleted }
303+
_queue.loop { queue ->
304+
when (queue) {
305+
null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
306+
is Queue<*> -> {
307+
queue.close()
308+
return
309+
}
310+
else -> when {
311+
queue === CLOSED_EMPTY -> return
312+
else -> {
313+
// update to full-blown queue to close
314+
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
315+
newQueue.addLast(queue as Runnable)
316+
if (_queue.compareAndSet(queue, newQueue)) return
317+
}
318+
}
319+
}
320+
}
321+
322+
}
323+
324+
public fun schedule(delayedTask: DelayedTask) {
325+
when (scheduleImpl(delayedTask)) {
326+
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
327+
SCHEDULE_COMPLETED -> DefaultExecutor.schedule(delayedTask)
328+
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
329+
else -> error("unexpected result")
330+
}
331+
}
332+
333+
private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
334+
335+
private fun scheduleImpl(delayedTask: DelayedTask): Int {
336+
if (isCompleted) return SCHEDULE_COMPLETED
337+
val delayed = _delayed.value ?: run {
338+
_delayed.compareAndSet(null, ThreadSafeHeap())
339+
_delayed.value!!
340+
}
341+
return delayedTask.schedule(delayed, this)
342+
}
343+
344+
// It performs "hard" shutdown for test cleanup purposes
345+
protected fun resetAll() {
346+
_queue.value = null
347+
_delayed.value = null
348+
}
349+
350+
// This is a "soft" (normal) shutdown
351+
private fun rescheduleAllDelayed() {
352+
while (true) {
353+
/*
354+
* `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
355+
* synchronized on DelayedTask itself. All other operation are synchronized both on
356+
* DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
357+
* first removes DelayedTask from the heap (under synchronization) then
358+
* assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
359+
*/
360+
val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
361+
delayedTask.rescheduleOnShutdown()
362+
}
363+
}
364+
365+
internal abstract class DelayedTask(
366+
timeMillis: Long
367+
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
368+
private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
369+
370+
override var heap: ThreadSafeHeap<*>?
371+
get() = _heap as? ThreadSafeHeap<*>
372+
set(value) {
373+
require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
374+
_heap = value
375+
}
376+
377+
override var index: Int = -1
378+
379+
@JvmField val nanoTime: Long = nanoTime() + delayToNanos(timeMillis)
380+
381+
override fun compareTo(other: DelayedTask): Int {
382+
val dTime = nanoTime - other.nanoTime
383+
return when {
384+
dTime > 0 -> 1
385+
dTime < 0 -> -1
386+
else -> 0
387+
}
388+
}
389+
390+
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
391+
392+
@Synchronized
393+
fun schedule(delayed: ThreadSafeHeap<DelayedTask>, eventLoop: EventLoopImplBase): Int {
394+
if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
395+
return if (delayed.addLastIf(this) { !eventLoop.isCompleted }) SCHEDULE_OK else SCHEDULE_COMPLETED
396+
}
397+
398+
// note: DefaultExecutor.schedule performs `schedule` (above) which does sync & checks for DISPOSED_TASK
399+
fun rescheduleOnShutdown() = DefaultExecutor.schedule(this)
400+
401+
@Synchronized
402+
final override fun dispose() {
403+
val heap = _heap
404+
if (heap === DISPOSED_TASK) return // already disposed
405+
@Suppress("UNCHECKED_CAST")
406+
(heap as? ThreadSafeHeap<DelayedTask>)?.remove(this) // remove if it is in heap (first)
407+
_heap = DISPOSED_TASK // never add again to any heap
408+
}
409+
410+
override fun toString(): String = "Delayed[nanos=$nanoTime]"
411+
}
412+
413+
private inner class DelayedResumeTask(
414+
timeMillis: Long,
415+
private val cont: CancellableContinuation<Unit>
416+
) : DelayedTask(timeMillis) {
417+
init {
418+
// Note that this operation isn't lock-free, but very short
419+
cont.disposeOnCancellation(this)
420+
}
421+
422+
override fun run() {
423+
with(cont) { resumeUndispatched(Unit) }
424+
}
425+
}
426+
427+
internal class DelayedRunnableTask(
428+
time: Long,
429+
private val block: Runnable
430+
) : DelayedTask(time) {
431+
override fun run() { block.run() }
432+
override fun toString(): String = super.toString() + block.toString()
433+
}
434+
}
435+
136436
internal expect fun createEventLoop(): EventLoop
137437

438+
internal expect fun nanoTime(): Long
439+
440+
internal expect object DefaultExecutor {
441+
public fun enqueue(task: Runnable)
442+
public fun schedule(delayedTask: EventLoopImplBase.DelayedTask)
443+
}
444+

0 commit comments

Comments
 (0)