Skip to content

Commit ccc89f3

Browse files
committed
isLeftmostOrProcessed in Semaphore
1 parent b099acc commit ccc89f3

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ internal open class BufferedChannel<E>(
9090
private val receiveSegment: AtomicRef<ChannelSegment<E>>
9191
private val bufferEndSegment: AtomicRef<ChannelSegment<E>>
9292

93+
/**
94+
These values are used in [ChannelSegment.isLeftmostOrProcessed].
95+
They help to detect when the `prev` reference of the segment should be cleaned.
96+
*/
9397
internal val sendSegmentId: Long get() = sendSegment.value.id
9498
internal val receiveSegmentId: Long get() = receiveSegment.value.id
9599

@@ -146,9 +150,9 @@ internal open class BufferedChannel<E>(
146150
the segment and the index in it. */
147151
segment: ChannelSegment<E>,
148152
index: Int,
149-
/** The element to be inserted. */
153+
/* The element to be inserted. */
150154
element: E,
151-
/** The global index of the cell. */
155+
/* The global index of the cell. */
152156
s: Long
153157
) = suspendCancellableCoroutineReusable sc@{ cont ->
154158
sendImplOnNoWaiter( // <-- this is an inline function

kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ internal inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean =
7171
*
7272
* The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference.
7373
*/
74-
internal inline fun <S : Segment<S>> cleanLeftmostPrev(from: S, to: S) {
74+
private inline fun <S : Segment<S>> cleanLeftmostPrev(from: S, to: S) {
7575
var cur = to
7676
// Find the leftmost segment on the sublist between `from` and `to` segments.
7777
while (!cur.isLeftmostOrProcessed && cur.id > from.id) {

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,16 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm
128128
private val tail: AtomicRef<SemaphoreSegment>
129129
private val enqIdx = atomic(0L)
130130

131+
/**
132+
This value is used in [SemaphoreSegment.isLeftmostOrProcessed].
133+
It helps to detect when the `prev` reference of the segment should be cleaned.
134+
*/
135+
internal val headId: Long get() = head.value.id
136+
131137
init {
132138
require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
133139
require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
134-
val s = SemaphoreSegment(0, null)
140+
val s = SemaphoreSegment(0, null, this)
135141
head = atomic(s)
136142
tail = atomic(s)
137143
}
@@ -317,7 +323,6 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm
317323
val createNewSegment = ::createSegment
318324
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
319325
createNewSegment = createNewSegment).segment // cannot be closed
320-
segment.cleanPrev()
321326
if (segment.id > id) return false
322327
val i = (deqIdx % SEGMENT_SIZE).toInt()
323328
val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
@@ -356,12 +361,19 @@ private class SemaphoreImpl(
356361
permits: Int, acquiredPermits: Int
357362
): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore
358363

359-
private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
364+
private fun createSegment(id: Long, prev: SemaphoreSegment) = SemaphoreSegment(
365+
id = id,
366+
prev = prev,
367+
semaphore = prev.semaphore
368+
)
360369

361-
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment<SemaphoreSegment>(id, prev) {
370+
private class SemaphoreSegment(
371+
id: Long, prev: SemaphoreSegment?,
372+
val semaphore: SemaphoreAndMutexImpl
373+
) : Segment<SemaphoreSegment>(id, prev) {
362374
val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
363375
override val numberOfSlots: Int get() = SEGMENT_SIZE
364-
override val isLeftmostOrProcessed: Boolean get() = false // Does not impact semaphore implementation
376+
override val isLeftmostOrProcessed: Boolean get() = id <= semaphore.headId
365377

366378
@Suppress("NOTHING_TO_INLINE")
367379
inline fun get(index: Int): Any? = acquirers[index].value

0 commit comments

Comments
 (0)