Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,39 @@ class FlowRecoverWithSpec extends StreamSpec {
}

"recover with a failed future source" in {
Source.failed(ex)
.recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
val counter = new java.util.concurrent.atomic.AtomicInteger(0)
Source.failed[Int](ex)
.recoverWith {
case _: Throwable =>
if (counter.incrementAndGet() < 100) {
Source.future(Future.failed(ex))
} else {
Source.single(101)
}
}
.runWith(TestSink[Int]())
.request(1)
.expectError(ex)
.request(100)
.expectNext(101)
.expectComplete()
counter.get() shouldBe 100
}

"recover with a failed source" in {
val counter = new java.util.concurrent.atomic.AtomicInteger(0)
Source.failed[Int](ex)
.recoverWith {
case _: Throwable =>
if (counter.incrementAndGet() < 100) {
Source.failed(ex)
} else {
Source.single(101)
}
}
.runWith(TestSink[Int]())
.request(100)
.expectNext(101)
.expectComplete()
counter.get() shouldBe 100
}

"recover with a java stream source" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import pekko.NotUsed
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.EventFilter
import pekko.util.ByteString

import scala.collection.immutable
import scala.concurrent.duration._
Expand Down Expand Up @@ -685,4 +686,57 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
.expectComplete()
}
}

"recoverWithRetries" must {
"retry when exceptions occur" in {
val counter = new java.util.concurrent.atomic.AtomicInteger()

val source =
withRetriesTest(failedSource("origin")) { () =>
counter.incrementAndGet()
exceptionSource()
} { _ =>
counter.get() < 3
}

assertThrows[ArithmeticException] {
Await.result(source.runWith(Sink.ignore), Duration.Inf)
}

assert(counter.get() == 3)
}

"should retry on a failed source" in {
val counter = new java.util.concurrent.atomic.AtomicInteger()

val source =
withRetriesTest(failedSource("origin")) { () =>
if (counter.incrementAndGet() < 3) {
failedSource("does not work")
} else Source.single(ByteString.fromString("ok"))
} { _ => true }
.runWith(Sink.head)
val result = Await.result(source, Duration.Inf)
assert(result.utf8String == "ok")

assert(counter.get() == 3)
}
}

private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: () => Source[ByteString, NotUsed])(
shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] =
originSource.recoverWithRetries(
-1,
{
case e: Throwable if shouldRetry(e) =>
fallbackTo()
}
).mapMaterializedValue(_ => NotUsed)

private def failedSource(message: String): Source[ByteString, NotUsed] =
Source.failed(new ArithmeticException(message))

// has adivide by zero exception
private def exceptionSource(): Source[ByteString, NotUsed] =
Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2183,6 +2183,7 @@ private[pekko] object TakeWithin {
override def onPull(): Unit = pull(in)

@nowarn("msg=Any")
@tailrec
def onFailure(ex: Throwable): Unit = {
import Collect.NotApplied
if (maximumRetries < 0 || attempt < maximumRetries) {
Expand All @@ -2194,10 +2195,10 @@ private[pekko] object TakeWithin {
TraversalBuilder.getValuePresentedSource(source) match {
case OptionVal.Some(graph) => graph match {
case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage())
case failed: FailedSource[T @unchecked] => failStage(failed.failure)
case failed: FailedSource[T @unchecked] => onFailure(failed.failure)
case futureSource: FutureSource[T @unchecked] => futureSource.future.value match {
case Some(Success(elem)) => emit(out, elem, () => completeStage())
case Some(Failure(ex)) => failStage(ex)
case Some(Failure(ex)) => onFailure(ex)
case None =>
switchTo(source)
attempt += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand All @@ -2265,6 +2267,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand All @@ -2527,6 +2529,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,8 @@ class SubFlow[In, Out, Mat](
*
* Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,8 @@ class SubSource[Out, Mat](
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,8 @@ trait FlowOps[+Out, +Mat] {
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
Expand Down