Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit f60e74c

Browse files
committed
Added Mono.filterWhen
Added Flux.filterWhen
1 parent ed91fcb commit f60e74c

File tree

4 files changed

+90
-4
lines changed

4 files changed

+90
-4
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package reactor.core.scala.publisher
22

3-
import java.lang.{Iterable => JIterable, Long => JLong}
3+
import java.lang.{Iterable => JIterable, Long => JLong, Boolean => JBoolean}
44
import java.util
55
import java.util.concurrent.Callable
66
import java.util.function.{BiFunction, Consumer, Function, Supplier}
@@ -1231,6 +1231,52 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) extends
12311231
*/
12321232
final def filter(p: T => Boolean) = Flux(jFlux.filter(p))
12331233

1234+
/**
1235+
* Test each value emitted by this [[Flux]] asynchronously using a generated
1236+
* [[Publisher[Boolean]]] test. A value is replayed if the first item emitted
1237+
* by its corresponding test is `true`. It is dropped if its test is either
1238+
* empty or its first emitted value is `false`.
1239+
* <p>
1240+
* Note that only the first value of the test publisher is considered, and unless it
1241+
* is a [[Mono]], test will be cancelled after receiving that first value. Test
1242+
* publishers are generated and subscribed to in sequence.
1243+
*
1244+
* @param asyncPredicate the function generating a [[Publisher]] of [[Boolean]]
1245+
* for each value, to filter the Flux with
1246+
* @return a filtered [[Flux]]
1247+
*/
1248+
final def filterWhen(asyncPredicate: T => _ <: Publisher[Boolean] with MapablePublisher[Boolean]): Flux[T] = {
1249+
val asyncPredicateFunction = new Function[T, Publisher[JBoolean]] {
1250+
override def apply(t: T): Publisher[JBoolean] = asyncPredicate(t).map(Boolean2boolean(_))
1251+
}
1252+
Flux(jFlux.filterWhen(asyncPredicateFunction))
1253+
}
1254+
1255+
/**
1256+
* Test each value emitted by this [[Flux]] asynchronously using a generated
1257+
* [[Publisher[Boolean]]] test. A value is replayed if the first item emitted
1258+
* by its corresponding test is `true`. It is dropped if its test is either
1259+
* empty or its first emitted value is `false`.
1260+
* <p>
1261+
* Note that only the first value of the test publisher is considered, and unless it
1262+
* is a [[Mono]], test will be cancelled after receiving that first value. Test
1263+
* publishers are generated and subscribed to in sequence.
1264+
*
1265+
* @param asyncPredicate the function generating a [[Publisher]] of [[Boolean]]
1266+
* for each value, to filter the Flux with
1267+
* @param bufferSize the maximum expected number of values to hold pending a result of
1268+
* their respective asynchronous predicates, rounded to the next power of two. This is
1269+
* capped depending on the size of the heap and the JVM limits, so be careful with
1270+
* large values (although eg. { @literal 65536} should still be fine). Also serves as
1271+
* the initial request size for the source.
1272+
* @return a filtered [[Flux]]
1273+
*/
1274+
final def filterWhen(asyncPredicate: T => _ <: Publisher[Boolean] with MapablePublisher[Boolean], bufferSize: Int): Flux[T] = {
1275+
val asyncPredicateFunction = new Function[T, Publisher[JBoolean]] {
1276+
override def apply(t: T): Publisher[JBoolean] = asyncPredicate(t).map(Boolean2boolean(_))
1277+
}
1278+
Flux(jFlux.filterWhen(asyncPredicateFunction, bufferSize))
1279+
}
12341280
/**
12351281
* Emit from the fastest first sequence between this publisher and the given publisher
12361282
*

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -582,9 +582,27 @@ class Mono[T] private(private val jMono: JMono[T]) extends Publisher[T] with Map
582582
* @param tester the predicate to evaluate
583583
* @return a filtered [[Mono]]
584584
*/
585-
final def filter(tester: T => Boolean) = new Mono[T](
586-
jMono.filter(tester)
587-
)
585+
final def filter(tester: T => Boolean) = Mono[T](jMono.filter(tester))
586+
587+
/**
588+
* If this [[Mono]] is valued, test the value asynchronously using a generated
589+
* [[Publisher[Boolean]]] test. The value from the Mono is replayed if the
590+
* first item emitted by the test is `true`. It is dropped if the test is
591+
* either empty or its first emitted value is false``.
592+
* <p>
593+
* Note that only the first value of the test publisher is considered, and unless it
594+
* is a [[Mono]], test will be cancelled after receiving that first value.
595+
*
596+
* @param asyncPredicate the function generating a [[Publisher]] of [[Boolean]]
597+
* to filter the Mono with
598+
* @return a filtered [[Mono]]
599+
*/
600+
final def filterWhen(asyncPredicate: T => _ <: Publisher[Boolean] with MapablePublisher[Boolean]): Mono[T] = {
601+
val asyncPredicateFunction = new Function[T, Publisher[JBoolean]] {
602+
override def apply(t: T): Publisher[JBoolean] = asyncPredicate(t).map(Boolean2boolean(_))
603+
}
604+
Mono(jMono.filterWhen(asyncPredicateFunction))
605+
}
588606

589607
/**
590608
* Transform the item emitted by this [[Mono]] into a Publisher, then forward

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,21 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
11721172
.verifyComplete()
11731173
}
11741174

1175+
".filterWhen" - {
1176+
"should replay the value of mono if the first item emitted by the test is true" in {
1177+
val flux = Flux.just(10, 20, 30).filterWhen((i: Int) => Mono.just(i % 2 == 0))
1178+
StepVerifier.create(flux)
1179+
.expectNext(10, 20, 30)
1180+
.verifyComplete()
1181+
}
1182+
"with bufferSize should replay the value of mono if the first item emitted by the test is true" in {
1183+
val flux = Flux.just(10, 20, 30).filterWhen((i: Int) => Mono.just(i % 2 == 0), 1)
1184+
StepVerifier.create(flux)
1185+
.expectNext(10, 20, 30)
1186+
.verifyComplete()
1187+
}
1188+
}
1189+
11751190
".firstEmittingWith should emit from the fastest first sequence" in {
11761191
val flux = Flux.just(10, 20, 30).firstEmittingWith(Flux.just(1, 2, 3).delayElements(1 second))
11771192
StepVerifier.create(flux)

src/test/scala/reactor/core/scala/publisher/MonoTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,13 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
830830
.verifyComplete()
831831
}
832832

833+
".filterWhen should replay the value of mono if the first item emitted by the test is true" in {
834+
val mono = Mono.just(10).filterWhen((i: Int) => Mono.just(i % 2 == 0))
835+
StepVerifier.create(mono)
836+
.expectNext(10)
837+
.verifyComplete()
838+
}
839+
833840
".flatMap" - {
834841
"with a single mapper should flatmap the value mapped by the provided mapper" in {
835842
val flux = Mono.just(1).flatMap(i => Flux.just(i, i * 2))

0 commit comments

Comments
 (0)