-
Notifications
You must be signed in to change notification settings - Fork 191
Open
Description
@queimadus's report in #1566
refs: #1623
and with the code below
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import scala.concurrent.Await
object PekkoQuickstart extends App {
private implicit val system: ActorSystem = ActorSystem()
val s = Source
.repeat(())
.map(_ => ByteString('a' * 400000))
.take(1000000)
.prefixAndTail(50000)
.flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }
val r = Source.empty
.concatAllLazy(List.tabulate(30000)(_ => s): _*)
.runWith(Sink.ignore)
Await.result(r, scala.concurrent.duration.Duration.Inf)
println(r.value)
// Source
// .repeat(s)
// .take(30000)
// .flatMapConcat(x => x)
// .runWith(Sink.ignore)
// .onComplete(println(_))
// Source.empty
// .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
// .runWith(Sink.ignore).onComplete(println(_))
}
To fix the problem, I think we need to clean the logic once we are done with a sub-graph, but the current code needs to get a snapshot for the materializer
To fix the problem I think we need to recycle the graph stage logics, which means more changes need to be done in the interpreter.
Metadata
Metadata
Assignees
Labels
No labels
