@@ -47,7 +47,7 @@ class ArrayBroadcastChannel<E>(
47
47
}
48
48
49
49
private val bufferLock = ReentrantLock ()
50
- private val buffer: Array < Any ?> = arrayOfNulls<Any ?>(capacity) // guarded by lock
50
+ private val buffer = arrayOfNulls<Any ?>(capacity) // guarded by bufferLock
51
51
52
52
// head & tail are Long (64 bits) and we assume that they never wrap around
53
53
// head, tail, and size are guarded by bufferLock
@@ -58,24 +58,24 @@ class ArrayBroadcastChannel<E>(
58
58
@Volatile
59
59
private var size: Int = 0
60
60
61
+ /*
62
+ Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
63
+ - Write element to buffer then write "tail" (volatile)
64
+ - Read "tail" (volatile), then read element from buffer
65
+ So read/writes to buffer need not be volatile
66
+ */
67
+
61
68
private val subs = CopyOnWriteArrayList <Subscriber <E >>()
62
69
63
70
override val isBufferAlwaysFull: Boolean get() = false
64
71
override val isBufferFull: Boolean get() = size >= capacity
65
72
66
73
override fun openSubscription (): SubscriptionReceiveChannel <E > {
67
- val sub = Subscriber (this , head)
68
- subs.add(sub)
69
- // between creating and adding of subscription into the list the buffer head could have been bumped past it,
70
- // so here we check if it did happen and update the head in subscription in this case
71
- // we did not leak newly created subscription yet, so its subHead cannot update
72
- val head = this .head // volatile read after sub was added to subs
73
- if (head != sub.subHead) {
74
- // needs update
75
- sub.subHead = head
76
- updateHead() // and also must recompute head of the buffer
74
+ bufferLock.withLock {
75
+ val sub = Subscriber (this , head)
76
+ subs.add(sub)
77
+ return sub
77
78
}
78
- return sub
79
79
}
80
80
81
81
override fun close (cause : Throwable ? ): Boolean {
@@ -122,12 +122,6 @@ class ArrayBroadcastChannel<E>(
122
122
return OFFER_SUCCESS
123
123
}
124
124
125
- private fun closeSubscriber (sub : Subscriber <E >) {
126
- subs.remove(sub)
127
- if (head == sub.subHead)
128
- updateHead()
129
- }
130
-
131
125
private fun checkSubOffers () {
132
126
var updated = false
133
127
var hasSubs = false
@@ -137,51 +131,54 @@ class ArrayBroadcastChannel<E>(
137
131
if (sub.checkOffer()) updated = true
138
132
}
139
133
if (updated || ! hasSubs)
140
- updateHead()
134
+ updateHead(null )
141
135
}
142
136
143
- private fun updateHead () {
144
- // compute minHead w/o lock (it will be eventually consistent)
145
- val minHead = computeMinHead()
146
- // update head in a loop
147
- while (true ) {
148
- var send: Send ? = null
149
- var token: Any? = null
150
- bufferLock.withLock {
151
- val tail = this .tail
152
- var head = this .head
153
- val targetHead = minHead.coerceAtMost(tail)
154
- if (targetHead <= head) return // nothing to do -- head was already moved
155
- var size = this .size
156
- // clean up removed (on not need if we don't have any subscribers anymore)
157
- while (head < targetHead) {
158
- buffer[(head % capacity).toInt()] = null
159
- val wasFull = size >= capacity
160
- // update the size before checking queue (no more senders can queue up)
161
- this .head = ++ head
162
- this .size = -- size
163
- if (wasFull) {
164
- while (true ) {
165
- send = takeFirstSendOrPeekClosed() ? : break // when when no sender
166
- if (send is Closed <* >) break // break when closed for send
167
- token = send!! .tryResumeSend(idempotent = null )
168
- if (token != null ) {
169
- // put sent element to the buffer
170
- buffer[(tail % capacity).toInt()] = (send as Send ).pollResult
171
- this .size = size + 1
172
- this .tail = tail + 1
173
- return @withLock // go out of lock to wakeup this sender
174
- }
137
+ private tailrec fun updateHead (removeSub : Subscriber <E >? ) {
138
+ // update head in a tail rec loop
139
+ var send: Send ? = null
140
+ var token: Any? = null
141
+ bufferLock.withLock {
142
+ if (removeSub != null ) {
143
+ subs.remove(removeSub)
144
+ if (head != removeSub.subHead) return // no need to update
145
+ }
146
+ val minHead = computeMinHead()
147
+ val tail = this .tail
148
+ var head = this .head
149
+ val targetHead = minHead.coerceAtMost(tail)
150
+ if (targetHead <= head) return // nothing to do -- head was already moved
151
+ var size = this .size
152
+ // clean up removed (on not need if we don't have any subscribers anymore)
153
+ while (head < targetHead) {
154
+ buffer[(head % capacity).toInt()] = null
155
+ val wasFull = size >= capacity
156
+ // update the size before checking queue (no more senders can queue up)
157
+ this .head = ++ head
158
+ this .size = -- size
159
+ if (wasFull) {
160
+ while (true ) {
161
+ send = takeFirstSendOrPeekClosed() ? : break // when when no sender
162
+ if (send is Closed <* >) break // break when closed for send
163
+ token = send!! .tryResumeSend(idempotent = null )
164
+ if (token != null ) {
165
+ // put sent element to the buffer
166
+ buffer[(tail % capacity).toInt()] = (send as Send ).pollResult
167
+ this .size = size + 1
168
+ this .tail = tail + 1
169
+ return @withLock // go out of lock to wakeup this sender
175
170
}
176
171
}
177
172
}
178
- return // done updating here -> return
179
173
}
180
- // we only get out of the lock normally when there is a sender to resume
181
- send!! .completeResumeSend(token!! )
182
- // since we've just sent an element, we might need to resume some receivers
183
- checkSubOffers()
174
+ return // done updating here -> return
184
175
}
176
+ // we only get out of the lock normally when there is a sender to resume
177
+ send!! .completeResumeSend(token!! )
178
+ // since we've just sent an element, we might need to resume some receivers
179
+ checkSubOffers()
180
+ // tailrec call to recheck
181
+ updateHead(null )
185
182
}
186
183
187
184
private fun computeMinHead (): Long {
@@ -196,9 +193,9 @@ class ArrayBroadcastChannel<E>(
196
193
197
194
private class Subscriber <E >(
198
195
private val broadcastChannel : ArrayBroadcastChannel <E >,
199
- @Volatile @JvmField var subHead : Long // guarded by lock
196
+ @Volatile @JvmField var subHead : Long // guarded by subLock
200
197
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
201
- private val lock = ReentrantLock ()
198
+ private val subLock = ReentrantLock ()
202
199
203
200
override val isBufferAlwaysEmpty: Boolean get() = false
204
201
override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
@@ -207,7 +204,7 @@ class ArrayBroadcastChannel<E>(
207
204
208
205
override fun close () {
209
206
if (close(cause = null ))
210
- broadcastChannel.closeSubscriber( this )
207
+ broadcastChannel.updateHead(removeSub = this )
211
208
}
212
209
213
210
// returns true if subHead was updated and broadcast channel's head must be checked
@@ -220,7 +217,7 @@ class ArrayBroadcastChannel<E>(
220
217
while (needsToCheckOfferWithoutLock()) {
221
218
// just use `tryLock` here and break when some other thread is checking under lock
222
219
// it means that `checkOffer` must be retried after every `unlock`
223
- if (! lock .tryLock()) break
220
+ if (! subLock .tryLock()) break
224
221
val receive: ReceiveOrClosed <E >?
225
222
val token: Any?
226
223
try {
@@ -241,7 +238,7 @@ class ArrayBroadcastChannel<E>(
241
238
this .subHead = subHead + 1 // retrieved element for this subscriber
242
239
updated = true
243
240
} finally {
244
- lock .unlock()
241
+ subLock .unlock()
245
242
}
246
243
receive!! .completeResumeReceive(token!! )
247
244
}
@@ -253,10 +250,8 @@ class ArrayBroadcastChannel<E>(
253
250
// result is `E | POLL_FAILED | Closed`
254
251
override fun pollInternal (): Any? {
255
252
var updated = false
256
- val result: Any?
257
- lock.lock()
258
- try {
259
- result = peekUnderLock()
253
+ val result = subLock.withLock {
254
+ val result = peekUnderLock()
260
255
when {
261
256
result is Closed <* > -> { /* just bail out of lock */ }
262
257
result == = POLL_FAILED -> { /* just bail out of lock */ }
@@ -267,8 +262,7 @@ class ArrayBroadcastChannel<E>(
267
262
updated = true
268
263
}
269
264
}
270
- } finally {
271
- lock.unlock()
265
+ result
272
266
}
273
267
// do close outside of lock
274
268
(result as ? Closed <* >)?.also { close(cause = it.closeCause) }
@@ -278,17 +272,15 @@ class ArrayBroadcastChannel<E>(
278
272
updated = true
279
273
// and finally update broadcast's channel head if needed
280
274
if (updated)
281
- broadcastChannel.updateHead()
275
+ broadcastChannel.updateHead(null )
282
276
return result
283
277
}
284
278
285
279
// result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
286
280
override fun pollSelectInternal (select : SelectInstance <* >): Any? {
287
281
var updated = false
288
- var result: Any?
289
- lock.lock()
290
- try {
291
- result = peekUnderLock()
282
+ val result = subLock.withLock {
283
+ var result = peekUnderLock()
292
284
when {
293
285
result is Closed <* > -> { /* just bail out of lock */ }
294
286
result == = POLL_FAILED -> { /* just bail out of lock */ }
@@ -304,8 +296,7 @@ class ArrayBroadcastChannel<E>(
304
296
}
305
297
}
306
298
}
307
- } finally {
308
- lock.unlock()
299
+ result
309
300
}
310
301
// do close outside of lock
311
302
(result as ? Closed <* >)?.also { close(cause = it.closeCause) }
@@ -315,7 +306,7 @@ class ArrayBroadcastChannel<E>(
315
306
updated = true
316
307
// and finally update broadcast's channel head if needed
317
308
if (updated)
318
- broadcastChannel.updateHead()
309
+ broadcastChannel.updateHead(null )
319
310
return result
320
311
}
321
312
0 commit comments