Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 8744497

Browse files
committed
Added Mono.using(resourceSupplier, sourceSupplier, resourceCleanup, eager)
Added Mono.using(resourceSupplier, sourceSupplier, resourceCleanup)
1 parent c30be6f commit 8744497

File tree

1 file changed

+49
-0
lines changed
  • src/main/scala/reactor/core/scala/publisher

1 file changed

+49
-0
lines changed

src/main/scala/reactor/core/scala/publisher/Mono.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,6 +1446,55 @@ object Mono {
14461446
override def test(t: T, u: T): Boolean = isEqual(t, u)
14471447
}, bufferSize)).map(Boolean2boolean)
14481448

1449+
/**
1450+
* Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
1451+
* Mono derived from the same resource and makes sure the resource is released if the
1452+
* sequence terminates or
1453+
* the Subscriber cancels.
1454+
* <p>
1455+
* <ul> <li>Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup
1456+
* Consumer may override the terminal even.</li> <li>Non-eager cleanup will drop any exception.</li> </ul>
1457+
* <p>
1458+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.6.RELEASE/src/docs/marble/using.png"
1459+
* alt="">
1460+
*
1461+
* @param resourceSupplier a function that is called on subscribe for preparing the resource
1462+
* @param sourceSupplier a [[Mono]] factory derived from the supplied resource
1463+
* @param resourceCleanup invoked on completion
1464+
* @param eager true to clean before terminating downstream subscribers
1465+
* @tparam T emitted type
1466+
* @tparam D resource type
1467+
* @return new [[Mono]]
1468+
*/
1469+
def using[T, D](resourceSupplier: () => D, sourceSupplier: D => _ <: Mono[_ <: T], resourceCleanup: D => Unit, eager: Boolean) =
1470+
Mono(JMono.using[T, D](resourceSupplier, new Function[D, JMono[_ <: T]] {
1471+
override def apply(t: D): JMono[_ <: T] = sourceSupplier(t).asJava()
1472+
}, resourceCleanup, eager))
1473+
1474+
/**
1475+
* Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
1476+
* Mono derived from the same resource and makes sure the resource is released if the
1477+
* sequence terminates or
1478+
* the Subscriber cancels.
1479+
* <p>
1480+
* Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer
1481+
* may override the terminal even.
1482+
* <p>
1483+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.6.RELEASE/src/docs/marble/using.png"
1484+
* alt="">
1485+
*
1486+
* @param resourceSupplier a function that is called on subscribe to prepare the resource
1487+
* @param sourceSupplier a [[Mono]] factory derived from the supplied resource
1488+
* @param resourceCleanup invoked on completion
1489+
* @tparam T emitted type
1490+
* @tparam D resource type
1491+
* @return new [[Mono]]
1492+
*/
1493+
def using[T, D](resourceSupplier: () => D, sourceSupplier: D => Mono[_ <: T], resourceCleanup: D => Unit) =
1494+
Mono(JMono.using[T, D](resourceSupplier, new Function[D, JMono[_ <: T]] {
1495+
override def apply(t: D): JMono[_ <: T] = sourceSupplier(t).asJava()
1496+
}, resourceCleanup))
1497+
14491498
/**
14501499
* Merge given monos into a new a `Mono` that will be fulfilled when all of the given `Monos`
14511500
* have been fulfilled. An error will cause pending results to be cancelled and immediate error emission to the

0 commit comments

Comments
 (0)