Skip to content

Commit 8bd5254

Browse files
committed
Mutex added
1 parent 992e631 commit 8bd5254

File tree

4 files changed

+256
-2
lines changed

4 files changed

+256
-2
lines changed

kotlinx-coroutines-core/README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@ These builders can be used with the following [coroutine dispatchers][CoroutineD
2121
| [Executor.toCoroutineDispatcher][java.util.concurrent.Executor.toCoroutineDispatcher] | Extension to convert any executor
2222
| [Unconfined] | Does not confine coroutine execution in any way
2323

24-
The following top-level suspending functions are provided to be used _inside coroutines_:
24+
Synchronization primitives for coroutines:
25+
26+
| **Name** | **Suspending functions** | **Description**
27+
| ---------- | ----------------------------------------------------------- | ---------------
28+
| [Mutex] | [lock][Mutex.lock] | Mutual exclusion
29+
| [Channel] | [send][SendChannel.send], [receive][ReceiveChannel.receive] | Communication channel (aka queue or exchanger)
30+
31+
The following _top-level_ suspending functions are provided to be used inside coroutines:
2532

2633
| **Name** | **Description**
2734
| ------------- | ---------------
@@ -46,6 +53,8 @@ debugging facilities.
4653
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
4754
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
4855
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
56+
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-mutex/index.html
57+
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-mutex/lock.html
4958
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
5059
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
5160
[java.util.concurrent.Executor.toCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/java.util.concurrent.-executor/to-coroutine-dispatcher.html
@@ -61,8 +70,11 @@ debugging facilities.
6170
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
6271
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
6372
<!--- INDEX kotlinx.coroutines.experimental.channels -->
73+
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
6474
[ProducerJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-job/index.html
6575
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
76+
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
77+
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
6678
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
6779
<!--- END -->
6880

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
22+
23+
/**
24+
* Mutual exclusion for coroutines.
25+
*
26+
* Mutex has two states: _locked_ and _unlocked_.
27+
* It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
28+
* the lock still suspends the invoker.
29+
*
30+
* @param locked initial state of the mutex
31+
*/
32+
public class Mutex(locked: Boolean = false) {
33+
// State is: Empty | UnlockOp | LockFreeLinkedListHead (queue of Waiter objects)
34+
@Volatile
35+
private var state: Any? = if (locked) EmptyLocked else EmptyUnlocked // shared objects while we have no waiters
36+
37+
private companion object {
38+
@JvmStatic
39+
private val STATE: AtomicReferenceFieldUpdater<Mutex, Any?> =
40+
AtomicReferenceFieldUpdater.newUpdater(Mutex::class.java, Any::class.java, "state")
41+
42+
@JvmStatic
43+
private val EmptyLocked = Empty(true)
44+
45+
@JvmStatic
46+
private val EmptyUnlocked = Empty(false)
47+
}
48+
49+
/**
50+
* Tries to lock this mutex, returning `false` if this mutex is already locked.
51+
*/
52+
public fun tryLock(): Boolean {
53+
while (true) { // lock-free loop on state
54+
val state = this.state
55+
when (state) {
56+
is Empty -> {
57+
if (state.locked) return false
58+
if (STATE.compareAndSet(this, state, EmptyLocked)) return true
59+
}
60+
is UnlockOp -> state.helpComplete() // help
61+
else -> return false
62+
}
63+
}
64+
}
65+
66+
/**
67+
* Locks this mutex, suspending caller while the mutex is locked.
68+
*
69+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
70+
* function is suspended, this function immediately resumes with [CancellationException].
71+
* Cancellation of suspended lock invocation is *atomic* -- when this function
72+
* throws [CancellationException] it means that the mutex was not locked.
73+
*
74+
* Note, that this function does not check for cancellation when it is not suspended.
75+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
76+
*/
77+
public suspend fun lock() {
78+
// fast-path -- try lock
79+
if (tryLock()) return
80+
// slow-path -- suspend
81+
return lockSuspend()
82+
}
83+
84+
private suspend fun lockSuspend() = suspendCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
85+
val waiter = Waiter(cont)
86+
loop@ while (true) { // lock-free loop on state
87+
val state = this.state
88+
when (state) {
89+
is Empty -> {
90+
if (state.locked) {
91+
// try upgrade to queue & retry
92+
STATE.compareAndSet(this, state, LockFreeLinkedListHead())
93+
continue@loop
94+
} else {
95+
// try lock
96+
if (STATE.compareAndSet(this, state, EmptyLocked)) {
97+
// locked
98+
cont.resume(Unit)
99+
return@sc
100+
}
101+
}
102+
}
103+
is UnlockOp -> { // help & retry
104+
state.helpComplete()
105+
continue@loop
106+
}
107+
else -> {
108+
state as LockFreeLinkedListHead // type assertion
109+
if (state.addLastIf(waiter, { this.state === state })) {
110+
// added to waiter list!
111+
cont.initCancellability()
112+
cont.removeOnCompletion(waiter)
113+
return@sc
114+
}
115+
}
116+
}
117+
}
118+
}
119+
120+
/**
121+
* Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked.
122+
*/
123+
public fun unlock() {
124+
while (true) { // lock-free loop on state
125+
val state = this.state
126+
when (state) {
127+
is Empty -> {
128+
check(state.locked) { "Mutex is not locked" }
129+
if (STATE.compareAndSet(this, state, EmptyUnlocked)) return
130+
}
131+
is UnlockOp -> state.helpComplete()
132+
else -> {
133+
state as LockFreeLinkedListHead // type assertion
134+
val waiter = state.removeFirstOrNull()
135+
if (waiter == null) {
136+
val op = UnlockOp(state)
137+
if (STATE.compareAndSet(this, state, op) && op.helpComplete()) return
138+
} else {
139+
val cont = (waiter as Waiter).cont
140+
val token = cont.tryResume(Unit)
141+
if (token != null) {
142+
// successfully resumed waiter that now is holding the lock
143+
cont.completeResume(token)
144+
return
145+
}
146+
}
147+
}
148+
}
149+
}
150+
}
151+
152+
private class Empty(val locked: Boolean) {
153+
override fun toString(): String = "Empty[${if (locked) "Locked" else "Unlocked"}]";
154+
}
155+
156+
private class Waiter(val cont: CancellableContinuation<Unit>) : LockFreeLinkedListNode()
157+
158+
// atomic unlock operation that checks that waiters queue is empty
159+
private inner class UnlockOp(val queue: LockFreeLinkedListHead) {
160+
fun helpComplete(): Boolean {
161+
val success = queue.isEmpty // Note: queue cannot change anymore (so decision is consistent)
162+
val update: Any = if (success) EmptyUnlocked else queue
163+
STATE.compareAndSet(this@Mutex, this@UnlockOp, update)
164+
return success
165+
}
166+
}
167+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.junit.Assert.*
20+
import org.junit.Test
21+
22+
class MutexTest : TestBase() {
23+
@Test
24+
fun testSimple() = runBlocking<Unit> {
25+
val mutex = Mutex()
26+
expect(1)
27+
launch(context) {
28+
expect(4)
29+
mutex.lock() // suspends
30+
expect(7) // now got lock
31+
mutex.unlock()
32+
expect(8)
33+
}
34+
expect(2)
35+
mutex.lock() // locked
36+
expect(3)
37+
yield() // yield to child
38+
expect(5)
39+
mutex.unlock()
40+
expect(6)
41+
yield() // now child has lock
42+
finish(9)
43+
}
44+
45+
@Test
46+
fun tryLockTest() {
47+
val mutex = Mutex()
48+
assertTrue(mutex.tryLock())
49+
assertFalse(mutex.tryLock())
50+
mutex.unlock()
51+
assertTrue(mutex.tryLock())
52+
assertFalse(mutex.tryLock())
53+
mutex.unlock()
54+
}
55+
56+
@Test
57+
fun testStress() = runBlocking<Unit> {
58+
val n = 1000
59+
val k = 100
60+
var shared = 0
61+
val mutex = Mutex()
62+
val jobs = List(n) {
63+
launch(CommonPool) {
64+
repeat(k) {
65+
mutex.lock()
66+
shared++
67+
mutex.unlock()
68+
}
69+
}
70+
}
71+
jobs.forEach { it.join() }
72+
println("Shared value = $shared")
73+
assertEquals(n * k, shared)
74+
}
75+
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ open class TestBase {
4343
@After
4444
fun onCompletion() {
4545
error.get()?.let { throw it }
46-
check(finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
46+
check(actionIndex.get() == 0 || finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
4747
}
4848

4949
}

0 commit comments

Comments
 (0)