Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 63b42fa

Browse files
committed
Added Mono.blockOption
1 parent 0d17429 commit 63b42fa

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,43 @@ class Mono[T] private(private val jMono: JMono[T]) extends Publisher[T] with Map
142142
*/
143143
final def block(timeout: Duration): T = jMono.block(timeout)
144144

145+
/**
146+
* Subscribe to this {[[Mono]] Mono} and <strong>block indefinitely</strong> until a next signal is
147+
* received or the Mono completes empty. Returns an [[Option]], which can be used
148+
* to replace the empty case with an Exception via [[Option.orElse(throw exception)]].
149+
* In case the Mono itself errors, the original exception is thrown (wrapped in a
150+
* [[RuntimeException]] if it was a checked exception).
151+
*
152+
* <p>
153+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/block.png" alt="">
154+
* <p>
155+
* Note that each blockOptional() will trigger a new subscription: in other words, the result
156+
* might miss signal from hot publishers.
157+
*
158+
* @return T the result
159+
*/
160+
final def blockOption(): Option[T] = jMono.blockOptional()
161+
162+
/**
163+
* Subscribe to this [[Mono]] and <strong>block</strong> until a next signal is
164+
* received, the Mono completes empty or a timeout expires. Returns an [[Option]]
165+
* for the first two cases, which can be used to replace the empty case with an
166+
* Exception via [[Option.orElse(throw exception)]].
167+
* In case the Mono itself errors, the original exception is thrown (wrapped in a
168+
* [[RuntimeException]] if it was a checked exception).
169+
* If the provided timeout expires, a [[RuntimeException]] is thrown.
170+
*
171+
* <p>
172+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/block.png" alt="">
173+
* <p>
174+
* Note that each block() will trigger a new subscription: in other words, the result
175+
* might miss signal from hot publishers.
176+
*
177+
* @param timeout maximum time period to wait for before raising a [[RuntimeException]]
178+
* @return T the result
179+
*/
180+
final def blockOption(timeout: Duration): Option[T] = jMono.blockOptional(timeout)
181+
145182
/**
146183
* Cast the current [[Mono]] produced type into a target produced type.
147184
*

src/main/scala/reactor/core/scala/publisher/package.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,6 @@ Uncomment this when used. It is not used for now and reduce the code coverage
159159
implicit def scalaBiPredicate2JavaBiPredicate[T, U](scalaBiPredicate: (T, U) => Boolean): BiPredicate[T, U] = new BiPredicate[T, U] {
160160
override def test(t: T, u: U) = scalaBiPredicate(t, u)
161161
}
162+
163+
implicit def javaOptional2ScalaOption[T](jOptional: Optional[T]): Option[T] = if(jOptional.isPresent) Some(jOptional.get()) else None
162164
}

src/test/scala/reactor/core/scala/publisher/MonoTest.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,28 @@ class MonoTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
407407
}
408408
}
409409

410+
".blockOption" - {
411+
"without duration" - {
412+
"should block the mono to get value" in {
413+
Mono.just(randomValue).blockOption() shouldBe Some(randomValue)
414+
}
415+
"should retun None if mono is empty" in {
416+
Mono.empty.blockOption() shouldBe None
417+
}
418+
}
419+
"with duration" - {
420+
"should block the mono up to the duration" in {
421+
Mono.just(randomValue).blockOption(10 seconds) shouldBe Some(randomValue)
422+
}
423+
"shouldBlock the mono up to the duration and return None" in {
424+
StepVerifier.withVirtualTime(() => Mono.just(Mono.empty.blockOption(10 seconds)))
425+
.thenAwait(10 seconds)
426+
.expectNext(None)
427+
.verifyComplete()
428+
}
429+
}
430+
}
431+
410432
".cast should cast the underlying value" in {
411433
val number = Mono.just(BigDecimal("123")).cast(classOf[ScalaNumber]).block()
412434
number shouldBe a[ScalaNumber]

0 commit comments

Comments
 (0)