Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 58eddda

Browse files
committed
Added SParallelFlux.sequential
Signed-off-by: Winarto Zhao <[email protected]>
1 parent 80f5d6f commit 58eddda

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
6666
*/
6767
final def runOn(scheduler: Scheduler, prefetch: Int = Queues.SMALL_BUFFER_SIZE) = SParallelFlux(jParallelFlux.runOn(scheduler, prefetch))
6868

69+
70+
/**
71+
* Merges the values from each 'rail' in a round-robin or same-order fashion and
72+
* exposes it as a regular Publisher sequence, running with a give prefetch value for
73+
* the rails.
74+
*
75+
* @param prefetch the prefetch amount to use for each rail
76+
* @return the new Flux instance
77+
*/
78+
final def sequential(prefetch: Int = Queues.SMALL_BUFFER_SIZE) = Flux(jParallelFlux.sequential(prefetch))
79+
6980
/**
7081
* Subscribes to this [[SParallelFlux]] by providing an onNext, onError,
7182
* onComplete and onSubscribe callback and triggers the execution chain for all

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ class SParallelFluxTest extends FreeSpec with Matchers {
3434
.verifyComplete()
3535
}
3636

37+
".sequential should merge the rails" in {
38+
val expected = data.map(_.toString)
39+
StepVerifier.create(fluxParallel.map(i => i.toString).sequential())
40+
.expectNextMatches((i: String) => expected.contains(i))
41+
.expectNextMatches((i: String) => expected.contains(i))
42+
.expectNextMatches((i: String) => expected.contains(i))
43+
.verifyComplete()
44+
}
45+
3746
".runOn should run on different thread" in {
3847
val scheduler = spy(Schedulers.parallel())
3948
StepVerifier.create(flux.parallel(2).runOn(scheduler))

0 commit comments

Comments
 (0)