Skip to content

Commit 299fba4

Browse files
committed
chore: null out ref for closed stream.
1 parent 7184dad commit 299fba4

File tree

1 file changed

+2
-1
lines changed
  • stream/src/main/scala/org/apache/pekko/stream/scaladsl

1 file changed

+2
-1
lines changed

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1327,7 +1327,7 @@ object Concat {
13271327
*/
13281328
final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
13291329
require(inputPorts > 1, "A Concat must have more than 1 input ports")
1330-
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Concat.in" + i))
1330+
val in: immutable.IndexedSeq[Inlet[T]] = Array.tabulate(inputPorts)(i => Inlet[T]("Concat.in" + i))
13311331
val out: Outlet[T] = Outlet[T]("Concat.out")
13321332
override def initialAttributes = DefaultAttributes.concat
13331333
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
@@ -1350,6 +1350,7 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
13501350

13511351
override def onUpstreamFinish() = {
13521352
if (idx == activeStream) {
1353+
in(activeStream) = null
13531354
activeStream += 1
13541355
// Skip closed inputs
13551356
while (activeStream < inputPorts && isClosed(in(activeStream))) activeStream += 1

0 commit comments

Comments
 (0)