Skip to content

Commit 92b0485

Browse files
committed
Renamed ScheduledExecutor to DefaultExecutor;
consistently schedule coroutines to it on any failure in all other executors (on RejectedExecutionException, etc)
1 parent 19bf4d5 commit 92b0485

File tree

10 files changed

+107
-72
lines changed

10 files changed

+107
-72
lines changed

coroutines-guide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -816,14 +816,14 @@ Produces the output:
816816
```text
817817
'Unconfined': I'm working in thread main
818818
'context': I'm working in thread main
819-
'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
819+
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
820820
'context': After delay in thread main
821821
```
822822

823823
<!--- TEST LINES_START -->
824824

825825
So, the coroutine that had inherited `context` of `runBlocking {...}` continues to execute in the `main` thread,
826-
while the unconfined one had resumed in the scheduler thread that [delay] function is using.
826+
while the unconfined one had resumed in the default executor thread that [delay] function is using.
827827

828828
### Debugging coroutines and threads
829829

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.ExecutorService
2020
import java.util.concurrent.Executors
21+
import java.util.concurrent.RejectedExecutionException
2122
import java.util.concurrent.TimeUnit
2223
import java.util.concurrent.atomic.AtomicInteger
2324
import kotlin.coroutines.experimental.CoroutineContext
@@ -62,7 +63,8 @@ object CommonPool : CoroutineDispatcher() {
6263
_pool ?: createPool().also { _pool = it }
6364

6465
override fun dispatch(context: CoroutineContext, block: Runnable) =
65-
(_pool ?: getOrCreatePoolSync()).execute(block)
66+
try { (_pool ?: getOrCreatePoolSync()).execute(block) }
67+
catch (e: RejectedExecutionException) { defaultExecutor.execute(block) }
6668

6769
// used for tests
6870
@Synchronized
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import java.util.concurrent.ScheduledExecutorService
20+
import java.util.concurrent.ScheduledThreadPoolExecutor
21+
import java.util.concurrent.TimeUnit
22+
23+
private const val DEFAULT_KEEP_ALIVE = 1000L
24+
25+
private val KEEP_ALIVE =
26+
try { java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) }
27+
catch (e: SecurityException) { DEFAULT_KEEP_ALIVE }
28+
29+
@Volatile
30+
private var _executor: ScheduledExecutorService? = null
31+
32+
internal val defaultExecutor: ScheduledExecutorService
33+
get() = _executor ?: getOrCreateExecutorSync()
34+
35+
@Synchronized
36+
private fun getOrCreateExecutorSync(): ScheduledExecutorService =
37+
_executor ?: ScheduledThreadPoolExecutor(1) { r ->
38+
Thread(r, "kotlinx.coroutines.DefaultExecutor").apply { isDaemon = true }
39+
}.apply {
40+
setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
41+
allowCoreThreadTimeOut(true)
42+
executeExistingDelayedTasksAfterShutdownPolicy = false
43+
// "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection
44+
try {
45+
val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType)
46+
m.invoke(this, true)
47+
} catch (ex: Throwable) { /* ignore */ }
48+
_executor = this
49+
}
50+
51+
// used for tests
52+
@Synchronized
53+
internal fun defaultExecutorShutdownNow() {
54+
_executor?.shutdownNow()
55+
}
56+
57+
@Synchronized
58+
internal fun defaultExecutorShutdownNowAndRelease() {
59+
_executor?.apply {
60+
shutdownNow()
61+
_executor = null
62+
}
63+
}
64+

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public interface Delay {
6767
* This implementation uses a built-in single-threaded scheduled executor service.
6868
*/
6969
fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
70-
DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
70+
DisposableFutureHandle(defaultExecutor.schedule(block, time, unit))
7171
}
7272

7373
/**
@@ -88,7 +88,7 @@ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
8888
val delay = cont.context[ContinuationInterceptor] as? Delay
8989
if (delay != null)
9090
delay.scheduleResumeAfterDelay(time, unit, cont) else
91-
cont.cancelFutureOnCompletion(scheduledExecutor.schedule(ResumeRunnable(cont), time, unit))
91+
cont.cancelFutureOnCompletion(defaultExecutor.schedule(ResumeRunnable(cont), time, unit))
9292
}
9393
}
9494

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ internal class EventLoopImpl(
8383
// todo: we should unpark only when this task became first in the queue
8484
unpark()
8585
} else {
86-
block.run() // otherwise run it right here (as if Unconfined)
86+
// otherwise submit to a default executor
87+
defaultExecutor.execute(block)
8788
}
8889
}
8990

@@ -92,7 +93,8 @@ internal class EventLoopImpl(
9293
// todo: we should unpark only when this delayed task became first in the queue
9394
unpark()
9495
} else {
95-
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit) // otherwise reschedule to other time pool
96+
// otherwise schedule to a default executor
97+
defaultExecutor.schedule(ResumeRunnable(continuation), time, unit)
9698
}
9799
}
98100

@@ -103,7 +105,8 @@ internal class EventLoopImpl(
103105
unpark()
104106
return delayedTask
105107
}
106-
return DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
108+
// otherwise schedule to a default executor
109+
return DisposableFutureHandle(defaultExecutor.schedule(block, time, unit))
107110
}
108111

109112
override fun processNextEvent(): Long {
@@ -129,19 +132,14 @@ internal class EventLoopImpl(
129132

130133
fun shutdown() {
131134
assert(!isActive)
135+
assert(Thread.currentThread() === thread)
132136
// complete processing of all queued tasks
133-
while (true) {
134-
val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
135-
queuedTask.run()
136-
}
137-
// reschedule or execute delayed tasks
137+
while (processNextEvent() <= 0) { /* spin */ }
138+
// reschedule the rest of delayed tasks
139+
val now = System.nanoTime()
138140
while (true) {
139141
val delayedTask = delayed.removeFirst() ?: break
140-
val now = System.nanoTime()
141-
if (delayedTask.timeToExecute(now))
142-
delayedTask.run()
143-
else
144-
delayedTask.rescheduleOnShutdown(now)
142+
delayedTask.rescheduleOnShutdown(now)
145143
}
146144
}
147145

@@ -196,7 +194,11 @@ internal class EventLoopImpl(
196194
fun rescheduleOnShutdown(now: Long) = synchronized(delayed) {
197195
if (delayed.remove(this)) {
198196
assert (scheduledAfterShutdown == null)
199-
scheduledAfterShutdown = scheduledExecutor.schedule(this, nanoTime - now, TimeUnit.NANOSECONDS)
197+
val remaining = nanoTime - now
198+
scheduledAfterShutdown =
199+
if (remaining > 0)
200+
defaultExecutor.schedule(this, remaining, TimeUnit.NANOSECONDS)
201+
else defaultExecutor.submit(this)
200202
}
201203
}
202204

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.Executor
20+
import java.util.concurrent.RejectedExecutionException
2021
import java.util.concurrent.ScheduledExecutorService
2122
import java.util.concurrent.TimeUnit
2223
import kotlin.coroutines.experimental.Continuation
@@ -42,19 +43,25 @@ private class ExecutorCoroutineDispatcher(override val executor: Executor) : Exe
4243
internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
4344
abstract val executor: Executor
4445

45-
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
46+
override fun dispatch(context: CoroutineContext, block: Runnable) =
47+
try { executor.execute(block) }
48+
catch (e: RejectedExecutionException) { defaultExecutor.execute(block) }
4649

4750
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
48-
val timeout = (executor as? ScheduledExecutorService)
49-
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
50-
?: scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
51+
val timeout =
52+
try { (executor as? ScheduledExecutorService)
53+
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
54+
catch (e: RejectedExecutionException) { null }
55+
?: defaultExecutor.schedule(ResumeRunnable(continuation), time, unit)
5156
continuation.cancelFutureOnCompletion(timeout)
5257
}
5358

5459
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
55-
val timeout = (executor as? ScheduledExecutorService)
56-
?.schedule(block, time, unit)
57-
?: scheduledExecutor.schedule(block, time, unit)
60+
val timeout =
61+
try { (executor as? ScheduledExecutorService)
62+
?.schedule(block, time, unit) }
63+
catch (e: RejectedExecutionException) { null }
64+
?: defaultExecutor.schedule(block, time, unit)
5865
return DisposableFutureHandle(timeout)
5966
}
6067

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,13 @@ package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.selects.SelectBuilder
2020
import kotlinx.coroutines.experimental.selects.select
21-
import java.util.concurrent.ScheduledExecutorService
22-
import java.util.concurrent.ScheduledThreadPoolExecutor
2321
import java.util.concurrent.TimeUnit
2422
import kotlin.coroutines.experimental.Continuation
2523
import kotlin.coroutines.experimental.ContinuationInterceptor
2624
import kotlin.coroutines.experimental.CoroutineContext
2725
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2826
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2927

30-
private val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
31-
32-
@Volatile
33-
private var _scheduledExecutor: ScheduledExecutorService? = null
34-
35-
internal val scheduledExecutor: ScheduledExecutorService get() =
36-
_scheduledExecutor ?: getOrCreateScheduledExecutorSync()
37-
38-
@Synchronized
39-
private fun getOrCreateScheduledExecutorSync(): ScheduledExecutorService =
40-
_scheduledExecutor ?: ScheduledThreadPoolExecutor(1) { r ->
41-
Thread(r, "kotlinx.coroutines.ScheduledExecutor").apply { isDaemon = true }
42-
}.apply {
43-
setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
44-
allowCoreThreadTimeOut(true)
45-
executeExistingDelayedTasksAfterShutdownPolicy = false
46-
// "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection
47-
try {
48-
val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType)
49-
m.invoke(this, true)
50-
} catch (ex: Throwable) { /* ignore */ }
51-
_scheduledExecutor = this
52-
}
53-
54-
// used for tests
55-
@Synchronized
56-
internal fun scheduledExecutorShutdownNow() {
57-
_scheduledExecutor?.shutdownNow()
58-
}
59-
60-
@Synchronized
61-
internal fun scheduledExecutorShutdownNowAndRelease() {
62-
_scheduledExecutor?.apply {
63-
shutdownNow()
64-
_scheduledExecutor = null
65-
}
66-
}
67-
6828
/**
6929
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
7030
* [CancellationException] if timeout was exceeded.
@@ -93,7 +53,7 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
9353
// schedule cancellation of this coroutine on time
9454
if (delay != null)
9555
completion.disposeOnCompletion(delay.invokeOnTimeout(time, unit, completion)) else
96-
completion.cancelFutureOnCompletion(scheduledExecutor.schedule(completion, time, unit))
56+
completion.cancelFutureOnCompletion(defaultExecutor.schedule(completion, time, unit))
9757
completion.initParentJob(context[Job])
9858
// restart block using new coroutine with new job,
9959
// however start it as undispatched coroutine, because we are already in the proper context
@@ -141,7 +101,7 @@ public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.M
141101
// schedule cancellation of this coroutine on time
142102
if (delay != null)
143103
completion.disposeOnCompletion(delay.invokeOnTimeout(time, unit, completion)) else
144-
completion.cancelFutureOnCompletion(scheduledExecutor.schedule(completion, time, unit))
104+
completion.cancelFutureOnCompletion(defaultExecutor.schedule(completion, time, unit))
145105
completion.initParentJob(context[Job])
146106
// restart block using new coroutine with new job,
147107
// however start it as undispatched coroutine, because we are already in the proper context

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ internal class SelectBuilderImpl<in R>(
440440
val delay = context[ContinuationInterceptor] as? Delay
441441
if (delay != null)
442442
disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
443-
disposeOnSelect(DisposableFutureHandle(scheduledExecutor.schedule(action, time, unit)))
443+
disposeOnSelect(DisposableFutureHandle(defaultExecutor.schedule(action, time, unit)))
444444
}
445445

446446
private class DisposeNode(

kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class GuideTest {
171171
test { guide.context.example02.main(emptyArray()) }.verifyLinesStart(
172172
" 'Unconfined': I'm working in thread main",
173173
" 'context': I'm working in thread main",
174-
" 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor",
174+
" 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor",
175175
" 'context': After delay in thread main"
176176
)
177177
}

kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ fun test(block: () -> Unit): List<String> {
4242
// capture output
4343
bytes = bytesOut.toByteArray()
4444
// the shutdown
45-
scheduledExecutorShutdownNow()
45+
defaultExecutorShutdownNow()
4646
shutdownDispatcherPools()
4747
CommonPool.shutdownAndRelease(10000L) // wait at most 10 sec
48-
scheduledExecutorShutdownNowAndRelease()
48+
defaultExecutorShutdownNowAndRelease()
4949
System.setOut(oldOut)
5050
System.setErr(oldErr)
5151

0 commit comments

Comments
 (0)