Skip to content

Commit 7184dad

Browse files
authored
chore: Fix flaky test in unsafeOptionalDataVia (#1611)
1 parent 53761e5 commit 7184dad

File tree

1 file changed

+32
-24
lines changed

1 file changed

+32
-24
lines changed

stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,36 +57,44 @@ object SourceWithContext {
5757
SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) {
5858
implicit b => (s, viaF) =>
5959
import GraphDSL.Implicits._
60-
val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2))
61-
val merge = b.add(Merge[(Option[FOut], Ctx)](2))
6260

63-
val unzip = b.add(Unzip[SOut, Ctx]())
64-
val zipper = b.add(Zip[FOut, Ctx]())
65-
66-
val filterAvailable = Flow[(Option[SOut], Ctx)].collect {
67-
case (Some(f), ctx) => (f, ctx)
68-
}
69-
70-
val filterUnavailable = Flow[(Option[SOut], Ctx)].collect {
71-
case (None, ctx) => (Option.empty[FOut], ctx)
61+
case class IndexedCtx(idx: Long, ctx: Ctx)
62+
val partition = b.add(Partition[(Option[SOut], IndexedCtx)](2,
63+
{
64+
case (None, _) => 0
65+
case (Some(_), _) => 1
66+
}))
67+
68+
val sequence = Flow[(Option[SOut], Ctx)].zipWithIndex
69+
.map {
70+
case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx))
71+
}
72+
73+
val unzip = b.add(Unzip[Option[SOut], IndexedCtx]())
74+
val zipper = b.add(Zip[FOut, IndexedCtx]())
75+
val mergeSequence = b.add(MergeSequence[(Option[FOut], IndexedCtx)](2)(_._2.idx))
76+
val unwrapSome = b.add(Flow[Option[SOut]].map {
77+
case Some(elem) => elem
78+
case _ => throw new IllegalStateException("Only expects Some")
79+
})
80+
val unwrap = Flow[(Option[FOut], IndexedCtx)].map {
81+
case (opt, indexedCtx) => (opt, indexedCtx.ctx)
7282
}
7383

74-
val mapIntoOption = Flow[(FOut, Ctx)].map {
75-
case (f, ctx) => (Some(f), ctx)
84+
val mapIntoOption = Flow[(FOut, IndexedCtx)].map {
85+
case (elem, indexedCtx) => (Some(elem), indexedCtx)
7686
}
7787

78-
s ~> broadcast.in
79-
80-
broadcast.out(0) ~> filterAvailable ~> unzip.in
81-
82-
unzip.out0 ~> viaF ~> zipper.in0
83-
unzip.out1 ~> zipper.in1
84-
85-
zipper.out ~> mapIntoOption ~> merge.in(0)
86-
87-
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
88+
//format: off
89+
s ~> sequence ~> partition.in
90+
partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0)
91+
partition.out(1) ~> unzip.in
92+
unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0
93+
unzip.out1 ~> zipper.in1
94+
zipper.out ~> mapIntoOption ~> mergeSequence.in(1)
8895

89-
SourceShape(merge.out)
96+
//format: on
97+
SourceShape((mergeSequence.out ~> unwrap).outlet)
9098
}))
9199
}
92100

0 commit comments

Comments
 (0)