@@ -112,4 +112,125 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually:
112112 // checking if the forks aren't left running
113113 sleep(200 .millis)
114114 trail.get shouldBe Vector (" done" , " done" , " exception" ) // TODO: 3 isn't cancelled because it's already taken off the queue
115+
116+ // Edge Cases
117+ it should " handle empty flow" in supervised :
118+ // given
119+ val flow = Flow .fromIterable(List .empty[Int ])
120+ val processedCount = new AtomicInteger (0 )
121+
122+ // when
123+ val result = flow.mapPar(5 ): i =>
124+ processedCount.incrementAndGet()
125+ i * 2
126+
127+ // then
128+ result.runToList() shouldBe List .empty
129+ processedCount.get() shouldBe 0
130+
131+ it should " handle flow with exactly parallelism number of elements" in supervised :
132+ // given
133+ val parallelism = 3
134+ val flow = Flow .fromIterable(1 to parallelism)
135+ val running = new AtomicInteger (0 )
136+ val maxRunning = new AtomicInteger (0 )
137+
138+ def f (i : Int ) =
139+ val current = running.incrementAndGet()
140+ maxRunning.updateAndGet(current.max)
141+ try
142+ sleep(100 .millis)
143+ i * 2
144+ finally running.decrementAndGet().discard
145+ end try
146+ end f
147+
148+ // when
149+ val result = flow.mapPar(parallelism)(f).runToList()
150+
151+ // then
152+ result shouldBe List (2 , 4 , 6 )
153+ maxRunning.get() shouldBe parallelism
154+
155+ it should " handle flow with less than parallelism number of elements" in supervised :
156+ // given
157+ val flow = Flow .fromIterable(1 to 2 )
158+ val running = new AtomicInteger (0 )
159+ val maxRunning = new AtomicInteger (0 )
160+
161+ def f (i : Int ) =
162+ val current = running.incrementAndGet()
163+ maxRunning.updateAndGet(current.max)
164+ try
165+ sleep(100 .millis)
166+ i * 2
167+ finally running.decrementAndGet().discard
168+ end try
169+ end f
170+
171+ // when
172+ val result = flow.mapPar(5 )(f).runToList()
173+
174+ // then
175+ result shouldBe List (2 , 4 )
176+ maxRunning.get() shouldBe 2 // should never exceed actual number of elements
177+
178+ // Order Preservation Tests
179+ it should " preserve order even with varying processing times" in supervised :
180+ // given
181+ val flow = Flow .fromIterable(1 to 10 )
182+
183+ def f (i : Int ) =
184+ // Later elements finish faster to test order preservation
185+ val delay = if i <= 5 then (6 - i) * 50 else 50
186+ sleep(delay.millis)
187+ i * 2
188+
189+ // when
190+ val result = flow.mapPar(3 )(f).runToList()
191+
192+ // then
193+ result shouldBe List (2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 , 18 , 20 )
194+
195+ it should " preserve order with random processing times" in supervised :
196+ // given
197+ val elements = 1 to 20
198+ val flow = Flow .fromIterable(elements)
199+
200+ def f (i : Int ) =
201+ // Random delay to test order preservation
202+ val delay = scala.util.Random .nextInt(100 ) + 10
203+ sleep(delay.millis)
204+ i
205+
206+ // when
207+ val result = flow.mapPar(5 )(f).runToList()
208+
209+ // then
210+ result shouldBe elements.toList
211+
212+ // Other
213+ it should " work with very high parallelism values" in supervised :
214+ // given
215+ val flow = Flow .fromIterable(1 to 5 )
216+ val running = new AtomicInteger (0 )
217+ val maxRunning = new AtomicInteger (0 )
218+
219+ def f (i : Int ) =
220+ val current = running.incrementAndGet()
221+ maxRunning.updateAndGet(current.max)
222+ try
223+ sleep(50 .millis)
224+ i * 2
225+ finally running.decrementAndGet().discard
226+ end try
227+ end f
228+
229+ // when
230+ val result = flow.mapPar(1000 )(f).runToList()
231+
232+ // then
233+ result shouldBe List (2 , 4 , 6 , 8 , 10 )
234+ maxRunning.get() shouldBe 5 // Should not exceed actual number of elements
235+
115236end FlowOpsMapParTest
0 commit comments