19 changes: 15 additions & 4 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
else if (head != finalOffset) {
// If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
// see the already consumed elements. This feature is quite handy.
while (head != finalOffset) {
queue(head & Mask) = null
head += 1
}
Member (Author) review comment on the removed loop above: Which can loop many times.
cleanQueueInRange(head, finalOffset)
head = finalOffset
tryPull()
}
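The removed loop clears one slot per iteration, so when the departing consumer is far behind the final offset it can iterate up to bufferSize times, which is the point of the review comment above. A minimal, self-contained sketch of the offset-masking scheme the loop relies on (the demo object and sample values are hypothetical, not the Hub internals; it assumes the buffer size is a power of two so that `offset & Mask` maps a growing offset onto an array slot):

object RingBufferMaskingDemo {
  def main(args: Array[String]): Unit = {
    val BufferSize = 8                 // assumed power of two, as the masking trick requires
    val Mask = BufferSize - 1          // offset & Mask == offset % BufferSize for powers of two
    val queue = new Array[AnyRef](BufferSize)

    var head = 5                       // offset of the departing consumer
    val finalOffset = 12               // target offset; 7 slots ahead, wrapping past the array end

    // The pre-change approach: one slot cleared per iteration, up to BufferSize iterations.
    var iterations = 0
    while (head != finalOffset) {
      queue(head & Mask) = null
      head += 1
      iterations += 1
    }
    println(s"cleared $iterations slots one by one") // prints: cleared 7 slots one by one
  }
}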
@@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
}
}

private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = {
// Clear the queue slots in the range [headOffset, upToOffset), taking ring-buffer wrap-around into account
if (headOffset != upToOffset) {
val startIdx = headOffset & Mask
val endIdx = upToOffset & Mask
if (startIdx <= endIdx) {
java.util.Arrays.fill(queue, startIdx, endIdx, null)
} else {
java.util.Arrays.fill(queue, startIdx, queue.length, null)
java.util.Arrays.fill(queue, 0, endIdx, null)
}
}
}

// Producer API
// We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is
// the buffer size. We must wait until the slowest either advances, or cancels.
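The new helper replaces the per-slot loop with at most two bulk Arrays.fill calls: a contiguous range is cleared in one call, and a range that crosses the end of the backing array is split into a tail fill and a head fill. A stand-alone sketch of the same splitting logic outside the Hub (the ClearRangeDemo object, the clearRange helper, and the sample values are hypothetical, for illustration only):

import java.util.Arrays

object ClearRangeDemo {
  // Hypothetical stand-alone version of the range-clearing idea; assumes a power-of-two capacity.
  def clearRange(queue: Array[AnyRef], mask: Int, headOffset: Int, upToOffset: Int): Unit =
    if (headOffset != upToOffset) {
      val startIdx = headOffset & mask
      val endIdx = upToOffset & mask
      if (startIdx <= endIdx) {
        // Contiguous range: one bulk fill instead of a per-slot loop.
        Arrays.fill(queue, startIdx, endIdx, null)
      } else {
        // Range wraps past the end of the array: clear the tail, then the head.
        Arrays.fill(queue, startIdx, queue.length, null)
        Arrays.fill(queue, 0, endIdx, null)
      }
    }

  def main(args: Array[String]): Unit = {
    val capacity = 8
    val queue: Array[AnyRef] = Array.tabulate[AnyRef](capacity)(i => Integer.valueOf(i))
    clearRange(queue, capacity - 1, headOffset = 6, upToOffset = 11) // wraps: clears slots 6, 7, 0, 1, 2
    println(queue.mkString(", ")) // prints: null, null, null, 3, 4, 5, null, null
  }
}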