Skip to content

Commit 0f66a6d

Browse files
committed
BroadcastChannel.open is renamed to openSubscription
Fixed #54
1 parent 921b0cf commit 0f66a6d

File tree

20 files changed

+94
-65
lines changed

20 files changed

+94
-65
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kotlin.concurrent.withLock
2727
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
2828
* receiver suspends only when buffer is empty.
2929
*
30-
* Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
30+
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
3131
* lost.
3232
*
3333
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
@@ -61,7 +61,7 @@ class ArrayBroadcastChannel<E>(
6161
override val isBufferAlwaysFull: Boolean get() = false
6262
override val isBufferFull: Boolean get() = size >= capacity
6363

64-
override fun open(): SubscriptionReceiveChannel<E> {
64+
override fun openSubscription(): SubscriptionReceiveChannel<E> {
6565
val sub = Subscriber(this, head)
6666
subs.add(sub)
6767
// between creating and adding of subscription into the list the buffer head could have been bumped past it,

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.io.Closeable
2222

2323
/**
2424
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
25-
* that subscribe for the elements using [open] function and unsubscribe using [SubscriptionReceiveChannel.close]
25+
* that subscribe for the elements using [openSubscription] function and unsubscribe using [SubscriptionReceiveChannel.close]
2626
* function.
2727
*
2828
* See [BroadcastChannel()][BroadcastChannel.invoke] factory function for the description of available
@@ -55,11 +55,18 @@ public interface BroadcastChannel<E> : SendChannel<E> {
5555
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
5656
* broadcast channel.
5757
*/
58-
public fun open(): SubscriptionReceiveChannel<E>
58+
public fun openSubscription(): SubscriptionReceiveChannel<E>
59+
60+
/**
61+
* @suppress **Deprecated**: Renamed to [openSubscription]
62+
*/
63+
@Deprecated(message = "Renamed to `openSubscription`",
64+
replaceWith = ReplaceWith("openSubscription()"))
65+
public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
5966
}
6067

6168
/**
62-
* Return type for [BroadcastChannel.open] that can be used to [receive] elements from the
69+
* Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
6370
* open subscription and to [close] it to unsubscribe.
6471
*/
6572
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit
3131
*/
3232
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
3333
public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) {
34-
open().use { channel ->
34+
openSubscription().use { channel ->
3535
for (x in channel) action(x)
3636
}
3737
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2323
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
2424

2525
/**
26-
* Broadcasts the most recently sent element (aka [value]) to all [open] subscribers.
26+
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
2727
*
2828
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
2929
* while previously sent elements **are lost**.
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
3333
* A secondary constructor can be used to create an instance of this class that already holds a value.
3434
*
3535
* This implementation is fully lock-free. In this implementation
36-
* [opening][open] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
36+
* [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
3737
* number of subscribers.
3838
*/
3939
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
@@ -124,7 +124,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
124124
override val isFull: Boolean get() = false
125125

126126
@Suppress("UNCHECKED_CAST")
127-
override fun open(): SubscriptionReceiveChannel<E> {
127+
override fun openSubscription(): SubscriptionReceiveChannel<E> {
128128
val subscriber = Subscriber<E>(this)
129129
while (true) { // lock-free loop on state
130130
val state = this.state

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ArrayBroadcastChannelTest : TestBase() {
2828
expect(1)
2929
val broadcast = ArrayBroadcastChannel<Int>(1)
3030
assertThat(broadcast.isClosedForSend, IsEqual(false))
31-
val first = broadcast.open()
31+
val first = broadcast.openSubscription()
3232
launch(context, CoroutineStart.UNDISPATCHED) {
3333
expect(2)
3434
assertThat(first.receive(), IsEqual(1)) // suspends
@@ -46,7 +46,7 @@ class ArrayBroadcastChannelTest : TestBase() {
4646
expect(4)
4747
yield() // to the first receiver
4848
expect(6)
49-
val second = broadcast.open()
49+
val second = broadcast.openSubscription()
5050
launch(context, CoroutineStart.UNDISPATCHED) {
5151
expect(7)
5252
assertThat(second.receive(), IsEqual(2)) // suspends
@@ -72,7 +72,7 @@ class ArrayBroadcastChannelTest : TestBase() {
7272
fun testSendSuspend() = runBlocking {
7373
expect(1)
7474
val broadcast = ArrayBroadcastChannel<Int>(1)
75-
val first = broadcast.open()
75+
val first = broadcast.openSubscription()
7676
launch(context) {
7777
expect(4)
7878
assertThat(first.receive(), IsEqual(1))
@@ -91,7 +91,7 @@ class ArrayBroadcastChannelTest : TestBase() {
9191
fun testConcurrentSendCompletion() = runBlocking {
9292
expect(1)
9393
val broadcast = ArrayBroadcastChannel<Int>(1)
94-
val sub = broadcast.open()
94+
val sub = broadcast.openSubscription()
9595
// launch 3 concurrent senders (one goes buffer, two other suspend)
9696
for (x in 1..3) {
9797
launch(context, CoroutineStart.UNDISPATCHED) {
@@ -121,7 +121,7 @@ class ArrayBroadcastChannelTest : TestBase() {
121121
broadcast.send(2)
122122
broadcast.send(3)
123123
expect(2) // should not suspend anywhere above
124-
val sub = broadcast.open()
124+
val sub = broadcast.openSubscription()
125125
launch(context, CoroutineStart.UNDISPATCHED) {
126126
expect(3)
127127
assertThat(sub.receive(), IsEqual(4)) // suspends

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
9393
}
9494

9595
suspend fun waitForEvent(): Int =
96-
broadcast.open().use {
96+
broadcast.openSubscription().use {
9797
it.receive()
9898
}
9999
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import kotlinx.coroutines.experimental.*
2020
import org.hamcrest.core.IsEqual
2121
import org.hamcrest.core.IsInstanceOf
2222
import org.hamcrest.core.IsNull
23-
import org.junit.Assert.*
23+
import org.junit.Assert.assertThat
2424
import org.junit.Test
2525

2626
class ConflatedBroadcastChannelTest : TestBase() {
@@ -32,7 +32,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
3232
assertThat(broadcast.valueOrNull, IsNull())
3333
launch(context, CoroutineStart.UNDISPATCHED) {
3434
expect(2)
35-
val sub = broadcast.open()
35+
val sub = broadcast.openSubscription()
3636
assertThat(sub.poll(), IsNull())
3737
expect(3)
3838
assertThat(sub.receive(), IsEqual("one")) // suspends
@@ -51,7 +51,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
5151
expect(7)
5252
launch(context, CoroutineStart.UNDISPATCHED) {
5353
expect(8)
54-
val sub = broadcast.open()
54+
val sub = broadcast.openSubscription()
5555
assertThat(sub.receive(), IsEqual("one")) // does not suspend
5656
expect(9)
5757
assertThat(sub.receive(), IsEqual("two")) // suspends
@@ -93,7 +93,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
9393
assertThat(broadcast.valueOrNull, IsEqual(1))
9494
launch(context, CoroutineStart.UNDISPATCHED) {
9595
expect(2)
96-
val sub = broadcast.open()
96+
val sub = broadcast.openSubscription()
9797
assertThat(sub.receive(), IsEqual(1))
9898
expect(3)
9999
assertThat(exceptionFrom { sub.receive() }, IsInstanceOf(ClosedReceiveChannelException::class.java)) // suspends

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ enum class TestChannelKind {
6262
private class ChannelViaBroadcast<E>(
6363
private val broadcast: BroadcastChannel<E>
6464
): Channel<E>, SendChannel<E> by broadcast {
65-
val sub = broadcast.open()
65+
val sub = broadcast.openSubscription()
6666

6767
override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
6868
override val isEmpty: Boolean get() = sub.isEmpty

reactive/coroutines-guide-reactive.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ method with it.
223223

224224
An example in the previous section uses `source.consumeEach { ... }` snippet to open a subscription
225225
and receive all the elements from it. If we need more control on how what to do with
226-
the elements that are being received from the channel, we can use [Publisher.open][org.reactivestreams.Publisher.open]
226+
the elements that are being received from the channel, we can use [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription]
227227
as shown in the following example:
228228

229229
<!--- INCLUDE
@@ -238,7 +238,7 @@ fun main(args: Array<String>) = runBlocking<Unit> {
238238
.doOnSubscribe { println("OnSubscribe") } // provide some insight
239239
.doFinally { println("Finally") } // ... into what's going on
240240
var cnt = 0
241-
source.open().use { channel -> // open channel to the source
241+
source.openSubscription().use { channel -> // open channel to the source
242242
for (x in channel) { // iterate over the channel to receive elements from it
243243
println(x)
244244
if (++cnt >= 3) break // break when 3 elements are printed
@@ -262,7 +262,7 @@ Finally
262262

263263
<!--- TEST -->
264264

265-
With an explicit `open` we should [close][SubscriptionReceiveChannel.close] the corresponding
265+
With an explicit `openSubscription` we should [close][SubscriptionReceiveChannel.close] the corresponding
266266
subscription to unsubscribe from the source. However, instead of invoking `close` explicitly,
267267
this code relies on [use](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html)
268268
function from Kotlin's standard library.
@@ -676,11 +676,11 @@ import kotlinx.coroutines.experimental.selects.whileSelect
676676

677677
```kotlin
678678
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
679-
this@takeUntil.open().use { thisChannel -> // explicitly open channel to Publisher<T>
680-
other.open().use { otherChannel -> // explicitly open channel to Publisher<U>
679+
this@takeUntil.openSubscription().use { thisChannel -> // explicitly open channel to Publisher<T>
680+
other.openSubscription().use { otherChannel -> // explicitly open channel to Publisher<U>
681681
whileSelect {
682-
otherChannel.onReceive { false } // bail out on any received element from `other`
683-
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
682+
otherChannel.onReceive { false } // bail out on any received element from `other`
683+
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
684684
}
685685
}
686686
}
@@ -1071,7 +1071,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
10711071
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
10721072
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
10731073
[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/consume-each.html
1074-
[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
1074+
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open-subscription.html
10751075
<!--- MODULE kotlinx-coroutines-rx2 -->
10761076
<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
10771077
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html

reactive/kotlinx-coroutines-reactive/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Suspending extension functions and suspending iteration:
1616
| [Publisher.awaitFirstOrDefault][org.reactivestreams.Publisher.awaitFirstOrDefault] | Returns the first value from the given publisher or default
1717
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
1818
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
19-
| [Publisher.open][org.reactivestreams.Publisher.open] | Subscribes to publisher and returns [ReceiveChannel]
19+
| [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription] | Subscribes to publisher and returns [ReceiveChannel]
2020
| [Publisher.iterator][org.reactivestreams.Publisher.iterator] | Subscribes to publisher and returns [ChannelIterator]
2121

2222
Conversion functions:
@@ -37,7 +37,7 @@ Conversion functions:
3737
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first.html
3838
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-default.html
3939
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-single.html
40-
[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
40+
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open-subscription.html
4141
[org.reactivestreams.Publisher.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/iterator.html
4242
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/kotlinx.coroutines.experimental.channels.-receive-channel/as-publisher.html
4343
<!--- END -->

0 commit comments

Comments
 (0)