Skip to content

Commit d0844fa

Browse files
committed
chore: Fix leak in FlatMapPrefix operator.
1 parent 798a54c commit d0844fa

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
4343
.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
4444
.propagateToNestedMaterialization
4545
val matPromise = Promise[M]()
46-
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
47-
val accumulated = collection.mutable.Buffer.empty[In]
46+
object logic extends GraphStageLogic(shape) with InHandler with OutHandler {
47+
private var left = if (n < 0) 0 else n
48+
private var builder = Vector.newBuilder[In]
49+
builder.sizeHint(left)
4850

4951
private var subSource = OptionVal.none[SubSourceOutlet[In]]
5052
private var subSink = OptionVal.none[SubSinkInlet[Out]]
@@ -65,8 +67,9 @@ import pekko.util.OptionVal
6567
subSource match {
6668
case OptionVal.Some(s) => s.push(grab(in))
6769
case _ =>
68-
accumulated.append(grab(in))
69-
if (accumulated.size == n) {
70+
builder += grab(in)
71+
left -= 1
72+
if (left == 0) {
7073
materializeFlow()
7174
} else {
7275
// gi'me some more!
@@ -98,12 +101,10 @@ import pekko.util.OptionVal
98101
// delegate to subSink
99102
s.pull()
100103
case _ =>
101-
if (accumulated.size < n) pull(in)
102-
else if (accumulated.size == n) {
104+
if (left > 0) pull(in)
105+
else if (left == 0) {
103106
// corner case for n = 0, can be handled in FlowOps
104107
materializeFlow()
105-
} else {
106-
throw new IllegalStateException(s"Unexpected accumulated size: ${accumulated.size} (n: $n)")
107108
}
108109
}
109110
}
@@ -114,7 +115,7 @@ import pekko.util.OptionVal
114115
case _ =>
115116
if (propagateToNestedMaterialization) {
116117
downstreamCause = OptionVal.Some(cause)
117-
if (accumulated.size == n) {
118+
if (left == 0) {
118119
// corner case for n = 0, can be handled in FlowOps
119120
materializeFlow()
120121
} else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized
@@ -128,8 +129,8 @@ import pekko.util.OptionVal
128129

129130
def materializeFlow(): Unit =
130131
try {
131-
val prefix = accumulated.toVector
132-
accumulated.clear()
132+
val prefix = builder.result()
133+
builder = null // free for GC
133134
subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource"))
134135
val theSubSource = subSource.get
135136
subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))

0 commit comments

Comments
 (0)