Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 23d6c26

Browse files
committed
Added Mono.delayElement
1 parent 5e4d2fb commit 23d6c26

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,42 @@ class Mono[T] private(private val jMono: JMono[T]) extends Publisher[T] with Map
225225
* @return a new [[Mono]]
226226
* @see [[Flux.defaultIfEmpty]]
227227
*/
228-
final def defaultIfEmpty(defaultV: T): Mono[T] = new Mono[T](
229-
jMono.defaultIfEmpty(defaultV)
230-
)
228+
final def defaultIfEmpty(defaultV: T): Mono[T] = Mono[T](jMono.defaultIfEmpty(defaultV))
229+
230+
/**
231+
* Delay this [[Mono]] element ([[Subscriber.onNext]] signal) by a given
232+
* duration. Empty Monos or error signals are not delayed.
233+
*
234+
* <p>
235+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/delayonnext.png" alt="">
236+
*
237+
* <p>
238+
* Note that the scheduler on which the Mono chain continues execution will be the
239+
* [[reactor.core.scheduler.Schedulers#parallel() parallel]] scheduler if the mono is valued, or the
240+
* current scheduler if the mono completes empty or errors.
241+
*
242+
* @param delay duration by which to delay the [[Subscriber.onNext]] signal
243+
* @return a delayed [[Mono]]
244+
*/
245+
final def delayElement(delay: Duration) = Mono(jMono.delayElement(delay))
246+
247+
/**
248+
* Delay this [[Mono]] element ([[Subscriber.onNext]] signal) by a given
249+
* [[Duration]], on a particular [[Scheduler]]. Empty monos or error signals are not delayed.
250+
*
251+
* <p>
252+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/delayonnext.png" alt="">
253+
*
254+
* <p>
255+
* Note that the scheduler on which the mono chain continues execution will be the
256+
* scheduler provided if the mono is valued, or the current scheduler if the mono
257+
* completes empty or errors.
258+
*
259+
* @param delay [[Duration]] by which to delay the { @link Subscriber#onNext} signal
260+
* @param timer a time-capable [[Scheduler]] instance to delay the value signal on
261+
* @return a delayed [[Mono]]
262+
*/
263+
final def delayElement(delay: Duration, timer: Scheduler) = Mono(jMono.delayElement(delay, timer))
231264

232265
/**
233266
* Delay the [[Mono.subscribe subscription]] to this [[Mono]] source until the given

src/test/scala/reactor/core/scala/publisher/MonoTest.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,21 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
455455
.verifyComplete()
456456
}
457457

458+
".delayElement" - {
459+
"should delay the element" in {
460+
StepVerifier.withVirtualTime(() => Mono.just(randomValue).delayElement(5 seconds))
461+
.thenAwait(5 seconds)
462+
.expectNext(randomValue)
463+
.verifyComplete()
464+
}
465+
"with timer should delay using timer" in {
466+
StepVerifier.withVirtualTime(() => Mono.just(randomValue).delayElement(5 seconds, Schedulers.parallel()))
467+
.thenAwait(5 seconds)
468+
.expectNext(randomValue)
469+
.verifyComplete()
470+
}
471+
}
472+
458473
".delaySubscription" - {
459474
"with delay duration should delay subscription as long as the provided duration" in {
460475
StepVerifier.withVirtualTime(() => Mono.just(1).delaySubscription(1 hour))

0 commit comments

Comments
 (0)