Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit ab03614

Browse files
committed
Deprecate old SFlux.retryWhen(predicate)
Added SFlux.retryWhen(Retry)
1 parent f8c8cb0 commit ab03614

File tree

3 files changed

+100
-2
lines changed

3 files changed

+100
-2
lines changed

src/main/scala/reactor/core/scala/publisher/SFlux.scala

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import reactor.util.Logger
1717
import reactor.util.concurrent.Queues
1818
import reactor.util.concurrent.Queues.{SMALL_BUFFER_SIZE, XS_BUFFER_SIZE}
1919
import reactor.util.function.{Tuple2, Tuple3, Tuple4, Tuple5, Tuple6}
20+
import reactor.util.retry.Retry
2021

2122
import scala.collection.mutable
2223
import scala.concurrent.duration.Duration
@@ -504,7 +505,20 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] with ScalaCo
504505
new ReactiveSFlux[SGroupedFlux[K, V]](jFluxOfGroupedFlux.map((jg: JGroupedFlux[K, V]) => SGroupedFlux(jg)))
505506
}
506507

507-
final def handle[R](handler: (T, SynchronousSink[R]) => Unit): SFlux[R] = SFlux.fromPublisher(coreFlux.handle[R](handler))
508+
/**
509+
* Handle the items emitted by this [[SFlux]] by calling a biconsumer with the
510+
* output sink for each onNext. At most one [[SynchronousSink.next(Anyref)]]
511+
* call must be performed and/or 0 or 1 [[SynchronousSink.error(Throwable)]] or
512+
* [[SynchronousSink.complete()]].
513+
*
514+
* @param handler the handling [[Function2]]
515+
* @tparam R the transformed type
516+
* @reactor.errorMode This operator supports { @link #onErrorContinue(BiConsumer) resuming on errors} (including when
517+
* fusion is enabled) when the { @link BiConsumer} throws an exception or if an error is signaled explicitly via
518+
* { @link SynchronousSink#error(Throwable)}.
519+
* @return a transformed [[SFlux]]
520+
*/
521+
final def handle[R](handler: (T, SynchronousSink[R]) => Unit): SFlux[R] = coreFlux.handle[R](handler).asScala
508522

509523
final def hasElement(value: T): SMono[Boolean] = new ReactiveSMono[JBoolean](coreFlux.hasElement(value)).map(Boolean2boolean)
510524

@@ -596,13 +610,65 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] with ScalaCo
596610

597611
final def retry(numRetries: Long = Long.MaxValue, retryMatcher: Throwable => Boolean = (_: Throwable) => true): SFlux[T] = coreFlux.retry(numRetries, retryMatcher).asScala
598612

613+
@deprecated("Use retryWhen(Retry)")
599614
final def retryWhen(whenFactory: SFlux[Throwable] => Publisher[_]): SFlux[T] = {
600615
val func = new Function[JFlux[Throwable], Publisher[_]] {
601616
override def apply(t: JFlux[Throwable]): Publisher[_] = whenFactory(new ReactiveSFlux[Throwable](t))
602617
}
603618
coreFlux.retryWhen(func).asScala
604619
}
605620

621+
/**
622+
* Retries this [[SFlux]] in response to signals emitted by a companion [[Publisher]].
623+
* The companion is generated by the provided [[Retry]] instance, see {@link Retry#max(long)}, {@link Retry#maxInARow(long)}
624+
* and {@link Retry#backoff(long, Duration)} for readily available strategy builders.
625+
* <p>
626+
* The operator generates a base for the companion, a [[SFlux]] of [[reactor.util.retry.Retry.RetrySignal]]
627+
* which each give metadata about each retryable failure whenever this [[SFlux]] signals an error. The final companion
628+
* should be derived from that base companion and emit data in response to incoming onNext (although it can emit less
629+
* elements, or delay the emissions).
630+
* <p>
631+
* Terminal signals in the companion terminate the sequence with the same signal, so emitting an {@link Subscriber#onError(Throwable)}
632+
* will fail the resulting [[SFlux]] with that same error.
633+
* <p>
634+
* <img class="marble" src="doc-files/marbles/retryWhenSpecForFlux.svg" alt="">
635+
* <p>
636+
* Note that the [[Retry.RetrySignal]] state can be transient and change between each source
637+
* {@link org.reactivestreams.Subscriber#onError(Throwable) onError} or
638+
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. If processed with a delay,
639+
* this could lead to the represented state being out of sync with the state at which the retry
640+
* was evaluated. Map it to {@link Retry.RetrySignal#copy()} right away to mediate this.
641+
* <p>
642+
* Note that if the companion [[Publisher]] created by the {@code whenFactory}
643+
* emits [[reactor.util.context.Context]] as trigger objects, these [[reactor.util.context.Context]] will be merged with
644+
* the previous Context:
645+
* <blockquote><pre>
646+
* {@code
647+
* Retry customStrategy = Retry.fromFunction(companion -> companion.handle((retrySignal, sink) -> {
648+
* Context ctx = sink.currentContext();
649+
* int rl = ctx.getOrDefault("retriesLeft", 0);
650+
* if (rl > 0) {
651+
* sink.next(Context.of(
652+
* "retriesLeft", rl - 1,
653+
* "lastError", retrySignal.failure()
654+
* ));
655+
* } else {
656+
* sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
657+
* }
658+
* }));
659+
* Flux<T> retried = originalFlux.retryWhen(customStrategy);
660+
* }</pre>
661+
* </blockquote>
662+
*
663+
* @param retrySpec the { @link Retry} strategy that will generate the companion { @link Publisher},
664+
* given a { @link Flux} that signals each onError as a { @link reactor.util.retry.Retry.RetrySignal}.
665+
* @return a { @link Flux} that retries on onError when a companion { @link Publisher} produces an onNext signal
666+
* @see Retry.max(long)
667+
* @see Retry.maxInARow(long)
668+
* @see Retry.backoff(long, Duration)
669+
*/
670+
final def retryWhen(retry: Retry): SFlux[T] = coreFlux.retryWhen(retry).asScala
671+
606672
final def sample(timespan: Duration): SFlux[T] = coreFlux.sample(timespan).asScala
607673

608674
final def sampleFirst(timespan: Duration): SFlux[T] = coreFlux.sampleFirst(timespan).asScala
@@ -863,7 +929,6 @@ object SFlux {
863929
* @tparam T the produced type
864930
* @return a [[reactor.core.publisher.FluxProcessor]] accepting publishers and producing T
865931
*/
866-
// TODO: How to test these switchOnNext?
867932
def switchOnNext[T](mergedPublishers: Publisher[_ <: Publisher[_ <: T]]): SFlux[T] = SFlux.fromPublisher(JFlux.switchOnNext[T](mergedPublishers))
868933

869934
def using[T, D](resourceSupplier: () => D, sourceSupplier: D => Publisher[_ <: T], resourceCleanup: D => Unit, eager: Boolean = false): SFlux[T] =
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package reactor.util.scala.retry
2+
3+
import org.reactivestreams.Publisher
4+
import reactor.core.scala.publisher.SFlux
5+
import reactor.util.retry.Retry
6+
import reactor.util.retry.Retry.RetrySignal
7+
8+
object SRetry {
9+
/**
10+
* A wrapper around [[Function1]] to provide [[Retry]] by using lambda expressions.
11+
*
12+
* @param function the { @link Function} representing the desired { @link Retry} strategy as a lambda
13+
* @return the { @link Retry} strategy adapted from the { @link Function}
14+
*/
15+
final def from(function: SFlux[RetrySignal] => Publisher[_]): Retry = Retry.from(function)
16+
}

src/test/scala/reactor/core/scala/publisher/SFluxTest.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import reactor.core.scheduler.Schedulers
2121
import reactor.test.StepVerifier
2222
import reactor.test.scheduler.VirtualTimeScheduler
2323
import reactor.util.concurrent.Queues
24+
import reactor.util.scala.retry.SRetry
2425

2526
import scala.collection.mutable
2627
import scala.collection.mutable.ListBuffer
@@ -1770,6 +1771,8 @@ class SFluxTest extends AnyFreeSpec with Matchers with TableDrivenPropertyChecks
17701771
}
17711772
}
17721773

1774+
/*
1775+
//this is deprecated, should be removed on 0.7.x
17731776
".retryWhen should retry the companion publisher produces onNext signal" in {
17741777
val counter = new AtomicInteger(0)
17751778
val flux = SFlux.just(1, 2, 3).concatWith(SMono.raiseError(new RuntimeException("ex"))).retryWhen { _ =>
@@ -1781,6 +1784,20 @@ class SFluxTest extends AnyFreeSpec with Matchers with TableDrivenPropertyChecks
17811784
.expectNext(1, 2, 3)
17821785
.verifyComplete()
17831786
}
1787+
*/
1788+
1789+
".retryWhen should retry according to the spec" in {
1790+
val counter = new AtomicInteger(0)
1791+
val sFlux = SFlux.just(1, 2, 3).concatWith(SMono.raiseError(new RuntimeException("ex")))
1792+
.retryWhen(SRetry.from(_ => {
1793+
if (counter.getAndIncrement() > 0) SMono.raiseError[Int](new RuntimeException("another ex"))
1794+
else SMono.just(1)
1795+
}))
1796+
StepVerifier.create(sFlux)
1797+
.expectNext(1, 2, 3)
1798+
.expectNext(1, 2, 3)
1799+
.verifyComplete()
1800+
}
17841801

17851802
".sample should emit the last value for given interval" in {
17861803
StepVerifier.withVirtualTime(() => SFlux.just(1, 2, 3, 4, 5, 6).delayElements(1 second).sample(1500 milliseconds))

0 commit comments

Comments
 (0)