Skip to content

Commit d9ae2bc

Browse files
committed
No Job in newSingleThreadContext and newFixedThreadPoolContext anymore
* This resolves the common issue of using `run(ctx)` where ctx comes from either `newSingleThreadContext` or `newFixedThreadPoolContext` invocation. They both used to return a combination of dispatcher + job, and this job was overriding the parent job, thus preventing propagation of cancellation. Not anymore. * ThreadPoolDispatcher class is now public and is the result type for both functions. It has the `close` method to release the thread pool. Fixes #149 Fixes #151
1 parent bcdd8e1 commit d9ae2bc

File tree

7 files changed

+91
-45
lines changed

7 files changed

+91
-45
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,15 @@ public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
3939

4040
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
4141

42-
internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
43-
abstract val executor: Executor
44-
42+
/**
43+
* @suppress **This is unstable API and it is subject to change.**
44+
*/
45+
public abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
46+
/**
47+
* @suppress **This is unstable API and it is subject to change.**
48+
*/
49+
internal abstract val executor: Executor
50+
4551
override fun dispatch(context: CoroutineContext, block: Runnable) =
4652
try { executor.execute(timeSource.trackTask(block)) }
4753
catch (e: RejectedExecutionException) {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,77 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.io.Closeable
1920
import java.util.concurrent.Executors
2021
import java.util.concurrent.ScheduledExecutorService
2122
import java.util.concurrent.atomic.AtomicInteger
2223
import kotlin.coroutines.experimental.CoroutineContext
2324

2425
/**
2526
* Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
26-
* All continuations are dispatched immediately when invoked inside the thread of this context.
27-
* Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
28-
* The specified [name] defines the name of the new thread.
29-
* An optional [parent] job may be specified upon creation.
27+
* **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its thread).
28+
* Resources are reclaimed by [ThreadPoolDispatcher.close].**
29+
*
30+
* @param name the base name of the created thread.
3031
*/
32+
fun newSingleThreadContext(name: String): ThreadPoolDispatcher =
33+
newFixedThreadPoolContext(1, name)
34+
35+
/**
36+
* @suppress **Deprecated**: Parent job is no longer supported.
37+
*/
38+
@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
39+
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newSingleThreadContext(name)"))
3140
fun newSingleThreadContext(name: String, parent: Job? = null): CoroutineContext =
32-
newFixedThreadPoolContext(1, name, parent)
41+
newFixedThreadPoolContext(1, name)
3342

3443
/**
3544
* Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
36-
* All continuations are dispatched immediately when invoked inside the threads of this context.
37-
* Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
38-
* The specified [name] defines the names of the threads.
39-
* An optional [parent] job may be specified upon creation.
45+
* **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its threads).
46+
* Resources are reclaimed by [ThreadPoolDispatcher.close].**
47+
*
48+
* @param nThreads the number of threads.
49+
* @param name the base name of the created threads.
4050
*/
41-
fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
51+
fun newFixedThreadPoolContext(nThreads: Int, name: String): ThreadPoolDispatcher {
4252
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
43-
val job = Job(parent)
44-
return job + ThreadPoolDispatcher(nThreads, name, job)
53+
return ThreadPoolDispatcher(nThreads, name)
4554
}
4655

56+
/**
57+
* @suppress **Deprecated**: Parent job is no longer supported.
58+
*/
59+
@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
60+
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newFixedThreadPoolContext(nThreads, name)"))
61+
fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext =
62+
newFixedThreadPoolContext(nThreads, name)
63+
4764
internal class PoolThread(
4865
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
4966
target: Runnable, name: String
5067
) : Thread(target, name) {
5168
init { isDaemon = true }
5269
}
5370

54-
internal class ThreadPoolDispatcher(
71+
/**
72+
* Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
73+
* created with [newSingleThreadContext] and [newFixedThreadPoolContext].
74+
*/
75+
public class ThreadPoolDispatcher internal constructor(
5576
private val nThreads: Int,
56-
private val name: String,
57-
job: Job
58-
) : ExecutorCoroutineDispatcherBase() {
77+
private val name: String
78+
) : ExecutorCoroutineDispatcherBase(), Closeable {
5979
private val threadNo = AtomicInteger()
6080

61-
override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
81+
internal override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
6282
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
6383
}
6484

65-
init {
66-
job.invokeOnCompletion { executor.shutdown() }
85+
/**
86+
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
87+
*/
88+
public override fun close() {
89+
executor.shutdown()
6790
}
6891

6992
override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"

core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import kotlinx.coroutines.experimental.*
2222
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
2323

2424
fun main(args: Array<String>) {
25-
val ctx1 = newSingleThreadContext("Ctx1")
26-
val ctx2 = newSingleThreadContext("Ctx2")
27-
runBlocking(ctx1) {
28-
log("Started in ctx1")
29-
run(ctx2) {
30-
log("Working in ctx2")
25+
newSingleThreadContext("Ctx1").use { ctx1 ->
26+
newSingleThreadContext("Ctx2").use { ctx2 ->
27+
runBlocking(ctx1) {
28+
log("Started in ctx1")
29+
run(ctx2) {
30+
log("Working in ctx2")
31+
}
32+
log("Back to ctx1")
33+
}
3134
}
32-
log("Back to ctx1")
3335
}
3436
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ExecutorsTest : TestBase() {
3131
runBlocking(context) {
3232
checkThreadName("TestThread")
3333
}
34-
context[Job]!!.cancel()
34+
context.close()
3535
}
3636

3737
@Test
@@ -40,7 +40,7 @@ class ExecutorsTest : TestBase() {
4040
runBlocking(context) {
4141
checkThreadName("TestPool")
4242
}
43-
context[Job]!!.cancel()
43+
context.close()
4444
}
4545

4646
@Test
@@ -63,7 +63,7 @@ class ExecutorsTest : TestBase() {
6363
}
6464
checkThreadName("Ctx1")
6565
}
66-
ctx1[Job]!!.cancel()
67-
ctx2[Job]!!.cancel()
66+
ctx1.close()
67+
ctx2.close()
6868
}
6969
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class BroadcastChannelMultiReceiveStressTest(
5252

5353
@After
5454
fun tearDown() {
55-
pool.cancel()
55+
pool.close()
5656
}
5757

5858
@Test

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
3535
val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")
3636

3737
@After
38-
fun tearDown() { pool[Job]!!.cancel() }
38+
fun tearDown() {
39+
pool.close()
40+
}
3941

4042
@Test
4143
fun testStressClose() = runBlocking<Unit> {

coroutines-guide.md

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -815,13 +815,17 @@ It produces the following output (maybe in different order):
815815

816816
<!--- TEST LINES_START_UNORDERED -->
817817

818-
The default dispatcher that we've used in previous sections is representend by [DefaultDispather], which
818+
The default dispatcher that we've used in previous sections is representend by [DefaultDispatcher], which
819819
is equal to [CommonPool] in the current implementation. So, `launch { ... }` is the same
820820
as `launch(DefaultDispather) { ... }`, which is the same as `launch(CommonPool) { ... }`.
821821

822822
The difference between parent [coroutineContext][CoroutineScope.coroutineContext] and
823823
[Unconfined] context will be shown later.
824824

825+
Note, that [newSingleThreadContext] creates a new thread, which is a very expensive resource.
826+
In a real application it must be either released, when no longer needed, using [close][ThreadPoolDispatcher.close]
827+
function, or stored in a top-level variable and reused throughout the application.
828+
825829
### Unconfined vs confined dispatcher
826830

827831
The [Unconfined] coroutine dispatcher starts coroutine in the caller thread, but only until the
@@ -925,22 +929,24 @@ Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
925929
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
926930

927931
fun main(args: Array<String>) {
928-
val ctx1 = newSingleThreadContext("Ctx1")
929-
val ctx2 = newSingleThreadContext("Ctx2")
930-
runBlocking(ctx1) {
931-
log("Started in ctx1")
932-
run(ctx2) {
933-
log("Working in ctx2")
932+
newSingleThreadContext("Ctx1").use { ctx1 ->
933+
newSingleThreadContext("Ctx2").use { ctx2 ->
934+
runBlocking(ctx1) {
935+
log("Started in ctx1")
936+
run(ctx2) {
937+
log("Working in ctx2")
938+
}
939+
log("Back to ctx1")
940+
}
934941
}
935-
log("Back to ctx1")
936942
}
937943
}
938944
```
939945

940946
> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)
941947
942-
It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
943-
the second one is using [run] function to change a context of a coroutine while still staying in the
948+
It demonstrates several new techniques. One is using [runBlocking] with an explicitly specified context, and
949+
the other one is using [run] function to change a context of a coroutine while still staying in the
944950
same coroutine as you can see in the output below:
945951

946952
```text
@@ -951,6 +957,10 @@ same coroutine as you can see in the output below:
951957

952958
<!--- TEST -->
953959

960+
961+
Note, that is example also uses `use` function from the Kotlin standard library to release threads that
962+
are created with [newSingleThreadContext] when they are no longer needed.
963+
954964
### Job in the context
955965

956966
The coroutine's [Job] is part of its context. The coroutine can retrieve it from its own context
@@ -2325,9 +2335,12 @@ Channel was closed
23252335
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
23262336
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
23272337
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
2338+
[DefaultDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-default-dispatcher.html
23282339
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
23292340
[CoroutineScope.coroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/coroutine-context.html
23302341
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
2342+
[newSingleThreadContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-single-thread-context.html
2343+
[ThreadPoolDispatcher.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-thread-pool-dispatcher/close.html
23312344
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
23322345
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
23332346
[Job()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job.html

0 commit comments

Comments
 (0)