Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit ed91fcb

Browse files
committed
Added Mono.untilOther(anyPublisher)
Added Mono.untilOtherDelayError(anyPublisher)
1 parent 8744497 commit ed91fcb

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,30 @@ class Mono[T] private(private val jMono: JMono[T]) extends Publisher[T] with Map
11241124
*/
11251125
final def transform[V](transformer: Mono[T] => Publisher[V]): Mono[V] = Mono[V](jMono.transform[V]((_: JMono[T]) => transformer(Mono.this)))
11261126

1127+
/**
1128+
* Subscribe to this Mono and another Publisher, which will be used as a trigger for
1129+
* the emission of this Mono's element. That is to say, this Mono's element is delayed
1130+
* until the trigger Publisher emits for the first time (or terminates empty).
1131+
*
1132+
* @param anyPublisher the publisher which first emission or termination will trigger
1133+
* the emission of this Mono's value.
1134+
* @return this Mono, but delayed until the given publisher emits first or terminates.
1135+
*/
1136+
def untilOther(anyPublisher: Publisher[_]) = Mono(jMono.untilOther(anyPublisher))
1137+
1138+
/**
1139+
* Subscribe to this Mono and another Publisher, which will be used as a trigger for
1140+
* the emission of this Mono's element, mapped through a provided function.
1141+
* That is to say, this Mono's element is delayed until the trigger Publisher emits
1142+
* for the first time (or terminates empty). Any error is delayed until all publishers
1143+
* have triggered, and multiple errors are combined into one.
1144+
*
1145+
* @param anyPublisher the publisher which first emission or termination will trigger
1146+
* the emission of this Mono's value.
1147+
* @return this Mono, but delayed until the given publisher emits first or terminates.
1148+
*/
1149+
def untilOtherDelayError(anyPublisher: Publisher[_]) = Mono(jMono.untilOtherDelayError(anyPublisher))
1150+
11271151
final def asJava(): JMono[T] = jMono
11281152
}
11291153

src/test/scala/reactor/core/scala/publisher/MonoTest.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,28 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
12891289
.expectNext(randomValue.toString)
12901290
.verifyComplete()
12911291
}
1292+
1293+
".untilOther should emit the value in this mono after the other publisher emits the first tiem or terminates empty" in {
1294+
val flag = new AtomicBoolean(false)
1295+
val mono = Mono.just(randomValue).untilOther(Flux.just(1).doOnNext(_ => flag.compareAndSet(false, true))).doOnNext(_ => {
1296+
flag shouldBe 'get
1297+
flag.compareAndSet(true, false)
1298+
})
1299+
StepVerifier.create(mono)
1300+
.expectNext(randomValue)
1301+
.verifyComplete()
1302+
}
1303+
1304+
".untilOtherDelayError should emit the value in this mono after the other publisher emits the first tiem or terminates empty" in {
1305+
val flag = new AtomicBoolean(false)
1306+
val mono = Mono.just(randomValue).untilOtherDelayError(Flux.just(1).doOnNext(_ => flag.compareAndSet(false, true))).doOnNext(_ => {
1307+
flag shouldBe 'get
1308+
flag.compareAndSet(true, false)
1309+
})
1310+
StepVerifier.create(mono)
1311+
.expectNext(randomValue)
1312+
.verifyComplete()
1313+
}
12921314
}
12931315

12941316

0 commit comments

Comments
 (0)