Skip to content

Commit 537c359

Browse files
committed
Turn all consumeEach and withLock into inline suspend functions
1 parent 5f3008a commit 537c359

File tree

5 files changed

+53
-23
lines changed
  • kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental
  • reactive
    • kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
    • kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1
    • kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2

5 files changed

+53
-23
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,29 @@ internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
2121
/**
2222
* Performs the given [action] for each received element.
2323
*/
24-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
25-
public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) {
24+
public inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) {
2625
for (element in this) action(element)
2726
}
2827

28+
/**
29+
* @suppress: **Deprecated**: binary compatibility with old code
30+
*/
31+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
32+
public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
33+
consumeEach { action(it) }
34+
2935
/**
3036
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
3137
*/
32-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
33-
public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) {
38+
public inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) {
3439
openSubscription().use { channel ->
3540
for (x in channel) action(x)
3641
}
3742
}
43+
44+
/**
45+
* @suppress: **Deprecated**: binary compatibility with old code
46+
*/
47+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
48+
public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
49+
consumeEach { action(it) }

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ public fun Mutex(locked: Boolean = false): Mutex = MutexImpl(locked)
121121
*
122122
* @return the return value of the action.
123123
*/
124-
// :todo: this function needs to be make inline as soon as this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
125-
public suspend fun <T> Mutex.withLock(owner: Any? = null, action: suspend () -> T): T {
124+
public inline suspend fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
126125
lock(owner)
127126
try {
128127
return action()
@@ -131,24 +130,26 @@ public suspend fun <T> Mutex.withLock(owner: Any? = null, action: suspend () ->
131130
}
132131
}
133132

133+
/**
134+
* @suppress: **Deprecated**: binary compatibility with old code
135+
*/
136+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
137+
public suspend fun <T> Mutex.withLock(owner: Any? = null, action: suspend () -> T): T =
138+
withLock(owner) { action() }
139+
134140
/**
135141
* @suppress: **Deprecated**: Use [withLock]
136142
*/
137143
@Deprecated("Use `withLock(owner, action)", level = DeprecationLevel.HIDDEN)
138-
public suspend fun <T> Mutex.withLock(action: suspend () -> T): T = withLock(null, action)
144+
public suspend fun <T> Mutex.withLock(action: suspend () -> T): T =
145+
withLock { action() }
139146

140147
/**
141148
* @suppress: **Deprecated**: Use [withLock]
142149
*/
143150
@Deprecated("Use `withLock`", replaceWith = ReplaceWith("withLock(action)"))
144-
public suspend fun <T> Mutex.withMutex(action: suspend () -> T): T {
145-
lock()
146-
try {
147-
return action()
148-
} finally {
149-
unlock()
150-
}
151-
}
151+
public suspend fun <T> Mutex.withMutex(action: suspend () -> T): T =
152+
withLock { action() }
152153

153154
private val LOCK_FAIL = Symbol("LOCK_FAIL")
154155
private val ENQUEUE_FAIL = Symbol("ENQUEUE_FAIL")

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,19 @@ public operator fun <T> Publisher<T>.iterator() = openSubscription().iterator()
5858
/**
5959
* Subscribes to this [Publisher] and performs the specified action for each received element.
6060
*/
61-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
62-
public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) {
61+
public inline suspend fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
6362
openSubscription().use { channel ->
6463
for (x in channel) action(x)
6564
}
6665
}
6766

67+
/**
68+
* @suppress: **Deprecated**: binary compatibility with old code
69+
*/
70+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
71+
public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) =
72+
consumeEach { action(it) }
73+
6874
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
6975
@Volatile
7076
@JvmField

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,19 @@ public operator fun <T> Observable<T>.iterator() = openSubscription().iterator()
5959
/**
6060
* Subscribes to this [Observable] and performs the specified action for each received element.
6161
*/
62-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
63-
public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) {
62+
public inline suspend fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
6463
openSubscription().use { channel ->
6564
for (x in channel) action(x)
6665
}
6766
}
6867

68+
/**
69+
* @suppress: **Deprecated**: binary compatibility with old code
70+
*/
71+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
72+
public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =
73+
consumeEach { action(it) }
74+
6975
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
7076
@JvmField
7177
val subscriber: ChannelSubscriber = ChannelSubscriber()

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ public operator fun <T> ObservableSource<T>.iterator() = openSubscription().iter
7171
/**
7272
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
7373
*/
74-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
75-
public suspend fun <T> MaybeSource<T>.consumeEach(action: suspend (T) -> Unit) {
74+
public inline suspend fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
7675
openSubscription().use { channel ->
7776
for (x in channel) action(x)
7877
}
@@ -81,13 +80,19 @@ public suspend fun <T> MaybeSource<T>.consumeEach(action: suspend (T) -> Unit) {
8180
/**
8281
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
8382
*/
84-
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
85-
public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) {
83+
public inline suspend fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
8684
openSubscription().use { channel ->
8785
for (x in channel) action(x)
8886
}
8987
}
9088

89+
/**
90+
* @suppress: **Deprecated**: binary compatibility with old code
91+
*/
92+
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
93+
public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) =
94+
consumeEach { action(it) }
95+
9196
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T>, MaybeObserver<T> {
9297
@Volatile
9398
var subscription: Disposable? = null

0 commit comments

Comments
 (0)