Skip to content

Commit 3050c2a

Browse files
authored
chore: Set vector builder to null after stage completed to avoid leak. (#1917)
1 parent 2ef02d0 commit 3050c2a

File tree

1 file changed

+24
-18
lines changed
  • stream/src/main/scala/org/apache/pekko/stream/impl/fusing

1 file changed

+24
-18
lines changed

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -807,20 +807,20 @@ private[stream] object Collect {
807807

808808
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
809809
new GraphStageLogic(shape) with InHandler with OutHandler {
810-
private val buf = Vector.newBuilder[T]
811-
var left: Long = minWeight
810+
private var builder = Vector.newBuilder[T]
811+
private var left: Long = minWeight
812812

813813
override def onPush(): Unit = {
814814
val elem = grab(in)
815815
val cost = costFn(elem)
816816
if (cost < 0L)
817817
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
818818
else {
819-
buf += elem
819+
builder += elem
820820
left -= cost
821821
if (left <= 0) {
822-
val elements = buf.result()
823-
buf.clear()
822+
val elements = builder.result()
823+
builder.clear()
824824
left = minWeight
825825
push(out, elements)
826826
} else {
@@ -835,12 +835,12 @@ private[stream] object Collect {
835835

836836
override def onUpstreamFinish(): Unit = {
837837
// Since the upstream has finished we have to push any buffered elements downstream.
838-
val elements = buf.result()
838+
val elements = builder.result()
839839
if (elements.nonEmpty) {
840-
buf.clear()
841840
left = minWeight
842841
push(out, elements)
843842
}
843+
builder = null // free for GC
844844
completeStage()
845845
}
846846

@@ -1763,7 +1763,7 @@ private[stream] object Collect {
17631763
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
17641764
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
17651765

1766-
private val buf: VectorBuilder[T] = new VectorBuilder
1766+
private var builder: VectorBuilder[T] = new VectorBuilder
17671767
private var pending: T = null.asInstanceOf[T]
17681768
private var pendingWeight: Long = 0L
17691769
// True if:
@@ -1798,7 +1798,7 @@ private[stream] object Collect {
17981798
hasElements = true
17991799
// if there is place (both weight and number) for `elem` in the current group
18001800
if (totalWeight + cost <= maxWeight && totalNumber + 1 <= maxNumber) {
1801-
buf += elem
1801+
builder += elem
18021802
totalWeight += cost
18031803
totalNumber += 1;
18041804

@@ -1820,7 +1820,7 @@ private[stream] object Collect {
18201820
} else {
18211821
// if there is a single heavy element that weighs more than the limit
18221822
if (totalWeight == 0L && totalNumber == 0) {
1823-
buf += elem
1823+
builder += elem
18241824
totalWeight += cost
18251825
totalNumber += 1;
18261826
pushEagerly = true
@@ -1842,19 +1842,23 @@ private[stream] object Collect {
18421842
private def emitGroup(): Unit = {
18431843
groupEmitted = true
18441844
contextPropagation.resumeContext()
1845-
push(out, buf.result())
1846-
buf.clear()
1847-
if (!finished) startNewGroup()
1848-
else if (pending != null) emit(out, Vector(pending), () => completeStage())
1849-
else completeStage()
1845+
push(out, builder.result())
1846+
if (finished) {
1847+
builder = null // free for GC
1848+
if (pending != null) emit(out, Vector(pending), () => completeStage())
1849+
else completeStage()
1850+
} else {
1851+
builder.clear()
1852+
startNewGroup()
1853+
}
18501854
}
18511855

18521856
private def startNewGroup(): Unit = {
18531857
if (pending != null) {
18541858
totalWeight = pendingWeight
18551859
totalNumber = 1
18561860
pendingWeight = 0L
1857-
buf += pending
1861+
builder += pending
18581862
pending = null.asInstanceOf[T]
18591863
groupEmitted = false
18601864
} else {
@@ -1876,8 +1880,10 @@ private[stream] object Collect {
18761880

18771881
override def onUpstreamFinish(): Unit = {
18781882
finished = true
1879-
if (groupEmitted) completeStage()
1880-
else tryCloseGroup()
1883+
if (groupEmitted) {
1884+
builder = null // free for GC
1885+
completeStage()
1886+
} else tryCloseGroup()
18811887
}
18821888

18831889
override protected def onTimer(timerKey: Any) = if (hasElements) {

0 commit comments

Comments
 (0)