Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 4042532

Browse files
committed
Added SParallelFlux.filter
Signed-off-by: Winarto Zhao <[email protected]>
1 parent 00c6d55 commit 4042532

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
2121
*/
2222
final def as[U](converter: SParallelFlux[T] => U): U = jParallelFlux.as((t: JParallelFlux[T]) => converter(SParallelFlux(t)))
2323

24+
/**
25+
* Filters the source values on each 'rail'.
26+
* <p>
27+
* Note that the same predicate may be called from multiple threads concurrently.
28+
*
29+
* @param predicate the function returning true to keep a value or false to drop a
30+
* value
31+
* @return the new [[SParallelFlux]] instance
32+
*/
33+
final def filter(predicate: SPredicate[T]) = SParallelFlux(jParallelFlux.filter(predicate))
34+
2435
/**
2536
* Maps the source values on each 'rail' to another value.
2637
* <p>

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ class SParallelFluxTest extends FreeSpec with Matchers {
1919
SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
2020
}
2121

22+
".filter should filter elements" in {
23+
StepVerifier.create(fluxParallel.filter(i => i % 2 == 0))
24+
.expectNext(2)
25+
.verifyComplete()
26+
}
27+
2228
".map should map from T to U" in {
2329
val expected = data.map(_.toString)
2430
StepVerifier.create(fluxParallel.map(i => i.toString))

0 commit comments

Comments
 (0)