Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 768197a

Browse files
committed
#12: Added Filter[T]
1 parent 38af5c7 commit 768197a

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package reactor.core.scala.publisher
2+
3+
import org.reactivestreams.Publisher
4+
5+
trait Filter[T] { self: Publisher[T] =>
6+
/**
7+
* Evaluate each accepted value against the given predicate T => Boolean. If the predicate test succeeds, the value is
8+
* passed into the new [[Publisher]]. If the predicate test fails, the value is ignored and a request of 1 is
9+
* emitted.
10+
*
11+
* <p>
12+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.5.RELEASE/src/docs/marble/filter.png" alt="">
13+
*
14+
* @param p the [[Function1]] predicate to test values against
15+
* @return a new [[Publisher]] containing only values that pass the predicate test
16+
*/
17+
def filter(f: T => Boolean): Publisher[T]
18+
}

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import scala.concurrent.duration.Duration
4343
* @see [[Mono]]
4444
*/
4545
class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
46-
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] {
46+
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] {
4747
override def subscribe(s: Subscriber[_ >: T]): Unit = jFlux.subscribe(s)
4848

4949
/**

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ import scala.util.{Failure, Success, Try}
6363
* @see Flux
6464
*/
6565
class Mono[T] private(private val jMono: JMono[T])
66-
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with MonoLike[T] {
66+
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with MonoLike[T] with Filter[T] {
6767
override def subscribe(s: Subscriber[_ >: T]): Unit = jMono.subscribe(s)
6868

6969
/**

0 commit comments

Comments
 (0)