Skip to content

Commit 174c696

Browse files
committed
Implemented "onLock" clause for Mutex.lock and added optional "owner" parameter to all Mutex funs
1 parent b0517ba commit 174c696

File tree

9 files changed

+471
-77
lines changed

9 files changed

+471
-77
lines changed

kotlinx-coroutines-core/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Top-level suspending functions:
4747
| [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send] | [onSend][kotlinx.coroutines.experimental.selects.SelectBuilder.onSend] | [offer][kotlinx.coroutines.experimental.channels.SendChannel.offer]
4848
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
4949
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
50+
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.selects.SelectBuilder.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
5051

5152
Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
5253
helper function. [NonCancellable] job object is provided to suppress cancellation with
@@ -100,6 +101,7 @@ Select expression to perform multiple suspending operations simultaneously until
100101
<!--- INDEX kotlinx.coroutines.experimental.sync -->
101102
[kotlinx.coroutines.experimental.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
102103
[kotlinx.coroutines.experimental.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/lock.html
104+
[kotlinx.coroutines.experimental.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/try-lock.html
103105
<!--- INDEX kotlinx.coroutines.experimental.channels -->
104106
[kotlinx.coroutines.experimental.channels.produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
105107
[kotlinx.coroutines.experimental.channels.ProducerJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-job/index.html
@@ -122,4 +124,5 @@ Select expression to perform multiple suspending operations simultaneously until
122124
[kotlinx.coroutines.experimental.selects.SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-send.html
123125
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html
124126
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive-or-null.html
127+
[kotlinx.coroutines.experimental.selects.SelectBuilder.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-lock.html
125128
<!--- END -->

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -764,14 +764,6 @@ internal class UnregisterOnCompletion(
764764
override fun toString(): String = "UnregisterOnCompletion[$registration]"
765765
}
766766

767-
private class CancelOnCompletion(
768-
parentJob: Job,
769-
@JvmField val subordinateJob: Job
770-
) : JobNode<Job>(parentJob) {
771-
override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) }
772-
override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
773-
}
774-
775767
private class ParentOnCompletion(
776768
parentJob: Job,
777769
@JvmField val subordinateJob: JobSupport
@@ -804,8 +796,6 @@ private class SelectJoinOnCompletion<R>(
804796
override fun toString(): String = "SelectJoinOnCompletion[$select]"
805797
}
806798

807-
808-
809799
private class JobImpl(parent: Job? = null) : JobSupport(true) {
810800
init { initParentJob(parent) }
811801
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
192192
element: E,
193193
select: SelectInstance<R>,
194194
block: suspend () -> R
195-
) : AddLastDesc(queue, SendSelect(element, select, block)) {
195+
) : AddLastDesc<SendSelect<R>>(queue, SendSelect(element, select, block)) {
196196
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
197197
if (affected is ReceiveOrClosed<*>) {
198198
return affected as? Closed<*> ?: ENQUEUE_FAILED
@@ -370,11 +370,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
370370
}
371371
}
372372

373-
private inner class TryEnqueueReceiveDesc<E, R>(
373+
private inner class TryEnqueueReceiveDesc<in E, R>(
374374
select: SelectInstance<R>,
375375
block: suspend (E?) -> R,
376376
nullOnClose: Boolean
377-
) : AddLastDesc(queue, ReceiveSelect(select, block, nullOnClose)) {
377+
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
378378
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
379379
if (affected is Send) return ENQUEUE_FAILED
380380
return null

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
4848
/**
4949
* @suppress **This is unstable API and it is subject to change.**
5050
*/
51-
public typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc
51+
public typealias AddLastDesc<T> = LockFreeLinkedListNode.AddLastDesc<T>
5252

5353
/**
5454
* Doubly-linked concurrent list node with remove support.
@@ -160,7 +160,7 @@ public open class LockFreeLinkedListNode {
160160
}
161161
}
162162

163-
public fun describeAddLast(node: Node): AddLastDesc = AddLastDesc(this, node)
163+
public fun <T : Node> describeAddLast(node: T): AddLastDesc<T> = AddLastDesc(this, node)
164164

165165
/**
166166
* Adds last item to this list atomically if the [condition] is true.
@@ -293,7 +293,10 @@ public open class LockFreeLinkedListNode {
293293

294294
// ------ multi-word atomic operations helpers ------
295295

296-
public open class AddLastDesc(val queue: Node, val node: Node) : AbstractAtomicDesc() {
296+
public open class AddLastDesc<out T : Node>(
297+
@JvmField val queue: Node,
298+
@JvmField val node: T
299+
) : AbstractAtomicDesc() {
297300
init {
298301
// require freshly allocated node here
299302
check(node._next === node && node._prev === node)
@@ -337,7 +340,9 @@ public open class LockFreeLinkedListNode {
337340
}
338341
}
339342

340-
public open class RemoveFirstDesc<T>(val queue: Node) : AbstractAtomicDesc() {
343+
public open class RemoveFirstDesc<T>(
344+
@JvmField val queue: Node
345+
) : AbstractAtomicDesc() {
341346
@Suppress("UNCHECKED_CAST")
342347
public val result: T get() = affectedNode!! as T
343348

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
2222
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2323
import kotlinx.coroutines.experimental.channels.SendChannel
2424
import kotlinx.coroutines.experimental.internal.AtomicDesc
25+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
26+
import kotlinx.coroutines.experimental.sync.Mutex
2527
import kotlin.coroutines.experimental.Continuation
2628
import kotlin.coroutines.experimental.CoroutineContext
2729
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
@@ -66,6 +68,14 @@ public interface SelectBuilder<in R> {
6668
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
6769
*/
6870
public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
71+
72+
/**
73+
* Clause for [Mutex.lock] suspending function that selects the given [block] when the mutex is locked.
74+
*
75+
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
76+
* is already locked with the same token (same identity), this clause throws [IllegalStateException].
77+
*/
78+
public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
6979
}
7080

7181
/**
@@ -107,6 +117,8 @@ public interface SelectInstance<in R> {
107117
public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
108118

109119
public fun unregisterOnCompletion(registration: Job.Registration)
120+
121+
public fun removeOnCompletion(node: LockFreeLinkedListNode)
110122
}
111123

112124
/**
@@ -133,6 +145,7 @@ public interface SelectInstance<in R> {
133145
* | [SendChannel] | [send][SendChannel.send] | [onSend][SelectBuilder.onSend] | [offer][SendChannel.offer]
134146
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
135147
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
148+
* | [Mutex] | [lock][Mutex.lock] | [onLock][SelectBuilder.onLock] | [tryLock][Mutex.tryLock]
136149
*
137150
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
138151
* function is suspended, this function immediately resumes with [CancellationException].
@@ -224,7 +237,23 @@ internal class SelectBuilderImpl<in R>(
224237
registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
225238
}
226239

240+
override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
241+
registerSelectLock(this@SelectBuilderImpl, owner, block)
242+
}
243+
227244
override fun unregisterOnCompletion(registration: Job.Registration) {
228245
invokeOnCompletion(UnregisterOnCompletion(this, registration))
229246
}
247+
248+
override fun removeOnCompletion(node: LockFreeLinkedListNode) {
249+
invokeOnCompletion(RemoveOnCompletion(this, node))
250+
}
251+
}
252+
253+
private class RemoveOnCompletion(
254+
select: SelectBuilderImpl<*>,
255+
@JvmField val node: LockFreeLinkedListNode
256+
) : JobNode<SelectBuilderImpl<*>>(select) {
257+
override fun invoke(reason: Throwable?) { node.remove() }
258+
override fun toString(): String = "RemoveOnCompletion[$node]"
230259
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kotlinx.coroutines.experimental.Deferred
2020
import kotlinx.coroutines.experimental.Job
2121
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2222
import kotlinx.coroutines.experimental.channels.SendChannel
23+
import kotlinx.coroutines.experimental.sync.Mutex
2324
import java.util.*
2425
import kotlin.coroutines.experimental.Continuation
2526
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
@@ -86,4 +87,8 @@ internal class UnbiasedSelectBuilderImpl<in R>(cont: Continuation<R>) : SelectBu
8687
override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
8788
clauses += { registerSelectReceiveOrNull(instance, block) }
8889
}
90+
91+
override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
92+
clauses += { registerSelectLock(instance, owner, block) }
93+
}
8994
}

0 commit comments

Comments
 (0)