@@ -90,9 +90,12 @@ internal open class BufferedChannel<E>(
90
90
private val receiveSegment: AtomicRef <ChannelSegment <E >>
91
91
private val bufferEndSegment: AtomicRef <ChannelSegment <E >>
92
92
93
+ internal val sendSegmentId: Long get() = sendSegment.value.id
94
+ internal val receiveSegmentId: Long get() = receiveSegment.value.id
95
+
93
96
init {
94
97
@Suppress(" LeakingThis" )
95
- val firstSegment = ChannelSegment (id = 0 , prev = null , channel = this , pointers = 3 )
98
+ val firstSegment = ChannelSegment (id = 0 , prev = null , channel = this )
96
99
sendSegment = atomic(firstSegment)
97
100
receiveSegment = atomic(firstSegment)
98
101
// If this channel is rendezvous or has unlimited capacity, the algorithm never
@@ -301,10 +304,6 @@ internal open class BufferedChannel<E>(
301
304
when (updateCellSend(segment, i, element, s, waiter, closed)) {
302
305
RESULT_RENDEZVOUS -> {
303
306
// A rendezvous with a receiver has happened.
304
- // The previous segments are no longer needed
305
- // for the upcoming requests, so the algorithm
306
- // resets the link to the previous segment.
307
- segment.cleanPrev()
308
307
return onRendezvousOrBuffered()
309
308
}
310
309
RESULT_BUFFERED -> {
@@ -325,17 +324,11 @@ internal open class BufferedChannel<E>(
325
324
}
326
325
RESULT_CLOSED -> {
327
326
// This channel is closed.
328
- // In case this segment is already or going to be
329
- // processed by a receiver, ensure that all the
330
- // previous segments are unreachable.
331
- if (s < receiversCounter) segment.cleanPrev()
332
327
return onClosed()
333
328
}
334
329
RESULT_FAILED -> {
335
330
// Either the cell stores an interrupted receiver,
336
331
// or it was poisoned by a concurrent receiver.
337
- // In both cases, all the previous segments are already processed,
338
- segment.cleanPrev()
339
332
continue
340
333
}
341
334
RESULT_SUSPEND_NO_WAITER -> {
@@ -392,22 +385,16 @@ internal open class BufferedChannel<E>(
392
385
// restarting the operation from the beginning on failure.
393
386
// Check the `sendImpl(..)` function for the comments.
394
387
when (updateCellSend(segment, index, element, s, waiter, false )) {
395
- RESULT_RENDEZVOUS -> {
396
- segment.cleanPrev()
397
- onRendezvousOrBuffered()
398
- }
399
- RESULT_BUFFERED -> {
388
+ RESULT_RENDEZVOUS , RESULT_BUFFERED -> {
400
389
onRendezvousOrBuffered()
401
390
}
402
391
RESULT_SUSPEND -> {
403
392
waiter.prepareSenderForSuspension(segment, index)
404
393
}
405
394
RESULT_CLOSED -> {
406
- if (s < receiversCounter) segment.cleanPrev()
407
395
onClosed()
408
396
}
409
397
RESULT_FAILED -> {
410
- segment.cleanPrev()
411
398
sendImpl(
412
399
element = element,
413
400
waiter = waiter,
@@ -857,14 +844,9 @@ internal open class BufferedChannel<E>(
857
844
when {
858
845
updCellResult == = FAILED -> {
859
846
// The cell is poisoned; restart from the beginning.
860
- // To avoid memory leaks, we also need to reset
861
- // the `prev` pointer of the working segment.
862
- if (r < sendersCounter) segment.cleanPrev()
863
847
}
864
848
else -> { // element
865
849
// A buffered element was retrieved from the cell.
866
- // Clean the reference to the previous segment.
867
- segment.cleanPrev()
868
850
@Suppress(" UNCHECKED_CAST" )
869
851
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E )?.let { throw it }
870
852
}
@@ -938,9 +920,6 @@ internal open class BufferedChannel<E>(
938
920
// but failed: either the opposite request has
939
921
// already been cancelled or the cell is poisoned.
940
922
// Restart from the beginning in this case.
941
- // To avoid memory leaks, we also need to reset
942
- // the `prev` pointer of the working segment.
943
- if (r < sendersCounter) segment.cleanPrev()
944
923
continue
945
924
}
946
925
updCellResult == = SUSPEND_NO_WAITER -> {
@@ -951,8 +930,6 @@ internal open class BufferedChannel<E>(
951
930
else -> { // element
952
931
// Either a buffered element was retrieved from the cell
953
932
// or a rendezvous with a waiting sender has happened.
954
- // Clean the reference to the previous segment before finishing.
955
- segment.cleanPrev()
956
933
@Suppress(" UNCHECKED_CAST" )
957
934
onElementRetrieved(updCellResult as E )
958
935
}
@@ -987,7 +964,6 @@ internal open class BufferedChannel<E>(
987
964
waiter.prepareReceiverForSuspension(segment, index)
988
965
}
989
966
updCellResult == = FAILED -> {
990
- if (r < sendersCounter) segment.cleanPrev()
991
967
receiveImpl(
992
968
waiter = waiter,
993
969
onElementRetrieved = onElementRetrieved,
@@ -996,7 +972,6 @@ internal open class BufferedChannel<E>(
996
972
)
997
973
}
998
974
else -> {
999
- segment.cleanPrev()
1000
975
@Suppress(" UNCHECKED_CAST" )
1001
976
onElementRetrieved(updCellResult as E )
1002
977
}
@@ -1211,7 +1186,7 @@ internal open class BufferedChannel<E>(
1211
1186
if (s <= b) {
1212
1187
// Should `bufferEndSegment` be moved forward to avoid memory leaks?
1213
1188
if (segment.id < id && segment.next != null )
1214
- moveSegmentBufferEndToSpecifiedOrLast (id, segment)
1189
+ bufferEndSegment.moveToSpecifiedOrLast (id, segment)
1215
1190
// Increment the number of completed `expandBuffer()`-s and finish.
1216
1191
incCompletedExpandBufferAttempts()
1217
1192
return
@@ -2299,7 +2274,6 @@ internal open class BufferedChannel<E>(
2299
2274
// Otherwise, if the required segment is removed, the operation restarts.
2300
2275
if (receiveSegment.value.id < id) return false else continue
2301
2276
}
2302
- segment.cleanPrev() // all the previous segments are no longer needed.
2303
2277
// Does the `r`-th cell contain waiting sender or buffered element?
2304
2278
val i = (r % SEGMENT_SIZE ).toInt()
2305
2279
if (isCellNonEmpty(segment, i, r)) return true
@@ -2398,12 +2372,6 @@ internal open class BufferedChannel<E>(
2398
2372
// This channel is already closed or cancelled; help to complete
2399
2373
// the closing or cancellation procedure.
2400
2374
completeCloseOrCancel()
2401
- // Clean the `prev` reference of the provided segment
2402
- // if all the previous cells are already covered by senders.
2403
- // It is important to clean the `prev` reference only in
2404
- // this case, as the closing/cancellation procedure may
2405
- // need correct value to traverse the linked list from right to left.
2406
- if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev()
2407
2375
// As the required segment is not found and cannot be allocated, return `null`.
2408
2376
null
2409
2377
} else {
@@ -2415,12 +2383,6 @@ internal open class BufferedChannel<E>(
2415
2383
// segment with `id` not lower than the required one.
2416
2384
// Skip the sequence of removed cells in O(1).
2417
2385
updateSendersCounterIfLower(segment.id * SEGMENT_SIZE )
2418
- // Clean the `prev` reference of the provided segment
2419
- // if all the previous cells are already covered by senders.
2420
- // It is important to clean the `prev` reference only in
2421
- // this case, as the closing/cancellation procedure may
2422
- // need correct value to traverse the linked list from right to left.
2423
- if (segment.id * SEGMENT_SIZE < receiversCounter) segment.cleanPrev()
2424
2386
// As the required segment is not found and cannot be allocated, return `null`.
2425
2387
null
2426
2388
} else {
@@ -2453,12 +2415,6 @@ internal open class BufferedChannel<E>(
2453
2415
// This channel is already closed or cancelled; help to complete
2454
2416
// the closing or cancellation procedure.
2455
2417
completeCloseOrCancel()
2456
- // Clean the `prev` reference of the provided segment
2457
- // if all the previous cells are already covered by senders.
2458
- // It is important to clean the `prev` reference only in
2459
- // this case, as the closing/cancellation procedure may
2460
- // need correct value to traverse the linked list from right to left.
2461
- if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
2462
2418
// As the required segment is not found and cannot be allocated, return `null`.
2463
2419
null
2464
2420
} else {
@@ -2474,12 +2430,6 @@ internal open class BufferedChannel<E>(
2474
2430
// segment with `id` not lower than the required one.
2475
2431
// Skip the sequence of removed cells in O(1).
2476
2432
updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE )
2477
- // Clean the `prev` reference of the provided segment
2478
- // if all the previous cells are already covered by senders.
2479
- // It is important to clean the `prev` reference only in
2480
- // this case, as the closing/cancellation procedure may
2481
- // need correct value to traverse the linked list from right to left.
2482
- if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
2483
2433
// As the required segment is already removed, return `null`.
2484
2434
null
2485
2435
} else {
@@ -2504,7 +2454,7 @@ internal open class BufferedChannel<E>(
2504
2454
completeCloseOrCancel()
2505
2455
// Update `bufferEndSegment` to the last segment
2506
2456
// in the linked list to avoid memory leaks.
2507
- moveSegmentBufferEndToSpecifiedOrLast (id, startFrom)
2457
+ bufferEndSegment.moveToSpecifiedOrLast (id, startFrom)
2508
2458
// When this function does not find the requested segment,
2509
2459
// it should update the number of completed `expandBuffer()` attempts.
2510
2460
incCompletedExpandBufferAttempts()
@@ -2534,32 +2484,6 @@ internal open class BufferedChannel<E>(
2534
2484
}
2535
2485
}
2536
2486
2537
- /* *
2538
- * Updates [bufferEndSegment] to the one with the specified [id] or
2539
- * to the last existing segment, if the required segment is not yet created.
2540
- *
2541
- * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
2542
- */
2543
- private fun moveSegmentBufferEndToSpecifiedOrLast (id : Long , startFrom : ChannelSegment <E >) {
2544
- // Start searching the required segment from the specified one.
2545
- var segment: ChannelSegment <E > = startFrom
2546
- while (segment.id < id) {
2547
- segment = segment.next ? : break
2548
- }
2549
- // Skip all removed segments and try to update `bufferEndSegment`
2550
- // to the first non-removed one. This part should succeed eventually,
2551
- // as the tail segment is never removed.
2552
- while (true ) {
2553
- while (segment.isRemoved) {
2554
- segment = segment.next ? : break
2555
- }
2556
- // Try to update `bufferEndSegment`. On failure,
2557
- // the found segment is already removed, so it
2558
- // should be skipped.
2559
- if (bufferEndSegment.moveForward(segment)) return
2560
- }
2561
- }
2562
-
2563
2487
/* *
2564
2488
* Updates the `senders` counter if its value
2565
2489
* is lower that the specified one.
@@ -2588,6 +2512,17 @@ internal open class BufferedChannel<E>(
2588
2512
if (receivers.compareAndSet(cur, value)) return
2589
2513
}
2590
2514
2515
+ /* *
2516
+ This method is used in the physical removal of the segment. It helps to move pointers forward from
2517
+ the segment which was physically removed.
2518
+ */
2519
+ internal fun movePointersForwardFrom (from : ChannelSegment <E >) {
2520
+ check(from.isRemoved) { " Trying to move channel pointers from the alive segment." }
2521
+ if (from == sendSegment.value) sendSegment.moveToSpecifiedOrLast(from.id, from)
2522
+ if (from == receiveSegment.value) receiveSegment.moveToSpecifiedOrLast(from.id, from)
2523
+ if (from == bufferEndSegment.value) bufferEndSegment.moveToSpecifiedOrLast(from.id, from)
2524
+ }
2525
+
2591
2526
// ###################
2592
2527
// # Debug Functions #
2593
2528
// ###################
@@ -2799,7 +2734,7 @@ internal open class BufferedChannel<E>(
2799
2734
* to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
2800
2735
* and [BufferedChannel.bufferEndSegment] correctly.
2801
2736
*/
2802
- internal class ChannelSegment <E >(id : Long , prev : ChannelSegment <E >? , channel : BufferedChannel <E >? , pointers : Int ) : Segment<ChannelSegment<E>>(id, prev, pointers ) {
2737
+ internal class ChannelSegment <E >(id : Long , prev : ChannelSegment <E >? , channel : BufferedChannel <E >? ) : Segment<ChannelSegment<E>>(id, prev) {
2803
2738
private val _channel : BufferedChannel <E >? = channel
2804
2739
val channel get() = _channel !! // always non-null except for `NULL_SEGMENT`
2805
2740
@@ -2841,6 +2776,26 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
2841
2776
2842
2777
internal fun getAndSetState (index : Int , update : Any? ) = data[index * 2 + 1 ].getAndSet(update)
2843
2778
2779
+ // ###################################################
2780
+ // # Manipulation with the structure of segment list #
2781
+ // ###################################################
2782
+
2783
+ /* *
2784
+ * Shows if all segments going before this segment have been processed.
2785
+ * When the value is true, the [prev] reference of the segment should be `null`.
2786
+ */
2787
+ override val isLeftmostOrProcessed: Boolean get() = id <= channel.sendSegmentId && id <= channel.receiveSegmentId
2788
+
2789
+ /* *
2790
+ * Removes the segment physically from the segment list.
2791
+ *
2792
+ * If the physical removal was successful and there are channel pointers pointing on this segment,
2793
+ * the [BufferedChannel.movePointersForwardFrom] method is invoked to move them further on the segment list.
2794
+ */
2795
+ override fun remove (): Boolean =
2796
+ super .remove().also {
2797
+ if (it) channel.movePointersForwardFrom(this )
2798
+ }
2844
2799
2845
2800
// ########################
2846
2801
// # Cancellation Support #
@@ -2926,10 +2881,9 @@ internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, Ch
2926
2881
private fun <E > createSegment (id : Long , prev : ChannelSegment <E >) = ChannelSegment (
2927
2882
id = id,
2928
2883
prev = prev,
2929
- channel = prev.channel,
2930
- pointers = 0
2884
+ channel = prev.channel
2931
2885
)
2932
- private val NULL_SEGMENT = ChannelSegment <Any ?>(id = - 1 , prev = null , channel = null , pointers = 0 )
2886
+ private val NULL_SEGMENT = ChannelSegment <Any ?>(id = - 1 , prev = null , channel = null )
2933
2887
2934
2888
/* *
2935
2889
* Number of cells in each segment.
0 commit comments