Skip to content

Commit 4b0ef7b

Browse files
committed
ValueBroadcastChannel
* BroadcastChannel interface is introduced * SubscriptionReceiveChannel is moved to core module from reactive modules * "Rx Subject vs BroadcastChannel" section in reactive guide
1 parent 30dd5c1 commit 4b0ef7b

File tree

17 files changed

+816
-60
lines changed

17 files changed

+816
-60
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,6 @@ public abstract class AbstractChannel<E> : Channel<E> {
577577
// ------ protected ------
578578

579579
protected companion object {
580-
private const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
581-
582580
/** @suppress **This is unstable API and it is subject to change.** */
583581
@JvmField
584582
val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
@@ -754,19 +752,8 @@ public abstract class AbstractChannel<E> : Channel<E> {
754752
protected class Closed<in E>(
755753
@JvmField val closeCause: Throwable?
756754
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
757-
@Volatile
758-
private var _sendException: Throwable? = null
759-
760-
val sendException: Throwable get() = _sendException ?:
761-
(closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE))
762-
.also { _sendException = it }
763-
764-
@Volatile
765-
private var _receiveException: Throwable? = null
766-
767-
val receiveException: Throwable get() = _receiveException ?:
768-
(closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE))
769-
.also { _receiveException = it }
755+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
756+
val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
770757

771758
override val offerResult get() = this
772759
override val pollResult get() = this
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import java.io.Closeable
20+
21+
/**
22+
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
23+
* that subscribe for the elements using [open] function and unsubscribe using [SubscriptionReceiveChannel.close]
24+
* function.
25+
*/
26+
public interface BroadcastChannel<E> : SendChannel<E> {
27+
/**
28+
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
29+
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
30+
* broadcast channel.
31+
*/
32+
public fun open(): SubscriptionReceiveChannel<E>
33+
}
34+
35+
/**
36+
* Return type for [BroadcastChannel.open] that can be used to [receive] elements from the
37+
* open subscription and to [close] it to unsubscribe.
38+
*/
39+
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
40+
/**
41+
* Closes this subscription.
42+
*/
43+
public override fun close()
44+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public interface ChannelIterator<out E> {
222222
* Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
223223
* Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
224224
* but it has suspending operations instead of blocking ones and it can be closed.
225+
*
226+
* See [Channel()][Channel.invoke] factory function for the description of available channel implementations.
225227
*/
226228
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
227229
/**
@@ -244,10 +246,10 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
244246
* Creates a channel with specified buffer capacity (or without a buffer by default).
245247
*
246248
* The resulting channel type depends on the specified [capacity] parameter:
247-
* * when `capacity` is 0 -- creates [RendezvousChannel];
248-
* * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel];
249-
* * when `capacity` is [CONFLATED] -- creates [ConflatedChannel];
250-
* * otherwise -- creates [ArrayChannel].
249+
* * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
250+
* * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
251+
* * when `capacity` is [CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
252+
* * otherwise -- creates [ArrayChannel] with a buffer of the specified `capacity`.
251253
*/
252254
public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
253255
return when (capacity) {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,22 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
20+
1921
/**
20-
* Performs the given [action] on each received element.
22+
* Performs the given [action] for each received element.
2123
*/
2224
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
2325
public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) {
2426
for (element in this) action(element)
2527
}
28+
29+
/**
30+
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
31+
*/
32+
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
33+
public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) {
34+
open().use { channel ->
35+
for (x in channel) action(x)
36+
}
37+
}
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.MODE_DIRECT
20+
import kotlinx.coroutines.experimental.internal.Symbol
21+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
22+
import kotlinx.coroutines.experimental.selects.SelectInstance
23+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
24+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
25+
26+
/**
27+
* Broadcasts the most recently sent value (aka [value]) to all [open] subscribers.
28+
*
29+
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
30+
* while previously sent elements **are lost**.
31+
* Sender to this broadcast channel never suspends and [offer] always returns `true`.
32+
*
33+
* A secondary constructor can be used to create an instance of this class that already holds a value.
34+
*
35+
* This implementation is fully lock-free. In this implementation
36+
* [opening][open] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
37+
* number of subscribers.
38+
*/
39+
public class ValueBroadcastChannel<E>() : BroadcastChannel<E> {
40+
/**
41+
* Creates an instance of this class that already holds a value.
42+
*
43+
* It is as a shortcut to creating an instance with a default constructor and
44+
* immediately sending a value: `ValueBroadcastChannel().apply { offer(value) }`.
45+
*/
46+
constructor(value: E) : this() {
47+
state = State<E>(value, null)
48+
}
49+
50+
@Suppress("UNCHECKED_CAST")
51+
@Volatile
52+
private var state: Any = INITIAL_STATE // State | Closed
53+
54+
@Volatile
55+
private var updating = 0
56+
57+
private companion object {
58+
@JvmField
59+
val STATE: AtomicReferenceFieldUpdater<ValueBroadcastChannel<*>, Any> = AtomicReferenceFieldUpdater.
60+
newUpdater(ValueBroadcastChannel::class.java, Any::class.java, "state")
61+
62+
@JvmField
63+
val UPDATING: AtomicIntegerFieldUpdater<ValueBroadcastChannel<*>> = AtomicIntegerFieldUpdater.
64+
newUpdater(ValueBroadcastChannel::class.java, "updating")
65+
66+
@JvmField
67+
val CLOSED = Closed(null)
68+
69+
@JvmField
70+
val UNDEFINED = Symbol("UNDEFINED")
71+
72+
@JvmField
73+
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
74+
}
75+
76+
private class State<E>(
77+
@JvmField val value: Any?, // UNDEFINED | E
78+
@JvmField val subscribers: Array<Subscriber<E>>?
79+
)
80+
81+
private class Closed(@JvmField val closeCause: Throwable?) {
82+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
83+
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
84+
}
85+
86+
/**
87+
* The most recently sent element to this channel.
88+
*
89+
* Access to this property throws [IllegalStateException] when this class is constructed without
90+
* initial value and no value was sent yet or if it was [closed][close] _normally_ and
91+
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
92+
*/
93+
@Suppress("UNCHECKED_CAST")
94+
public val value: E get() {
95+
val state = this.state
96+
when (state) {
97+
is Closed -> throw state.valueException
98+
is State<*> -> {
99+
if (state.value === UNDEFINED) throw IllegalStateException("No value")
100+
return state.value as E
101+
}
102+
else -> error("Invalid state $state")
103+
}
104+
}
105+
106+
/**
107+
* The most recently sent element to this channel or `null` when this class is constructed without
108+
* initial value and no value was sent yet or if it was [closed][close].
109+
*/
110+
@Suppress("UNCHECKED_CAST")
111+
public val valueOrNull: E? get() {
112+
val state = this.state
113+
when (state) {
114+
is Closed -> return null
115+
is State<*> -> {
116+
if (state.value === UNDEFINED) return null
117+
return state.value as E
118+
}
119+
else -> error("Invalid state $state")
120+
}
121+
}
122+
123+
override val isClosedForSend: Boolean get() = state is Closed
124+
override val isFull: Boolean get() = false
125+
126+
@Suppress("UNCHECKED_CAST")
127+
override fun open(): SubscriptionReceiveChannel<E> {
128+
val subscriber = Subscriber<E>(this)
129+
while (true) { // lock-free loop on state
130+
val state = this.state
131+
when (state) {
132+
is Closed -> {
133+
subscriber.close(state.closeCause)
134+
return subscriber
135+
}
136+
is State<*> -> {
137+
if (state.value !== UNDEFINED)
138+
subscriber.offerInternal(state.value as E)
139+
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
140+
if (STATE.compareAndSet(this, state, update))
141+
return subscriber
142+
}
143+
else -> error("Invalid state $state")
144+
}
145+
}
146+
}
147+
148+
@Suppress("UNCHECKED_CAST")
149+
private fun closeSubscriber(subscriber: Subscriber<E>) {
150+
while (true) { // lock-free loop on state
151+
val state = this.state
152+
when (state) {
153+
is Closed -> return
154+
is State<*> -> {
155+
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
156+
if (STATE.compareAndSet(this, state, update))
157+
return
158+
}
159+
else -> error("Invalid state $state")
160+
}
161+
}
162+
}
163+
164+
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
165+
if (list == null) return Array<Subscriber<E>>(1) { subscriber }
166+
return list + subscriber
167+
}
168+
169+
@Suppress("UNCHECKED_CAST")
170+
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
171+
val n = list.size
172+
val i = list.indexOf(subscriber)
173+
check(i >= 0)
174+
if (n == 1) return null
175+
val update = arrayOfNulls<Subscriber<E>>(n - 1)
176+
System.arraycopy(list, 0, update, 0, i)
177+
System.arraycopy(list, i + 1, update, i, n - i - 1)
178+
return update as Array<Subscriber<E>>
179+
}
180+
181+
@Suppress("UNCHECKED_CAST")
182+
override fun close(cause: Throwable?): Boolean {
183+
while (true) { // lock-free loop on state
184+
val state = this.state
185+
when (state) {
186+
is Closed -> return false
187+
is State<*> -> {
188+
val update = if (cause == null) CLOSED else Closed(cause)
189+
if (STATE.compareAndSet(this, state, update)) {
190+
(state as State<E>).subscribers?.forEach { it.close(cause) }
191+
return true
192+
}
193+
}
194+
else -> error("Invalid state $state")
195+
}
196+
}
197+
}
198+
199+
/**
200+
* Sends the value to all subscribed receives and stores this value as the most recent state for
201+
* future subscribers. This implementation never suspends.
202+
*
203+
* It throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
204+
* It throws the original [close] cause exception if the channel has _failed_.
205+
*/
206+
suspend override fun send(element: E) {
207+
offerInternal(element)?.let { throw it.sendException }
208+
}
209+
210+
/**
211+
* Sends the value to all subscribed receives and stores this value as the most recent state for
212+
* future subscribers. This implementation always returns `true`.
213+
*
214+
* It throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
215+
* It throws the original [close] cause exception if the channel has _failed_.
216+
*/
217+
override fun offer(element: E): Boolean {
218+
offerInternal(element)?.let { throw it.sendException }
219+
return true
220+
}
221+
222+
@Suppress("UNCHECKED_CAST")
223+
private fun offerInternal(element: E): Closed? {
224+
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
225+
// before that offer (we lost) and that offer overwrote us and conflated our offer.
226+
if (!UPDATING.compareAndSet(this, 0, 1)) return null
227+
try {
228+
while (true) { // lock-free loop on state
229+
val state = this.state
230+
when (state) {
231+
is Closed -> return state
232+
is State<*> -> {
233+
val update = State(element, (state as State<E>).subscribers)
234+
if (STATE.compareAndSet(this, state, update)) {
235+
// Note: Using offerInternal here to ignore the case when this subscriber was
236+
// already concurrently closed (assume the close had conflated our offer for this
237+
// particular subscriber).
238+
state.subscribers?.forEach { it.offerInternal(element) }
239+
return null
240+
}
241+
}
242+
else -> error("Invalid state $state")
243+
}
244+
}
245+
} finally {
246+
updating = 0 // reset the updating flag to zero even when something goes wrong
247+
}
248+
}
249+
250+
override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
251+
if (!select.trySelect(idempotent = null)) return
252+
offerInternal(element)?.let {
253+
select.resumeSelectWithException(it.sendException, MODE_DIRECT)
254+
return
255+
}
256+
block.startCoroutineUndispatched(select.completion)
257+
}
258+
259+
private class Subscriber<E>(
260+
private val broadcastChannel: ValueBroadcastChannel<E>
261+
) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
262+
override fun close() {
263+
if (close(cause = null))
264+
broadcastChannel.closeSubscriber(this)
265+
}
266+
267+
public override fun offerInternal(element: E): Any = super.offerInternal(element)
268+
}
269+
}

0 commit comments

Comments
 (0)