Skip to content

Commit c0e19f8

Browse files
committed
Introduced "actor" coroutine builder
1 parent 1e45960 commit c0e19f8

File tree

8 files changed

+165
-31
lines changed

8 files changed

+165
-31
lines changed

coroutines-guide.md

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,17 +1700,20 @@ An actor is a combination of a coroutine, the state that is confined and is enca
17001700
and a channel to communicate with other coroutines. A simple actor can be written as a function,
17011701
but an actor with a complex state is better suited for a class.
17021702

1703+
There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
1704+
scope to receive messages from and combines the send channel into the resulting job object, so that a
1705+
single reference to the actor can be carried around as its handle.
1706+
17031707
```kotlin
17041708
// Message types for counterActor
17051709
sealed class CounterMsg
17061710
object IncCounter : CounterMsg() // one-way message to increment counter
17071711
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
17081712

17091713
// This function launches a new counter actor
1710-
fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
1714+
fun counterActor() = actor<CounterMsg>(CommonPool) {
17111715
var counter = 0 // actor state
1712-
while (isActive) { // main loop of the actor
1713-
val msg = request.receive()
1716+
for (msg in channel) { // iterate over incoming messages
17141717
when (msg) {
17151718
is IncCounter -> counter++
17161719
is GetCounter -> msg.response.send(counter)
@@ -1719,14 +1722,14 @@ fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
17191722
}
17201723

17211724
fun main(args: Array<String>) = runBlocking<Unit> {
1722-
val request = Channel<CounterMsg>()
1723-
counterActor(request)
1725+
val counter = counterActor() // create the actor
17241726
massiveRun(CommonPool) {
1725-
request.send(IncCounter)
1727+
counter.send(IncCounter)
17261728
}
17271729
val response = Channel<Int>()
1728-
request.send(GetCounter(response))
1730+
counter.send(GetCounter(response))
17291731
println("Counter = ${response.receive()}")
1732+
counter.close() // shutdown the actor
17301733
}
17311734
```
17321735

@@ -1737,12 +1740,16 @@ Completed 1000000 actions in xxx ms
17371740
Counter = 1000000
17381741
-->
17391742

1740-
Notice, that it does not matter (for correctness) what context the actor itself is executed in. An actor is
1743+
It does not matter (for correctness) what context the actor itself is executed in. An actor is
17411744
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
17421745
works as a solution to the problem of shared mutable state.
17431746

1744-
Actor is more efficient than locking under load, because in this case it always has work to do and does not
1745-
have to switch at all.
1747+
Actor is more efficient than locking under load, because in this case it always has work to do and it does not
1748+
have to switch to a different context at all.
1749+
1750+
> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
1751+
with the channel that it receives messages from, while a producer is associated with the channel that it
1752+
sends elements to.
17461753

17471754
## Select expression
17481755

@@ -2116,6 +2123,7 @@ Channel was closed
21162123
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
21172124
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
21182125
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
2126+
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
21192127
<!--- INDEX kotlinx.coroutines.experimental.selects -->
21202128
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
21212129
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html

kotlinx-coroutines-core/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Coroutine builder functions:
99
| [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result
1010
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
1111
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ProducerJob][kotlinx.coroutines.experimental.channels.ProducerJob] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
12+
| [actor][kotlinx.coroutines.experimental.channels.actor] | [ActorJob][kotlinx.coroutines.experimental.channels.ActorJob] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
1213
| [runBlocking] | `T` | [CoroutineScope] | Blocks the thread while the coroutine runs
1314

1415
Coroutine dispatchers implementing [CoroutineDispatcher]:
@@ -103,6 +104,9 @@ Select expression to perform multiple suspending operations simultaneously until
103104
[kotlinx.coroutines.experimental.channels.produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
104105
[kotlinx.coroutines.experimental.channels.ProducerJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-job/index.html
105106
[kotlinx.coroutines.experimental.channels.ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
107+
[kotlinx.coroutines.experimental.channels.actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
108+
[kotlinx.coroutines.experimental.channels.ActorJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-actor-job/index.html
109+
[kotlinx.coroutines.experimental.channels.ActorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-actor-scope/index.html
106110
[kotlinx.coroutines.experimental.channels.Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
107111
[kotlinx.coroutines.experimental.channels.SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
108112
[kotlinx.coroutines.experimental.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.CoroutineDispatcher
20+
import kotlinx.coroutines.experimental.CoroutineScope
21+
import kotlinx.coroutines.experimental.Job
22+
import kotlinx.coroutines.experimental.newCoroutineContext
23+
import kotlin.coroutines.experimental.CoroutineContext
24+
import kotlin.coroutines.experimental.startCoroutine
25+
26+
/**
27+
* Scope for [actor] coroutine builder.
28+
*/
29+
public interface ActorScope<out E> : CoroutineScope, ReceiveChannel<E> {
30+
/**
31+
* A reference to the mailbox channel that this coroutine [receives][receive] messages from.
32+
* It is provided for convenience, so that the code in the coroutine can refer
33+
* to the channel as `channel` as apposed to `this`.
34+
* All the [ReceiveChannel] functions on this interface delegate to
35+
* the channel instance returned by this function.
36+
*/
37+
val channel: ReceiveChannel<E>
38+
}
39+
40+
/**
41+
* Return type for [actor] coroutine builder.
42+
*/
43+
public interface ActorJob<in E> : Job, SendChannel<E> {
44+
/**
45+
* A reference to the mailbox channel that this coroutine is receiving messages from.
46+
* All the [SendChannel] functions on this interface delegate to
47+
* the channel instance returned by this function.
48+
*/
49+
val channel: SendChannel<E>
50+
}
51+
52+
/**
53+
* Launches new coroutine that is receiving messages from its mailbox channel
54+
* and returns a reference to the coroutine as an [ActorJob]. The resulting
55+
* object can be used to [send][SendChannel.send] messages to this coroutine.
56+
*
57+
* The scope of the coroutine contains [ActorScope] interface, which implements
58+
* both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
59+
* [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
60+
* when the coroutine completes.
61+
* The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
62+
*
63+
* The [context] for the new coroutine must be explicitly specified.
64+
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
65+
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
66+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
67+
*
68+
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
69+
* the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
70+
*
71+
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
72+
*
73+
* @param context context of the coroutine
74+
* @param capacity capacity of the channel's buffer (no buffer by default)
75+
* @param block the coroutine code
76+
*/
77+
public fun <E> actor(
78+
context: CoroutineContext,
79+
capacity: Int = 0,
80+
block: suspend ActorScope<E>.() -> Unit
81+
): ActorJob<E> {
82+
val channel = Channel<E>(capacity)
83+
return ActorCoroutine(newCoroutineContext(context), channel).apply {
84+
initParentJob(context[Job])
85+
block.startCoroutine(this, this)
86+
}
87+
}
88+
89+
private class ActorCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
90+
ChannelCoroutine<E>(parentContext, channel), ActorScope<E>, ActorJob<E>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.AbstractCoroutine
20+
import kotlinx.coroutines.experimental.JobSupport
21+
import kotlinx.coroutines.experimental.handleCoroutineException
22+
import kotlin.coroutines.experimental.CoroutineContext
23+
24+
internal open class ChannelCoroutine<E>(
25+
override val parentContext: CoroutineContext,
26+
val channel: Channel<E>
27+
) : AbstractCoroutine<Unit>(active = true), Channel<E> by channel {
28+
override fun afterCompletion(state: Any?, mode: Int) {
29+
val cause = (state as? JobSupport.CompletedExceptionally)?.cause
30+
if (!channel.close(cause) && cause != null)
31+
handleCoroutineException(context, cause)
32+
}
33+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.*
19+
import kotlinx.coroutines.experimental.CoroutineDispatcher
20+
import kotlinx.coroutines.experimental.CoroutineScope
21+
import kotlinx.coroutines.experimental.Job
22+
import kotlinx.coroutines.experimental.newCoroutineContext
2023
import kotlin.coroutines.experimental.CoroutineContext
2124
import kotlin.coroutines.experimental.startCoroutine
2225

@@ -60,7 +63,8 @@ typealias ChannelJob<E> = ProducerJob<E>
6063

6164
/**
6265
* Launches new coroutine to produce a stream of values by sending them to a channel
63-
* and returns a reference to the coroutine as a [ProducerJob].
66+
* and returns a reference to the coroutine as a [ProducerJob]. This resulting
67+
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
6468
*
6569
* The scope of the coroutine contains [ProducerScope] interface, which implements
6670
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
@@ -77,6 +81,10 @@ typealias ChannelJob<E> = ProducerJob<E>
7781
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
7882
*
7983
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
84+
*
85+
* @param context context of the coroutine
86+
* @param capacity capacity of the channel's buffer (no buffer by default)
87+
* @param block the coroutine code
8088
*/
8189
public fun <E> produce(
8290
context: CoroutineContext,
@@ -101,13 +109,5 @@ public fun <E> buildChannel(
101109
): ProducerJob<E> =
102110
produce(context, capacity, block)
103111

104-
private class ProducerCoroutine<E>(
105-
override val parentContext: CoroutineContext,
106-
override val channel: Channel<E>
107-
) : AbstractCoroutine<Unit>(active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
108-
override fun afterCompletion(state: Any?, mode: Int) {
109-
val cause = (state as? CompletedExceptionally)?.cause
110-
if (!channel.close(cause) && cause != null)
111-
handleCoroutineException(context, cause)
112-
}
113-
}
112+
private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
113+
ChannelCoroutine<E>(parentContext, channel), ProducerScope<E>, ProducerJob<E>

kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ var counter = 0
4040

4141
fun main(args: Array<String>) = runBlocking<Unit> {
4242
massiveRun(CommonPool) { // run each coroutine in CommonPool
43-
run(counterContext) { // but confine each increment to a single-threaded context
43+
run(counterContext) { // but confine each increment to the single-threaded context
4444
counter++
4545
}
4646
}

kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ val counterContext = newSingleThreadContext("CounterContext")
3939
var counter = 0
4040

4141
fun main(args: Array<String>) = runBlocking<Unit> {
42-
massiveRun(counterContext) { // run each coroutine in single-threaded context
42+
massiveRun(counterContext) { // run each coroutine in the single-threaded context
4343
counter++
4444
}
4545
println("Counter = $counter")

kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ object IncCounter : CounterMsg() // one-way message to increment counter
4242
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
4343

4444
// This function launches a new counter actor
45-
fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
45+
fun counterActor() = actor<CounterMsg>(CommonPool) {
4646
var counter = 0 // actor state
47-
while (isActive) { // main loop of the actor
48-
val msg = request.receive()
47+
for (msg in channel) { // iterate over incoming messages
4948
when (msg) {
5049
is IncCounter -> counter++
5150
is GetCounter -> msg.response.send(counter)
@@ -54,12 +53,12 @@ fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
5453
}
5554

5655
fun main(args: Array<String>) = runBlocking<Unit> {
57-
val request = Channel<CounterMsg>()
58-
counterActor(request)
56+
val counter = counterActor() // create the actor
5957
massiveRun(CommonPool) {
60-
request.send(IncCounter)
58+
counter.send(IncCounter)
6159
}
6260
val response = Channel<Int>()
63-
request.send(GetCounter(response))
61+
counter.send(GetCounter(response))
6462
println("Counter = ${response.receive()}")
63+
counter.close() // shutdown the actor
6564
}

0 commit comments

Comments
 (0)