Skip to content

Commit 167ca63

Browse files
committed
Implement CopyOnWriteList on JS, add reproducer for #412
Fixes #412
1 parent 1ce6c0b commit 167ca63

File tree

5 files changed

+174
-18
lines changed

5 files changed

+174
-18
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,14 @@ class ArrayBroadcastChannel<E>(
4444
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
4545
}
4646

47+
/*
48+
* Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
49+
* - Write element to buffer then write "tail" (volatile)
50+
* - Read "tail" (volatile), then read element from buffer
51+
* So read/writes to buffer need not be volatile
52+
*/
4753
private val bufferLock = ReentrantLock()
48-
private val buffer = arrayOfNulls<Any?>(capacity) // guarded by bufferLock
54+
private val buffer = arrayOfNulls<Any?>(capacity)
4955

5056
// head & tail are Long (64 bits) and we assume that they never wrap around
5157
// head, tail, and size are guarded by bufferLock
@@ -56,14 +62,7 @@ class ArrayBroadcastChannel<E>(
5662
@Volatile
5763
private var size: Int = 0
5864

59-
/*
60-
Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
61-
- Write element to buffer then write "tail" (volatile)
62-
- Read "tail" (volatile), then read element from buffer
63-
So read/writes to buffer need not be volatile
64-
*/
65-
66-
private val subs = subscriberList<Subscriber<E>>()
65+
private val subscribers = subscriberList<Subscriber<E>>()
6766

6867
override val isBufferAlwaysFull: Boolean get() = false
6968
override val isBufferFull: Boolean get() = size >= capacity
@@ -81,7 +80,7 @@ class ArrayBroadcastChannel<E>(
8180

8281
public override fun cancel(cause: Throwable?): Boolean =
8382
close(cause).also {
84-
for (sub in subs) sub.cancel(cause)
83+
for (sub in subscribers) sub.cancel(cause)
8584
}
8685

8786
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
@@ -96,7 +95,7 @@ class ArrayBroadcastChannel<E>(
9695
this.size = size + 1
9796
this.tail = tail + 1
9897
}
99-
// if offered successfully, then check subs outside of lock
98+
// if offered successfully, then check subscribers outside of lock
10099
checkSubOffers()
101100
return OFFER_SUCCESS
102101
}
@@ -117,7 +116,7 @@ class ArrayBroadcastChannel<E>(
117116
this.size = size + 1
118117
this.tail = tail + 1
119118
}
120-
// if offered successfully, then check subs outside of lock
119+
// if offered successfully, then check subscribers outside of lock
121120
checkSubOffers()
122121
return OFFER_SUCCESS
123122
}
@@ -126,7 +125,7 @@ class ArrayBroadcastChannel<E>(
126125
var updated = false
127126
var hasSubs = false
128127
@Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
129-
for (sub in subs) {
128+
for (sub in subscribers) {
130129
hasSubs = true
131130
if (sub.checkOffer()) updated = true
132131
}
@@ -142,12 +141,12 @@ class ArrayBroadcastChannel<E>(
142141
bufferLock.withLock {
143142
if (addSub != null) {
144143
addSub.subHead = tail // start from last element
145-
val wasEmpty = subs.isEmpty()
146-
subs.add(addSub)
144+
val wasEmpty = subscribers.isEmpty()
145+
subscribers.add(addSub)
147146
if (!wasEmpty) return // no need to update when adding second and etc sub
148147
}
149148
if (removeSub != null) {
150-
subs.remove(removeSub)
149+
subscribers.remove(removeSub)
151150
if (head != removeSub.subHead) return // no need to update
152151
}
153152
val minHead = computeMinHead()
@@ -190,7 +189,7 @@ class ArrayBroadcastChannel<E>(
190189

191190
private fun computeMinHead(): Long {
192191
var minHead = Long.MAX_VALUE
193-
for (sub in subs)
192+
for (sub in subscribers)
194193
minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
195194
return minHead
196195
}

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,30 @@ import kotlin.coroutines.experimental.*
2121
import kotlin.test.*
2222

2323
class ArrayBroadcastChannelTest : TestBase() {
24+
25+
@Test
26+
fun testConcurrentModification() = runTest {
27+
val channel = ArrayBroadcastChannel<Int>(1)
28+
val s1 = channel.openSubscription()
29+
val s2 = channel.openSubscription()
30+
31+
val job1 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
32+
expect(1)
33+
s1.receive()
34+
s1.cancel()
35+
}
36+
37+
val job2 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
38+
expect(2)
39+
s2.receive()
40+
}
41+
42+
expect(3)
43+
channel.send(1)
44+
joinAll(job1, job2)
45+
finish(4)
46+
}
47+
2448
@Test
2549
fun testBasic() = runTest {
2650
expect(1)

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,29 @@ import kotlin.test.*
2222

2323
class ConflatedBroadcastChannelTest : TestBase() {
2424

25+
@Test
26+
fun testConcurrentModification() = runTest {
27+
val channel = ConflatedBroadcastChannel<Int>()
28+
val s1 = channel.openSubscription()
29+
val s2 = channel.openSubscription()
30+
31+
val job1 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
32+
expect(1)
33+
s1.receive()
34+
s1.cancel()
35+
}
36+
37+
val job2 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
38+
expect(2)
39+
s2.receive()
40+
}
41+
42+
expect(3)
43+
channel.send(1)
44+
joinAll(job1, job2)
45+
finish(4)
46+
}
47+
2548
@Test
2649
fun testBasicScenario() = runTest {
2750
expect(1)

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ internal class NoOpLock {
99
fun unlock(): Unit {}
1010
}
1111

12-
internal actual fun <E> subscriberList(): MutableList<E> = ArrayList()
12+
internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteList()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.internal
18+
19+
/**
20+
* Analogue of java.util.concurrent.CopyOnWriteArrayList for JS.
21+
* Even though JS has no real concurrency, [CopyOnWriteList] is essential to manage any kinds
22+
* of callbacks or continuations.
23+
*
24+
* Implementation note: most of the methods fallbacks to [AbstractMutableList] (thus inefficient for CoW pattern)
25+
* and some methods are unsupported, because currently they are not required for this class consumers.
26+
*/
27+
internal class CopyOnWriteList<E>(private var array: Array<E> = emptyArray()) : AbstractMutableList<E>() {
28+
29+
override val size: Int get() = array.size
30+
31+
override fun add(element: E): Boolean {
32+
val copy = array.asDynamic().slice()
33+
copy.push(element)
34+
array = copy as Array<E>
35+
return true
36+
}
37+
38+
override fun add(index: Int, element: E) {
39+
val copy = array.asDynamic().slice()
40+
copy.splice(insertionRangeCheck(index), 0, element)
41+
array = copy as Array<E>
42+
}
43+
44+
override fun remove(element: E): Boolean {
45+
for (index in array.indices) {
46+
if (array[index] == element) {
47+
val copy = array.asDynamic().slice()
48+
copy.splice(index, 1)
49+
array = copy as Array<E>
50+
return true
51+
}
52+
}
53+
54+
return false
55+
}
56+
57+
override fun removeAt(index: Int): E {
58+
modCount++
59+
val copy = array.asDynamic().slice()
60+
val result = if (index == lastIndex) {
61+
copy.pop()
62+
} else {
63+
copy.splice(index, 1)[0]
64+
}
65+
66+
array = copy as Array<E>
67+
return result as E
68+
}
69+
70+
override fun iterator(): MutableIterator<E> = IteratorImpl(array)
71+
72+
override fun listIterator(): MutableListIterator<E> = throw UnsupportedOperationException("Operation is not supported")
73+
74+
override fun listIterator(index: Int): MutableListIterator<E> = throw UnsupportedOperationException("Operation is not supported")
75+
76+
override fun isEmpty(): Boolean = size == 0
77+
78+
override fun set(index: Int, element: E): E = throw UnsupportedOperationException("Operation is not supported")
79+
80+
override fun get(index: Int): E = array[rangeCheck(index)]
81+
82+
private class IteratorImpl<E>(private var array: Array<E>) : MutableIterator<E> {
83+
84+
private var current = 0
85+
86+
override fun hasNext(): Boolean = current != array.size
87+
88+
override fun next(): E {
89+
if (!hasNext()) {
90+
throw NoSuchElementException()
91+
}
92+
93+
return array[current++]
94+
}
95+
96+
override fun remove() = throw UnsupportedOperationException("Operation is not supported")
97+
}
98+
99+
private fun insertionRangeCheck(index: Int) {
100+
if (index < 0 || index > size) {
101+
throw IndexOutOfBoundsException("index: $index, size: $size")
102+
}
103+
}
104+
105+
private fun rangeCheck(index: Int) = index.apply {
106+
if (index < 0 || index >= size) {
107+
throw IndexOutOfBoundsException("index: $index, size: $size")
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)