16
16
17
17
package kotlinx.coroutines.experimental
18
18
19
- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20
- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21
- import kotlinx.coroutines.experimental.internal.ThreadSafeHeap
22
- import kotlinx.coroutines.experimental.internal.ThreadSafeHeapNode
23
- import java.util.concurrent.TimeUnit
24
- import java.util.concurrent.locks.LockSupport
25
- import kotlin.coroutines.experimental.CoroutineContext
19
+ import kotlinx.atomicfu.*
20
+ import kotlinx.coroutines.experimental.internal.*
21
+ import java.util.concurrent.*
22
+ import java.util.concurrent.locks.*
23
+ import kotlin.coroutines.experimental.*
26
24
27
25
/* *
28
26
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -67,6 +65,7 @@ public interface EventLoop {
67
65
* }
68
66
* ```
69
67
*/
68
+ @Suppress(" FunctionName" )
70
69
public fun EventLoop (thread : Thread = Thread .currentThread(), parentJob : Job ? = null): CoroutineDispatcher =
71
70
EventLoopImpl (thread).apply {
72
71
if (parentJob != null ) initParentJob(parentJob)
@@ -76,30 +75,49 @@ private const val DELAYED = 0
76
75
private const val REMOVED = 1
77
76
private const val RESCHEDULED = 2
78
77
78
+ @Suppress(" PrivatePropertyName" )
79
+ private val CLOSED_EMPTY = Symbol (" CLOSED_EMPTY" )
80
+
81
+ private typealias Queue <T > = LockFreeMPSCQueueCore <T >
82
+
79
83
internal abstract class EventLoopBase : CoroutineDispatcher (), Delay, EventLoop {
80
- private val queue = LockFreeLinkedListHead ()
81
- private val delayed = ThreadSafeHeap <DelayedTask >()
84
+ // null | CLOSED_EMPTY | task | Queue<Runnable>
85
+ private val _queue = atomic<Any ?>(null )
86
+
87
+ // Allocated only only once
88
+ private val _delayed = atomic<ThreadSafeHeap <DelayedTask >? > (null )
82
89
83
- protected abstract val canComplete: Boolean
84
90
protected abstract val isCompleted: Boolean
85
91
protected abstract fun unpark ()
86
92
protected abstract fun isCorrectThread (): Boolean
87
93
88
94
protected val isEmpty: Boolean
89
- get() = queue.isEmpty && delayed.isEmpty
95
+ get() = isQueueEmpty && isDelayedEmpty
96
+
97
+ private val isQueueEmpty: Boolean get() {
98
+ val queue = _queue .value
99
+ return when (queue) {
100
+ null -> true
101
+ is Queue <* > -> queue.isEmpty
102
+ else -> queue == = CLOSED_EMPTY
103
+ }
104
+ }
105
+
106
+ private val isDelayedEmpty: Boolean get() {
107
+ val delayed = _delayed .value
108
+ return delayed == null || delayed.isEmpty
109
+ }
90
110
91
111
private val nextTime: Long
92
112
get() {
93
- if (! queue.isEmpty) return 0
113
+ if (! isQueueEmpty) return 0
114
+ val delayed = _delayed .value ? : return Long .MAX_VALUE
94
115
val nextDelayedTask = delayed.peek() ? : return Long .MAX_VALUE
95
116
return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0 )
96
117
}
97
118
98
- fun execute (block : Runnable ) =
99
- enqueue(block.toQueuedTask())
100
-
101
119
override fun dispatch (context : CoroutineContext , block : Runnable ) =
102
- enqueue (block.toQueuedTask() )
120
+ execute (block)
103
121
104
122
override fun scheduleResumeAfterDelay (time : Long , unit : TimeUnit , continuation : CancellableContinuation <Unit >) =
105
123
schedule(DelayedResumeTask (time, unit, continuation))
@@ -110,43 +128,101 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
110
128
override fun processNextEvent (): Long {
111
129
if (! isCorrectThread()) return Long .MAX_VALUE
112
130
// queue all delayed tasks that are due to be executed
113
- if (! delayed.isEmpty) {
131
+ val delayed = _delayed .value
132
+ if (delayed != null && ! delayed.isEmpty) {
114
133
val now = timeSource.nanoTime()
115
134
while (true ) {
116
135
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
117
136
// to make sure that 'isEmpty' and `nextTime` that check both of them
118
137
// do not transiently report that both delayed and queue are empty during move
119
138
delayed.removeFirstIf {
120
139
if (it.timeToExecute(now)) {
121
- queue.addLast(it)
122
- true // proceed with remove
140
+ enqueueImpl(it)
123
141
} else
124
142
false
125
- } ? : break // quit loop when nothing more to remove
143
+ } ? : break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
126
144
}
127
145
}
128
146
// then process one event from queue
129
- (queue.removeFirstOrNull() as ? QueuedTask )?.run ()
147
+ dequeue( )?.run ()
130
148
return nextTime
131
149
}
132
150
133
- private fun Runnable.toQueuedTask (): QueuedTask =
134
- if (this is QueuedTask && isFresh) this else QueuedRunnableTask (this )
135
-
136
- internal fun enqueue (queuedTask : QueuedTask ) {
137
- if (enqueueImpl(queuedTask)) {
151
+ @Suppress(" MemberVisibilityCanBePrivate" ) // todo: remove suppress when KT-22030 is fixed
152
+ internal fun execute (task : Runnable ) {
153
+ if (enqueueImpl(task)) {
138
154
// todo: we should unpark only when this delayed task became first in the queue
139
155
unpark()
140
156
} else
141
- DefaultExecutor .enqueue(queuedTask)
157
+ DefaultExecutor .execute(task)
158
+ }
159
+
160
+ @Suppress(" UNCHECKED_CAST" )
161
+ private fun enqueueImpl (task : Runnable ): Boolean {
162
+ _queue .loop { queue ->
163
+ if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
164
+ when (queue) {
165
+ null -> if (_queue .compareAndSet(null , task)) return true
166
+ is Queue <* > -> {
167
+ when ((queue as Queue <Runnable >).addLast(task)) {
168
+ Queue .ADD_SUCCESS -> return true
169
+ Queue .ADD_CLOSED -> return false
170
+ Queue .ADD_FROZEN -> _queue .compareAndSet(queue, queue.next())
171
+ }
172
+ }
173
+ else -> when {
174
+ queue == = CLOSED_EMPTY -> return false
175
+ else -> {
176
+ // update to full-blown queue to add one more
177
+ val newQueue = Queue <Runnable >(Queue .INITIAL_CAPACITY )
178
+ newQueue.addLast(queue as Runnable )
179
+ newQueue.addLast(task)
180
+ if (_queue .compareAndSet(queue, newQueue)) return true
181
+ }
182
+ }
183
+ }
184
+ }
142
185
}
143
186
144
- private fun enqueueImpl (queuedTask : QueuedTask ): Boolean {
145
- if (! canComplete) {
146
- queue.addLast(queuedTask)
147
- return true
187
+ @Suppress(" UNCHECKED_CAST" )
188
+ private fun dequeue (): Runnable ? {
189
+ _queue .loop { queue ->
190
+ when (queue) {
191
+ null -> return null
192
+ is Queue <* > -> {
193
+ val result = (queue as Queue <Runnable >).removeFirstOrNull()
194
+ if (result != = Queue .REMOVE_FROZEN ) return result as Runnable ?
195
+ _queue .compareAndSet(queue, queue.next())
196
+ }
197
+ else -> when {
198
+ queue == = CLOSED_EMPTY -> return null
199
+ else -> if (_queue .compareAndSet(queue, null )) return queue as Runnable
200
+ }
201
+ }
148
202
}
149
- return queue.addLastIf(queuedTask) { ! isCompleted }
203
+ }
204
+
205
+ protected fun closeQueue () {
206
+ assert (isCompleted)
207
+ _queue .loop { queue ->
208
+ when (queue) {
209
+ null -> if (_queue .compareAndSet(null , CLOSED_EMPTY )) return
210
+ is Queue <* > -> {
211
+ queue.close()
212
+ return
213
+ }
214
+ else -> when {
215
+ queue == = CLOSED_EMPTY -> return
216
+ else -> {
217
+ // update to full-blown queue to close
218
+ val newQueue = Queue <Runnable >(Queue .INITIAL_CAPACITY )
219
+ newQueue.addLast(queue as Runnable )
220
+ if (_queue .compareAndSet(queue, newQueue)) return
221
+ }
222
+ }
223
+ }
224
+ }
225
+
150
226
}
151
227
152
228
internal fun schedule (delayedTask : DelayedTask ) {
@@ -158,43 +234,37 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
158
234
}
159
235
160
236
private fun scheduleImpl (delayedTask : DelayedTask ): Boolean {
161
- if (! canComplete) {
162
- delayed.addLast(delayedTask)
163
- return true
237
+ if (isCompleted) return false
238
+ val delayed = _delayed .value ? : run {
239
+ _delayed .compareAndSet(null , ThreadSafeHeap ())
240
+ _delayed .value!!
164
241
}
165
242
return delayed.addLastIf(delayedTask) { ! isCompleted }
166
243
}
167
244
168
245
internal fun removeDelayedImpl (delayedTask : DelayedTask ) {
169
- delayed .remove(delayedTask)
246
+ _delayed .value? .remove(delayedTask)
170
247
}
171
248
172
- protected fun clearAll () {
173
- while (true ) queue.removeFirstOrNull() ? : break
174
- while (true ) delayed.removeFirstOrNull() ? : break
249
+ // It performs "hard" shutdown for test cleanup purposes
250
+ protected fun resetAll () {
251
+ _queue .value = null
252
+ _delayed .value = null
175
253
}
176
254
255
+ // This is a "soft" (normal) shutdown
177
256
protected fun rescheduleAllDelayed () {
178
257
while (true ) {
179
- val delayedTask = delayed .removeFirstOrNull() ? : break
258
+ val delayedTask = _delayed .value? .removeFirstOrNull() ? : break
180
259
delayedTask.rescheduleOnShutdown()
181
260
}
182
261
}
183
262
184
- internal abstract class QueuedTask : LockFreeLinkedListNode (), Runnable
185
-
186
- private class QueuedRunnableTask (
187
- private val block : Runnable
188
- ) : QueuedTask() {
189
- override fun run () { block.run () }
190
- override fun toString (): String = block.toString()
191
- }
192
-
193
263
internal abstract inner class DelayedTask (
194
264
time : Long , timeUnit : TimeUnit
195
- ) : QueuedTask() , Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
265
+ ) : Runnable , Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
196
266
override var index: Int = - 1
197
- var state = DELAYED
267
+ var state = DELAYED // Guarded by by lock on this task for reschedule/dispose purposes
198
268
@JvmField val nanoTime: Long = timeSource.nanoTime() + timeUnit.toNanos(time)
199
269
200
270
override fun compareTo (other : DelayedTask ): Int {
@@ -208,18 +278,18 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
208
278
209
279
fun timeToExecute (now : Long ): Boolean = now - nanoTime >= 0L
210
280
211
- fun rescheduleOnShutdown () = synchronized(delayed ) {
281
+ fun rescheduleOnShutdown () = synchronized(this ) {
212
282
if (state != DELAYED ) return
213
- if (delayed .remove(this )) {
283
+ if (_delayed .value !! .remove(this )) {
214
284
state = RESCHEDULED
215
285
DefaultExecutor .schedule(this )
216
286
} else
217
287
state = REMOVED
218
288
}
219
289
220
- override final fun dispose () = synchronized(delayed ) {
290
+ final override fun dispose () = synchronized(this ) {
221
291
when (state) {
222
- DELAYED -> delayed .remove(this )
292
+ DELAYED -> _delayed .value? .remove(this )
223
293
RESCHEDULED -> DefaultExecutor .removeDelayedImpl(this )
224
294
else -> return
225
295
}
@@ -258,7 +328,7 @@ internal abstract class ThreadEventLoop(
258
328
}
259
329
260
330
fun shutdown () {
261
- assert (isCompleted )
331
+ closeQueue( )
262
332
assert (isCorrectThread())
263
333
// complete processing of all queued tasks
264
334
while (processNextEvent() <= 0 ) { /* spin */ }
@@ -270,7 +340,6 @@ internal abstract class ThreadEventLoop(
270
340
private class EventLoopImpl (thread : Thread ) : ThreadEventLoop(thread) {
271
341
private var parentJob: Job ? = null
272
342
273
- override val canComplete: Boolean get() = parentJob != null
274
343
override val isCompleted: Boolean get() = parentJob?.isCompleted == true
275
344
276
345
fun initParentJob (parentJob : Job ) {
@@ -280,7 +349,6 @@ private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
280
349
}
281
350
282
351
internal class BlockingEventLoop (thread : Thread ) : ThreadEventLoop(thread) {
283
- override val canComplete: Boolean get() = true
284
352
@Volatile
285
353
public override var isCompleted: Boolean = false
286
354
}
0 commit comments