Skip to content

Commit 9c2ba3a

Browse files
committed
perf: Reduce loops in when clean queue in BroadcastHub
1 parent eb22d8d commit 9c2ba3a

File tree

1 file changed

+22
-7
lines changed
  • stream/src/main/scala/org/apache/pekko/stream/scaladsl

1 file changed

+22
-7
lines changed

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
605605
else if (head != finalOffset) {
606606
// If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
607607
// see the already consumed elements. This feature is quite handy.
608-
while (head != finalOffset) {
609-
queue(head & Mask) = null
610-
head += 1
611-
}
608+
cleanQueueInRange(head, finalOffset)
612609
head = finalOffset
613610
tryPull()
614611
}
@@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
617614
}
618615
}
619616

617+
private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = {
618+
// we need to clean when head != tail
619+
if (headOffset != upToOffset) {
620+
val startIdx = headOffset & Mask
621+
val endIdx = upToOffset & Mask
622+
if (startIdx <= endIdx) {
623+
java.util.Arrays.fill(queue, startIdx, endIdx, null)
624+
} else {
625+
java.util.Arrays.fill(queue, startIdx, queue.length, null)
626+
java.util.Arrays.fill(queue, 0, endIdx, null)
627+
}
628+
}
629+
}
630+
620631
// Producer API
621632
// We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is
622633
// the buffer size. We must wait until the slowest either advances, or cancels.
@@ -677,11 +688,15 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
677688
if (offsetOfConsumerRemoved == head) {
678689
// Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
679690
// we either find a nonempty one, or we reached the end of the buffer.
680-
while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
681-
queue(head & Mask) = null
682-
head += 1
691+
var upToOffset = head
692+
while (consumerWheel(upToOffset & WheelMask).isEmpty) {
693+
upToOffset += 1
683694
unblocked = true
684695
}
696+
if (upToOffset != head) {
697+
cleanQueueInRange(head, upToOffset)
698+
head = upToOffset
699+
}
685700
}
686701
unblocked
687702
}

0 commit comments

Comments
 (0)