Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit c30be6f

Browse files
committed
Added Mono.sequenceEqual(source1, source2, isEqual)
Added Mono.sequenceEqual(source1, source2, isEqual, bufferSize)
1 parent 5ca803b commit c30be6f

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package reactor.core.scala.publisher
2121
import java.lang.{Boolean => JBoolean, Iterable => JIterable, Long => JLong}
2222
import java.time.{Duration => JDuration}
2323
import java.util.concurrent.{Callable, CompletableFuture}
24-
import java.util.function.{BiConsumer, BiFunction, Consumer, Function, Predicate, Supplier}
24+
import java.util.function.{BiConsumer, BiFunction, BiPredicate, Consumer, Function, Predicate, Supplier}
2525
import java.util.logging.Level
2626

2727
import org.reactivestreams.{Publisher, Subscriber, Subscription}
@@ -1397,12 +1397,55 @@ object Mono {
13971397
* the type of items emitted by each Publisher
13981398
* @return a Mono that emits a Boolean value that indicates whether the two sequences are the same
13991399
*/
1400-
def sequenceEqual[T](source1: Publisher[_ <: T], source2: Publisher[_ <: T]) = Mono[Boolean](
1400+
def sequenceEqual[T](source1: Publisher[_ <: T], source2: Publisher[_ <: T]): Mono[Boolean] = Mono[Boolean](
14011401
JMono.sequenceEqual[T](source1, source2).map(new Function[JBoolean, Boolean] {
14021402
override def apply(t: JBoolean) = Boolean2boolean(t)
14031403
})
14041404
)
14051405

1406+
/**
1407+
* Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
1408+
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
1409+
* equality function.
1410+
*
1411+
* @param source1
1412+
* the first Publisher to compare
1413+
* @param source2
1414+
* the second Publisher to compare
1415+
* @param isEqual
1416+
* a function used to compare items emitted by each Publisher
1417+
* @tparam T
1418+
* the type of items emitted by each Publisher
1419+
* @return a Mono that emits a Boolean value that indicates whether the two sequences are the same
1420+
*/
1421+
def sequenceEqual[T](source1: Publisher[_ <: T], source2: Publisher[_ <: T], isEqual: (T, T) => Boolean): Mono[Boolean] = {
1422+
Mono(JMono.sequenceEqual[T](source1, source2, new BiPredicate[T, T] {
1423+
override def test(t: T, u: T): Boolean = isEqual(t, u)
1424+
})).map(Boolean2boolean)
1425+
}
1426+
1427+
/**
1428+
* Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
1429+
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
1430+
* equality function.
1431+
*
1432+
* @param source1
1433+
* the first Publisher to compare
1434+
* @param source2
1435+
* the second Publisher to compare
1436+
* @param isEqual
1437+
* a function used to compare items emitted by each Publisher
1438+
* @param bufferSize
1439+
* the number of items to prefetch from the first and second source Publisher
1440+
* @tparam T
1441+
* the type of items emitted by each Publisher
1442+
* @return a Mono that emits a Boolean value that indicates whether the two Publisher two sequences
1443+
* are the same according to the specified function
1444+
*/
1445+
def sequenceEqual[T](source1: Publisher[_ <: T], source2: Publisher[_ <: T], isEqual: (T, T) => Boolean, bufferSize: Int): Mono[Boolean] = Mono(JMono.sequenceEqual[T](source1, source2, new BiPredicate[T, T] {
1446+
override def test(t: T, u: T): Boolean = isEqual(t, u)
1447+
}, bufferSize)).map(Boolean2boolean)
1448+
14061449
/**
14071450
* Merge given monos into a new a `Mono` that will be fulfilled when all of the given `Monos`
14081451
* have been fulfilled. An error will cause pending results to be cancelled and immediate error emission to the

src/test/scala/reactor/core/scala/publisher/MonoTest.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,19 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
221221
isSubscribed shouldBe 'get
222222
emittedValue shouldBe 'get
223223
}
224+
"emit true when both publisher emit the same value according to the isEqual function" in {
225+
val mono = Mono.sequenceEqual[Int](just(10), just(100), (t1: Int, t2: Int) => t1 % 10 == t2 % 10)
226+
StepVerifier.create(mono)
227+
.expectNext(true)
228+
.verifyComplete()
229+
}
230+
"emit true when both publisher emit the same value according to the isEqual function with bufferSize" in {
231+
val mono = Mono.sequenceEqual[Int](just(10), just(100), (t1: Int, t2: Int) => t1 % 10 == t2 % 10, 2)
232+
StepVerifier.create(mono)
233+
.expectNext(true)
234+
.verifyComplete()
235+
236+
}
224237
}
225238

226239
".when" - {

0 commit comments

Comments
 (0)