Skip to content

Commit 8385ec9

Browse files
committed
BroadcastChannel factory function
1 parent 4f69102 commit 8385ec9

File tree

6 files changed

+133
-8
lines changed

6 files changed

+133
-8
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ArrayBroadcastChannel<E>(
4141
val capacity: Int
4242
) : AbstractSendChannel<E>(), BroadcastChannel<E> {
4343
init {
44-
check(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
44+
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
4545
}
4646

4747
private val bufferLock = ReentrantLock()

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public open class ArrayChannel<E>(
3535
val capacity: Int
3636
) : AbstractChannel<E>() {
3737
init {
38-
check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
38+
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
3939
}
4040

4141
private val lock = ReentrantLock()

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,40 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
1921
import java.io.Closeable
2022

2123
/**
2224
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
2325
* that subscribe for the elements using [open] function and unsubscribe using [SubscriptionReceiveChannel.close]
2426
* function.
27+
*
28+
* See [BroadcastChannel()][BroadcastChannel.invoke] factory function for the description of available
29+
* broadcast channel implementations.
2530
*/
2631
public interface BroadcastChannel<E> : SendChannel<E> {
32+
/**
33+
* Factory for broadcast channels.
34+
*/
35+
public companion object Factory {
36+
/**
37+
* Creates a broadcast channel with the specified buffer capacity.
38+
*
39+
* The resulting channel type depends on the specified [capacity] parameter:
40+
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
41+
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
42+
* * otherwise -- throws [IllegalArgumentException].
43+
*/
44+
public operator fun <E> invoke(capacity: Int): BroadcastChannel<E> =
45+
when (capacity) {
46+
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
47+
UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
48+
CONFLATED -> ConflatedBroadcastChannel()
49+
else -> ArrayBroadcastChannel(capacity)
50+
}
51+
}
52+
2753
/**
2854
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
2955
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,22 +243,22 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
243243
public const val CONFLATED = -1
244244

245245
/**
246-
* Creates a channel with specified buffer capacity (or without a buffer by default).
246+
* Creates a channel with the specified buffer capacity (or without a buffer by default).
247247
*
248248
* The resulting channel type depends on the specified [capacity] parameter:
249-
* * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
249+
* * when `capacity` is 0 (default) -- creates [RendezvousChannel] without a buffer;
250250
* * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
251251
* * when `capacity` is [CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
252-
* * otherwise -- creates [ArrayChannel] with a buffer of the specified `capacity`.
252+
* * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`;
253+
* * otherwise -- throws [IllegalArgumentException].
253254
*/
254-
public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
255-
return when (capacity) {
255+
public operator fun <E> invoke(capacity: Int = 0): Channel<E> =
256+
when (capacity) {
256257
0 -> RendezvousChannel()
257258
UNLIMITED -> LinkedListChannel()
258259
CONFLATED -> ConflatedChannel()
259260
else -> ArrayChannel(capacity)
260261
}
261-
}
262262
}
263263
}
264264

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import org.hamcrest.MatcherAssert.assertThat
20+
import org.hamcrest.core.IsInstanceOf
21+
import org.junit.Test
22+
23+
class BroadcastChannelFactoryTest {
24+
@Test(expected = IllegalArgumentException::class)
25+
fun testRendezvousChannelNotSupported() {
26+
BroadcastChannel<Int>(0)
27+
}
28+
29+
@Test(expected = IllegalArgumentException::class)
30+
fun testLinkedListChannelNotSupported() {
31+
BroadcastChannel<Int>(Channel.UNLIMITED)
32+
}
33+
34+
@Test
35+
fun testConflatedBroadcastChannel() {
36+
assertThat(BroadcastChannel<Int>(Channel.CONFLATED), IsInstanceOf(ConflatedBroadcastChannel::class.java))
37+
}
38+
39+
@Test
40+
fun testArrayBroadcastChannel() {
41+
assertThat(BroadcastChannel<Int>(1), IsInstanceOf(ArrayBroadcastChannel::class.java))
42+
assertThat(BroadcastChannel<Int>(10), IsInstanceOf(ArrayBroadcastChannel::class.java))
43+
}
44+
45+
@Test(expected = IllegalArgumentException::class)
46+
fun testInvalidCapacityNotSupported() {
47+
BroadcastChannel<Int>(-2)
48+
}
49+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import org.hamcrest.MatcherAssert.assertThat
20+
import org.hamcrest.core.IsInstanceOf
21+
import org.junit.Test
22+
23+
class ChannelFactoryTest {
24+
@Test
25+
fun testRendezvousChannel() {
26+
assertThat(Channel<Int>(), IsInstanceOf(RendezvousChannel::class.java))
27+
assertThat(Channel<Int>(0), IsInstanceOf(RendezvousChannel::class.java))
28+
}
29+
30+
@Test
31+
fun testLinkedListChannel() {
32+
assertThat(Channel<Int>(Channel.UNLIMITED), IsInstanceOf(LinkedListChannel::class.java))
33+
}
34+
35+
@Test
36+
fun testConflatedChannel() {
37+
assertThat(Channel<Int>(Channel.CONFLATED), IsInstanceOf(ConflatedChannel::class.java))
38+
}
39+
40+
@Test
41+
fun testArrayChannel() {
42+
assertThat(Channel<Int>(1), IsInstanceOf(ArrayChannel::class.java))
43+
assertThat(Channel<Int>(10), IsInstanceOf(ArrayChannel::class.java))
44+
}
45+
46+
@Test(expected = IllegalArgumentException::class)
47+
fun testInvalidCapacityNotSupported() {
48+
Channel<Int>(-2)
49+
}
50+
}

0 commit comments

Comments
 (0)