Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions core/src/main/scala/ox/flow/FlowOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import ox.forkCancellable
import ox.forkUnsupervised
import ox.forkUser
import ox.repeatWhile
import ox.resilience.RetryConfig
import ox.scheduling.Schedule
import ox.sleep
import ox.supervised
import ox.tapException
Expand Down Expand Up @@ -951,6 +953,60 @@ class FlowOps[+T]:
def onError(f: Throwable => Unit): Flow[T] = Flow.usingEmitInline: emit =>
last.run(emit).tapException(f)

/** Retries the upstream flow execution using the provided retry configuration. If the flow fails with an exception, it will be retried
* according to the schedule defined in the retry config until it succeeds or the retry policy decides to stop.
*
* Each retry attempt will run the complete upstream flow, from start up to this point. The retry behavior is controlled by the
* [[RetryConfig]].
*
* Note that this retries the flow execution itself, not individual elements within the flow. If you need to retry individual operations
* within the flow, consider using retry logic inside methods such as [[map]].
*
* Creates an asynchronous boundary (see [[buffer]]) to isolate failures when running the upstream flow.
*
* @param config
* The retry configuration that specifies the retry schedule and success/failure conditions.
* @return
* A new flow that will retry execution according to the provided configuration.
* @throws anything
* The exception from the last retry attempt if all retries are exhausted.
* @see
* [[ox.resilience.retry]]
*/
def retry(config: RetryConfig[Throwable, Unit])(using BufferCapacity): Flow[T] = Flow.usingEmitInline: emit =>
val ch = BufferCapacity.newChannel[T]
unsupervised:
forkPropagate(ch) {
ox.resilience.retry(config)(last.run(FlowEmit.fromInline(t => ch.send(t))))
ch.done()
}.discard
FlowEmit.channelToEmit(ch, emit)

/** @see
* [[retry(RetryConfig)]]
*/
def retry(schedule: Schedule): Flow[T] = retry(RetryConfig(schedule))

/** Recovers from errors in the upstream flow by emitting a recovery value when the error is handled by the partial function. If the
* partial function is not defined for the error, the original error is propagated.
*
* Creates an asynchronous boundary (see [[buffer]]) to isolate failures when running the upstream flow.
*
* @param pf
* A partial function that handles specific exceptions and returns a recovery value to emit.
* @return
* A flow that emits elements from the upstream flow, and emits a recovery value if the upstream fails with a handled exception.
*/
def recover[U >: T](pf: PartialFunction[Throwable, U])(using BufferCapacity): Flow[U] = Flow.usingEmitInline: emit =>
val ch = BufferCapacity.newChannel[U]
unsupervised:
forkPropagate(ch) {
try last.run(FlowEmit.fromInline(t => ch.send(t)))
catch case e: Throwable if pf.isDefinedAt(e) => ch.send(pf(e))
ch.done()
}.discard
FlowEmit.channelToEmit(ch, emit)

//

protected def runLastToChannelAsync(ch: Sink[T])(using OxUnsupervised): Unit =
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/ox/scheduling/scheduled.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,15 @@ def scheduledEither[E, T](config: ScheduledConfig[E, T])(operation: => Either[E,
*/
def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledConfig[E, T])(operation: => F[T]): F[T] =
@tailrec
def loop(invocation: Int, intervals: LazyList[FiniteDuration], lastDuration: Option[FiniteDuration]): F[T] =
def sleepIfNeeded(startTimestamp: Long, nextDelay: FiniteDuration) =
def loop(invocation: Int, intervals: LazyList[FiniteDuration]): F[T] =
def sleepIfNeeded(startTimestamp: Long, nextDelay: FiniteDuration): Unit =
val delay = config.sleepMode match
case SleepMode.StartToStart =>
val elapsed = System.nanoTime() - startTimestamp
val remaining = nextDelay.toNanos - elapsed
remaining.nanos
case SleepMode.EndToStart => nextDelay
if delay.toMillis > 0 then sleep(delay)
delay
end sleepIfNeeded

val startTimestamp = System.nanoTime()
Expand All @@ -123,22 +122,22 @@ def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledCon

nextDelay match
case Some(nd) if !shouldStop.stop =>
val delay = sleepIfNeeded(startTimestamp, nd)
loop(invocation + 1, intervals.tail, Some(delay))
sleepIfNeeded(startTimestamp, nd)
loop(invocation + 1, intervals.tail)
case _ => v
case v =>
val result = em.getT(v)
val shouldStop = config.afterAttempt(invocation, Right(result))

nextDelay match
case Some(nd) if !shouldStop.stop =>
val delay = sleepIfNeeded(startTimestamp, nd)
loop(invocation + 1, intervals.tail, Some(delay))
sleepIfNeeded(startTimestamp, nd)
loop(invocation + 1, intervals.tail)
case _ => v
end match
end loop

config.schedule.initialDelay.foreach(sleep)

loop(1, config.schedule.intervals(), None)
loop(1, config.schedule.intervals())
end scheduledWithErrorMode
137 changes: 137 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsRecoverTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import ox.channels.ChannelClosedException

class FlowOpsRecoverTest extends AnyFlatSpec with Matchers:

behavior of "Flow.recover"

it should "pass through elements when upstream flow succeeds" in:
// given
val flow = Flow.fromValues(1, 2, 3)
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
42
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List(1, 2, 3)

it should "emit recovery value when upstream flow fails with handled exception" in:
// given
val exception = new IllegalArgumentException("test error")
val flow = Flow.fromValues(1, 2).concat(Flow.failed(exception))
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
42
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List(1, 2, 42)

it should "not emit recovery value when downstream flow fails with handled exception" in:
// given
val exception = new IllegalArgumentException("test error")
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
42
}
val flow = Flow.fromValues(1, 2).recover(recoveryFunction).concat(Flow.failed(exception))

// when & then
the[IllegalArgumentException] thrownBy {
flow.runToList()
} should have message "test error"

it should "propagate unhandled exceptions" in:
// given
val exception = new RuntimeException("unhandled error")
val flow = Flow.fromValues(1, 2).concat(Flow.failed(exception))
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
42
}

// when & then
val caught = the[ChannelClosedException.Error] thrownBy {
flow.recover(recoveryFunction).runToList()
}
caught.getCause shouldBe an[RuntimeException]
caught.getCause.getMessage shouldBe "unhandled error"

it should "handle multiple exception types" in:
// given
val exception = new IllegalStateException("state error")
val flow = Flow.fromValues(1, 2).concat(Flow.failed(exception))
val recoveryFunction: PartialFunction[Throwable, Int] = {
case _: IllegalArgumentException => 42
case _: IllegalStateException => 99
case _: NullPointerException => 0
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List(1, 2, 99)

it should "work with different recovery value type" in:
// given
val exception = new IllegalArgumentException("test error")
val flow = Flow.fromValues("a", "b").concat(Flow.failed(exception))
val recoveryFunction: PartialFunction[Throwable, String] = { case _: IllegalArgumentException =>
"recovered"
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List("a", "b", "recovered")

it should "handle exception thrown during flow processing" in:
// given
val flow = Flow.fromValues(1, 2, 3).map(x => if x == 3 then throw new IllegalArgumentException("map error") else x)
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
-1
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List(1, 2, -1)

it should "work with empty flow" in:
// given
val flow = Flow.empty[Int]
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
42
}

// when
val result = flow.recover(recoveryFunction).runToList()

// then
result shouldBe List.empty

it should "propagate exception when partial function throws" in:
// given
val originalException = new IllegalArgumentException("original error")
val flow = Flow.fromValues(1, 2).concat(Flow.failed(originalException))
val recoveryFunction: PartialFunction[Throwable, Int] = { case _: IllegalArgumentException =>
throw new RuntimeException("recovery failed")
}

// when & then
val caught = the[ChannelClosedException.Error] thrownBy {
flow.recover(recoveryFunction).runToList()
}
caught.getCause shouldBe an[RuntimeException]
caught.getCause.getMessage shouldBe "recovery failed"
end FlowOpsRecoverTest
Loading