Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit ef7fc3c

Browse files
committed
Added SParallelFlux.reduce
Signed-off-by: Winarto Zhao <[email protected]>
1 parent 73b151c commit ef7fc3c

File tree

2 files changed

+46
-7
lines changed

2 files changed

+46
-7
lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
5757
*/
5858
final def reduce(reducer: (T, T) => T) = Mono(jParallelFlux.reduce(reducer))
5959

60+
/**
61+
* Reduces all values within a 'rail' to a single value (with a possibly different
62+
* type) via a reducer function that is initialized on each rail from an
63+
* initialSupplier value.
64+
* <p>
65+
* Note that the same mapper function may be called from multiple threads
66+
* concurrently.
67+
*
68+
* @tparam R the reduced output type
69+
* @param initialSupplier the supplier for the initial value
70+
* @param reducer the function to reduce a previous output of reduce (or the initial
71+
* value supplied) with a current source value.
72+
* @return the new [[SParallelFlux]] instance
73+
*/
74+
final def reduce[R](initialSupplier: () => R, reducer: (R, T) => R) = SParallelFlux(jParallelFlux.reduce[R](initialSupplier, reducer))
75+
6076
/**
6177
* Specifies where each 'rail' will observe its incoming values with possibly
6278
* work-stealing and a given prefetch amount.

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import reactor.test.StepVerifier
99
class SParallelFluxTest extends FreeSpec with Matchers {
1010
"SParallelFlux" - {
1111
val data = Seq(1, 2, 3)
12-
val flux = Flux.just(data.head,data.tail:_*)
12+
val flux = Flux.just(data.head, data.tail: _*)
1313
val fluxParallel: SParallelFlux[Int] = flux.parallel()
1414
".asJava should convert as Java ParallelFlux" in {
1515
fluxParallel.asJava shouldBe a[JParallelFlux[Int]]
@@ -20,7 +20,7 @@ class SParallelFluxTest extends FreeSpec with Matchers {
2020
}
2121

2222
".filter should filter elements" in {
23-
StepVerifier.create(fluxParallel.filter((i:Int) => i % 2 == 0))
23+
StepVerifier.create(fluxParallel.filter((i: Int) => i % 2 == 0))
2424
.expectNext(2)
2525
.verifyComplete()
2626
}
@@ -34,11 +34,34 @@ class SParallelFluxTest extends FreeSpec with Matchers {
3434
.verifyComplete()
3535
}
3636

37-
".reduce should aggregate the values" in {
38-
val mono = fluxParallel.reduce(_ + _)
39-
StepVerifier.create(mono)
40-
.expectNext(6)
41-
.verifyComplete()
37+
".reduce should aggregate the values" - {
38+
"without initial supplier" in {
39+
val mono = fluxParallel.reduce(_ + _)
40+
StepVerifier.create(mono)
41+
.expectNext(6)
42+
.verifyComplete()
43+
}
44+
"with initial value should aggregate the values with initial one" ignore {
45+
val parallelFlux = fluxParallel.reduce[String](() => "0", (agg, v) => s"$agg-${v.toString}")
46+
StepVerifier.create(parallelFlux)
47+
.expectNext("0-1")
48+
.expectNext("0-2")
49+
.expectNext("0-3")
50+
.expectNext("0")
51+
.expectNext("0")
52+
.expectNext("0")
53+
.expectNext("0")
54+
.expectNext("0")
55+
.expectNext("0")
56+
.expectNext("0")
57+
.expectNext("0")
58+
.expectNext("0")
59+
.expectNext("0")
60+
.expectNext("0")
61+
.expectNext("0")
62+
.expectNext("0")
63+
.verifyComplete()
64+
}
4265
}
4366

4467
".sequential should merge the rails" in {

0 commit comments

Comments
 (0)