Skip to content

Commit 029c557

Browse files
authored
fix: Fix recoverWith on Failed stage. (#2631)
1 parent 4dd60f5 commit 029c557

File tree

8 files changed

+62
-22
lines changed

8 files changed

+62
-22
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,39 @@ class FlowRecoverWithSpec extends StreamSpec {
7373
}
7474

7575
"recover with a failed future source" in {
76-
Source.failed(ex)
77-
.recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
76+
val counter = new java.util.concurrent.atomic.AtomicInteger(0)
77+
Source.failed[Int](ex)
78+
.recoverWith {
79+
case _: Throwable =>
80+
if (counter.incrementAndGet() < 100) {
81+
Source.future(Future.failed(ex))
82+
} else {
83+
Source.single(101)
84+
}
85+
}
7886
.runWith(TestSink[Int]())
79-
.request(1)
80-
.expectError(ex)
87+
.request(100)
88+
.expectNext(101)
89+
.expectComplete()
90+
counter.get() shouldBe 100
91+
}
92+
93+
"recover with a failed source" in {
94+
val counter = new java.util.concurrent.atomic.AtomicInteger(0)
95+
Source.failed[Int](ex)
96+
.recoverWith {
97+
case _: Throwable =>
98+
if (counter.incrementAndGet() < 100) {
99+
Source.failed(ex)
100+
} else {
101+
Source.single(101)
102+
}
103+
}
104+
.runWith(TestSink[Int]())
105+
.request(100)
106+
.expectNext(101)
107+
.expectComplete()
108+
counter.get() shouldBe 100
81109
}
82110

83111
"recover with a java stream source" in {

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
701701
val counter = new java.util.concurrent.atomic.AtomicInteger()
702702

703703
val source =
704-
withRetriesTest(failedSource("origin")) { _ =>
704+
withRetriesTest(failedSource("origin")) { () =>
705705
counter.incrementAndGet()
706706
exceptionSource()
707707
} { _ =>
@@ -715,33 +715,30 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
715715
assert(counter.get() == 3)
716716
}
717717

718-
"not retry FailedSources" in {
719-
// https://github.com/apache/pekko/issues/2620
718+
"should retry on a failed source" in {
720719
val counter = new java.util.concurrent.atomic.AtomicInteger()
721720

722721
val source =
723-
withRetriesTest(failedSource("origin")) { _ =>
724-
counter.incrementAndGet()
725-
failedSource("does not work")
726-
} { _ =>
727-
counter.get() < 3
728-
}
722+
withRetriesTest(failedSource("origin")) { () =>
723+
if (counter.incrementAndGet() < 3) {
724+
failedSource("does not work")
725+
} else Source.single(ByteString.fromString("ok"))
726+
} { _ => true }
727+
.runWith(Sink.head)
728+
val result = Await.result(source, Duration.Inf)
729+
assert(result.utf8String == "ok")
729730

730-
assertThrows[ArithmeticException] {
731-
Await.result(source.runWith(Sink.ignore), Duration.Inf)
732-
}
733-
734-
assert(counter.get() == 1)
731+
assert(counter.get() == 3)
735732
}
736733
}
737734

738-
private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: Long => Source[ByteString, NotUsed])(
735+
private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: () => Source[ByteString, NotUsed])(
739736
shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] =
740737
originSource.recoverWithRetries(
741738
-1,
742739
{
743740
case e: Throwable if shouldRetry(e) =>
744-
fallbackTo(0)
741+
fallbackTo()
745742
}
746743
).mapMaterializedValue(_ => NotUsed)
747744

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2169,6 +2169,7 @@ private[pekko] object TakeWithin {
21692169
override def onPull(): Unit = pull(in)
21702170

21712171
@nowarn("msg=Any")
2172+
@tailrec
21722173
def onFailure(ex: Throwable): Unit = {
21732174
import Collect.NotApplied
21742175
if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2180,10 +2181,10 @@ private[pekko] object TakeWithin {
21802181
TraversalBuilder.getValuePresentedSource(source) match {
21812182
case OptionVal.Some(graph) => graph match {
21822183
case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage())
2183-
case failed: FailedSource[T @unchecked] => failStage(failed.failure)
2184+
case failed: FailedSource[T @unchecked] => onFailure(failed.failure)
21842185
case futureSource: FutureSource[T @unchecked] => futureSource.future.value match {
21852186
case Some(Success(elem)) => emit(out, elem, () => completeStage())
2186-
case Some(Failure(ex)) => failStage(ex)
2187+
case Some(Failure(ex)) => onFailure(ex)
21872188
case None =>
21882189
switchTo(source)
21892190
attempt += 1

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2078,6 +2078,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
20782078
*
20792079
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
20802080
*
2081+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
2082+
*
20812083
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
20822084
* from alternative Source
20832085
*
@@ -2100,6 +2102,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
21002102
*
21012103
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
21022104
*
2105+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
2106+
*
21032107
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
21042108
* from alternative Source
21052109
*

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2317,6 +2317,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
23172317
*
23182318
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
23192319
*
2320+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
2321+
*
23202322
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
23212323
* from alternative Source
23222324
*
@@ -2339,6 +2341,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
23392341
*
23402342
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
23412343
*
2344+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
2345+
*
23422346
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
23432347
* from alternative Source
23442348
*

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,6 +1427,8 @@ final class SubFlow[In, Out, Mat](
14271427
*
14281428
* Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
14291429
*
1430+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
1431+
*
14301432
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
14311433
* from alternative Source
14321434
*

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,6 +1399,8 @@ final class SubSource[Out, Mat](
13991399
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
14001400
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
14011401
*
1402+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
1403+
*
14021404
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
14031405
* from alternative Source
14041406
*

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,8 @@ trait FlowOps[+Out, +Mat] {
906906
*
907907
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
908908
*
909+
* It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
910+
*
909911
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
910912
* from alternative Source
911913
*

0 commit comments

Comments
 (0)