Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 73b151c

Browse files
committed
Added SParallelFlux.reduce
Signed-off-by: Winarto Zhao <[email protected]>
1 parent 197fd09 commit 73b151c

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
4444
*/
4545
final def map[U](mapper: T => _ <: U) = SParallelFlux(jParallelFlux.map[U](mapper))
4646

47+
/**
48+
* Reduces all values within a 'rail' and across 'rails' with a reducer function into
49+
* a single sequential value.
50+
* <p>
51+
* Note that the same reducer function may be called from multiple threads
52+
* concurrently.
53+
*
54+
* @param reducer the function to reduce two values into one.
55+
* @return the new Mono instance emitting the reduced value or empty if the
56+
* [[SParallelFlux]] was empty
57+
*/
58+
final def reduce(reducer: (T, T) => T) = Mono(jParallelFlux.reduce(reducer))
59+
4760
/**
4861
* Specifies where each 'rail' will observe its incoming values with possibly
4962
* work-stealing and a given prefetch amount.

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ class SParallelFluxTest extends FreeSpec with Matchers {
3434
.verifyComplete()
3535
}
3636

37+
".reduce should aggregate the values" in {
38+
val mono = fluxParallel.reduce(_ + _)
39+
StepVerifier.create(mono)
40+
.expectNext(6)
41+
.verifyComplete()
42+
}
43+
3744
".sequential should merge the rails" in {
3845
val expected = data.map(_.toString)
3946
StepVerifier.create(fluxParallel.map(i => i.toString).sequential())

0 commit comments

Comments
 (0)