This repository was archived by the owner on Mar 2, 2022. It is now read-only.
File tree Expand file tree Collapse file tree 2 files changed +28
-0
lines changed
main/scala/reactor/core/scala/publisher
test/scala/reactor/core/scala/publisher Expand file tree Collapse file tree 2 files changed +28
-0
lines changed Original file line number Diff line number Diff line change @@ -262,6 +262,27 @@ class Mono[T] private(private val jMono: JMono[T]) extends Publisher[T] with Map
262262 */
263263 final def delayElement (delay : Duration , timer : Scheduler ) = Mono (jMono.delayElement(delay, timer))
264264
265+ /**
266+ * Subscribe to this [[Mono Mono ]] and another [[Publisher ]] that is generated from
267+ * this Mono's element and which will be used as a trigger for relaying said element.
268+ * <p>
269+ * That is to say, the resulting [[Mono ]] delays until this Mono's element is
270+ * emitted, generates a trigger Publisher and then delays again until the trigger
271+ * Publisher terminates.
272+ * <p>
273+ * Note that contiguous calls to all delayUntil are fused together.
274+ * The triggers are generated and subscribed to in sequence, once the previous trigger
275+ * completes. Error is propagated immediately
276+ * downstream. In both cases, an error in the source is immediately propagated.
277+ * <p>
278+ * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/delayUntil.png" alt="">
279+ *
280+ * @param triggerProvider a [[Function1 ]] that maps this Mono's value into a
281+ * [[Publisher ]] whose termination will trigger relaying the value.
282+ * @return this Mono, but delayed until the derived publisher terminates.
283+ */
284+ final def delayUntil (triggerProvider : T => Publisher [_]) = Mono (jMono.delayUntil(triggerProvider))
285+
265286 /**
266287 * Delay the [[Mono.subscribe subscription ]] to this [[Mono ]] source until the given
267288 * period elapses.
Original file line number Diff line number Diff line change @@ -470,6 +470,13 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
470470 }
471471 }
472472
473+ " .delayUntil should delay until the other provider terminate" in {
474+ StepVerifier .withVirtualTime(() => Mono .just(randomValue).delayUntil(t => Flux .just(1 , 2 ).delayElements(2 seconds)))
475+ .thenAwait(4 seconds)
476+ .expectNext(randomValue)
477+ .verifyComplete()
478+ }
479+
473480 " .delaySubscription" - {
474481 " with delay duration should delay subscription as long as the provided duration" in {
475482 StepVerifier .withVirtualTime(() => Mono .just(1 ).delaySubscription(1 hour))
You can’t perform that action at this time.
0 commit comments