Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ supervised {

```scala mdoc:compile-only
def computationR: Int = ???
retry(Schedule.exponentialBackoff(100.millis).maxRepeats(5)
retry(Schedule.exponentialBackoff(100.millis).maxRetries(4)
.jitter().maxInterval(5.minutes))(computationR)
```

Expand Down
17 changes: 13 additions & 4 deletions core/src/main/scala/ox/scheduling/Schedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,20 @@ case class Schedule(intervals: () => LazyList[FiniteDuration], initialDelay: Opt
/** Caps the intervals to the given maximum. */
def maxInterval(max: FiniteDuration): Schedule = copy(intervals = () => intervals().map(_.min(max)))

/** Caps the number of repeats to the given maximum, creating a finite schedule. */
def maxRepeats(max: Int): Schedule = copy(intervals = () => intervals().take(max))
/** Caps the number of attempts to the given maximum, creating a finite schedule. The provided value specifies the total number of
* invocations (attempts) of the operation, including the initial invocation.
*/
def maxAttempts(max: Int): Schedule = copy(intervals = () => intervals().take(max - 1))

/** Caps the number of retries to the given maximum, creating a finite schedule. The provided value specifies the number of retries after
* the initial attempt. The total number of invocations will be `retries + 1`.
*/
def maxRetries(retries: Int): Schedule = maxAttempts(retries + 1)

/** Caps the total delay to the given maximum. The resulting schedule might still be infinite, if the intervals are originally 0. */
def maxRepeatsByCumulativeDelay(upTo: FiniteDuration): Schedule = copy(intervals = () =>
/** Caps the total delay (cumulative time between attempts) to the given maximum. The resulting schedule might still be infinite, if the
* intervals are originally 0.
*/
def maxCumulativeDelay(upTo: FiniteDuration): Schedule = copy(intervals = () =>
val d = intervals()
d
.scanLeft(0.seconds)((cumulative, next) => cumulative + next)
Expand Down
22 changes: 11 additions & 11 deletions core/src/test/scala/ox/flow/FlowOpsRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
val flow = Flow.fromValues(1, 2, 3)

// when
val result = flow.retry(Schedule.immediate.maxRepeats(3)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(3)).runToList()

// then
result shouldBe List(1, 2, 3)
Expand All @@ -38,7 +38,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
}

// when
val result = flow.retry(Schedule.immediate.maxRepeats(maxRetries)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(maxRetries)).runToList()

// then
result shouldBe List(42)
Expand All @@ -58,7 +58,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:

// when
val (result, elapsedTime) = measure {
flow.retry(Schedule.fixedInterval(interval).maxRepeats(maxRetries)).runToList()
flow.retry(Schedule.fixedInterval(interval).maxRetries(maxRetries)).runToList()
}

// then
Expand All @@ -74,7 +74,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
val flow = Flow
.fromValues(1, 2, 3)
.tap(_ => upstreamInvocationCounter.incrementAndGet().discard)
.retry(Schedule.immediate.maxRepeats(3))
.retry(Schedule.immediate.maxRetries(3))
.tap { value =>
downstreamInvocationCounter.incrementAndGet().discard
if value == 2 then throw new RuntimeException("downstream failure")
Expand All @@ -101,7 +101,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:

// when/then
val exception = the[ChannelClosedException.Error] thrownBy {
flow.retry(Schedule.immediate.maxRepeats(maxRetries)).runToList()
flow.retry(Schedule.immediate.maxRetries(maxRetries)).runToList()
}

exception.getCause.getMessage shouldBe errorMessage
Expand All @@ -122,7 +122,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
}

val config = RetryConfig[Throwable, Unit](
Schedule.immediate.maxRepeats(maxRetries),
Schedule.immediate.maxRetries(maxRetries),
ResultPolicy.retryWhen[Throwable, Unit](_.getMessage != fatalErrorMessage)
)

Expand All @@ -139,7 +139,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
val flow = Flow.empty[Int]

// when
val result = flow.retry(Schedule.immediate.maxRepeats(3)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(3)).runToList()

// then
result shouldBe List.empty
Expand All @@ -153,7 +153,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
}

// when
val result = flow.retry(Schedule.immediate.maxRepeats(5)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(5)).runToList()

// then
result shouldBe List("first try success")
Expand All @@ -169,7 +169,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
}

// when
val result = flow.retry(Schedule.immediate.maxRepeats(2)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(2)).runToList()

// then
result shouldBe List(2, 4, 6)
Expand All @@ -188,7 +188,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
}

// when
val result = flow.retry(Schedule.immediate.maxRepeats(1)).runToList()
val result = flow.retry(Schedule.immediate.maxRetries(1)).runToList()

// then
result shouldBe List(20, 40)
Expand All @@ -199,7 +199,7 @@ class FlowOpsRetryTest extends AnyFlatSpec with Matchers with ElapsedTime:
val invocationCounter = new AtomicInteger(0)

val flow =
Flow.fromValues(1 to 10*).tap(_ => invocationCounter.incrementAndGet().discard).take(3).retry(Schedule.immediate.maxRepeats(3))
Flow.fromValues(1 to 10*).tap(_ => invocationCounter.incrementAndGet().discard).take(3).retry(Schedule.immediate.maxRetries(3))

// when
val result = flow.runToList()
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/ox/resilience/AfterAttemptTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class AfterAttemptTest extends AnyFlatSpec with Matchers with EitherValues with
returnedResult = result

// when
val result = retry(RetryConfig(Schedule.immediate.maxRepeats(3), afterAttempt = afterAttempt))(f)
val result = retry(RetryConfig(Schedule.immediate.maxRetries(3), afterAttempt = afterAttempt))(f)

// then
result shouldBe successfulResult
Expand All @@ -52,7 +52,7 @@ class AfterAttemptTest extends AnyFlatSpec with Matchers with EitherValues with
returnedResult = result

// when
val result = the[RuntimeException] thrownBy retry(RetryConfig(Schedule.immediate.maxRepeats(3), afterAttempt = afterAttempt))(f)
val result = the[RuntimeException] thrownBy retry(RetryConfig(Schedule.immediate.maxRetries(3), afterAttempt = afterAttempt))(f)

// then
result shouldBe failedResult
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/ox/resilience/BackoffRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with

// when
val (result, elapsedTime) =
measure(the[RuntimeException] thrownBy retry(Schedule.exponentialBackoff(initialDelay).maxRepeats(maxRetries))(f))
measure(the[RuntimeException] thrownBy retry(Schedule.exponentialBackoff(initialDelay).maxRetries(maxRetries))(f))

// then
result should have message "boom"
Expand Down Expand Up @@ -63,7 +63,7 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with
// when
val (result, elapsedTime) =
measure(
the[RuntimeException] thrownBy retry(Schedule.exponentialBackoff(initialDelay).maxRepeats(maxRetries).maxInterval(maxDelay))(f)
the[RuntimeException] thrownBy retry(Schedule.exponentialBackoff(initialDelay).maxRetries(maxRetries).maxInterval(maxDelay))(f)
)

// then
Expand All @@ -86,7 +86,7 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with
val (result, elapsedTime) =
measure(
the[RuntimeException] thrownBy retry(
Schedule.exponentialBackoff(initialDelay).maxRepeats(maxRetries).maxInterval(maxDelay).jitter(Jitter.Equal)
Schedule.exponentialBackoff(initialDelay).maxRetries(maxRetries).maxInterval(maxDelay).jitter(Jitter.Equal)
)(f)
)

Expand All @@ -108,7 +108,7 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with
Left(errorMessage)

// when
val (result, elapsedTime) = measure(retryEither(Schedule.exponentialBackoff(initialDelay).maxRepeats(maxRetries))(f))
val (result, elapsedTime) = measure(retryEither(Schedule.exponentialBackoff(initialDelay).maxRetries(maxRetries))(f))

// then
result.left.value shouldBe errorMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with

// when
val (result, elapsedTime) =
measure(the[RuntimeException] thrownBy retry(Schedule.fixedInterval(sleep).maxRepeats(maxRetries))(f))
measure(the[RuntimeException] thrownBy retry(Schedule.fixedInterval(sleep).maxRetries(maxRetries))(f))

// then
result should have message "boom"
Expand Down Expand Up @@ -61,7 +61,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with
Left(errorMessage)

// when
val (result, elapsedTime) = measure(retryEither(Schedule.fixedInterval(sleep).maxRepeats(maxRetries))(f))
val (result, elapsedTime) = measure(retryEither(Schedule.fixedInterval(sleep).maxRetries(maxRetries))(f))

// then
result.left.value shouldBe errorMessage
Expand Down
22 changes: 11 additions & 11 deletions core/src/test/scala/ox/resilience/ImmediateRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
successfulResult

// when
val result = retry(Schedule.immediate.maxRepeats(3))(f)
val result = retry(Schedule.immediate.maxRetries(3))(f)

// then
result shouldBe successfulResult
Expand All @@ -30,7 +30,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
// given
var counter = 0
val errorMessage = "boom"
val policy = RetryConfig[Throwable, Unit](Schedule.immediate.maxRepeats(3), ResultPolicy.retryWhen(_.getMessage != errorMessage))
val policy = RetryConfig[Throwable, Unit](Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_.getMessage != errorMessage))

def f =
counter += 1
Expand All @@ -44,7 +44,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
// given
var counter = 0
val unsuccessfulResult = -1
val policy = RetryConfig[Throwable, Int](Schedule.immediate.maxRepeats(3), ResultPolicy.successfulWhen(_ > 0))
val policy = RetryConfig[Throwable, Int](Schedule.immediate.maxRetries(3), ResultPolicy.successfulWhen(_ > 0))

def f =
counter += 1
Expand All @@ -67,7 +67,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
if true then throw new RuntimeException(errorMessage)

// when/then
the[RuntimeException] thrownBy retry(Schedule.immediate.maxRepeats(3))(f) should have message errorMessage
the[RuntimeException] thrownBy retry(Schedule.immediate.maxRetries(3))(f) should have message errorMessage
counter shouldBe 4

it should "retry a failing function forever" in:
Expand Down Expand Up @@ -97,7 +97,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
Right(successfulResult)

// when
val result = retryEither(Schedule.immediate.maxRepeats(3))(f)
val result = retryEither(Schedule.immediate.maxRetries(3))(f)

// then
result.value shouldBe successfulResult
Expand All @@ -107,7 +107,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
// given
var counter = 0
val errorMessage = "boom"
val policy: RetryConfig[String, Int] = RetryConfig(Schedule.immediate.maxRepeats(3), ResultPolicy.retryWhen(_ != errorMessage))
val policy: RetryConfig[String, Int] = RetryConfig(Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != errorMessage))

def f: Either[String, Int] =
counter += 1
Expand All @@ -124,7 +124,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
// given
var counter = 0
val unsuccessfulResult = -1
val policy: RetryConfig[String, Int] = RetryConfig(Schedule.immediate.maxRepeats(3), ResultPolicy.successfulWhen(_ > 0))
val policy: RetryConfig[String, Int] = RetryConfig(Schedule.immediate.maxRetries(3), ResultPolicy.successfulWhen(_ > 0))

def f: Either[String, Int] =
counter += 1
Expand All @@ -147,7 +147,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
Left(errorMessage)

// when
val result = retryEither(Schedule.immediate.maxRepeats(3))(f)
val result = retryEither(Schedule.immediate.maxRetries(3))(f)

// then
result.left.value shouldBe errorMessage
Expand All @@ -167,7 +167,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi

val adaptive = AdaptiveRetry(TokenBucket(5), 1, 1)
// when
val result = adaptive.retryEither(Schedule.immediate.maxRepeats(5))(f)
val result = adaptive.retryEither(Schedule.immediate.maxRetries(5))(f)

// then
result.value shouldBe "Success"
Expand All @@ -184,7 +184,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi

val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
// when
val result = adaptive.retryEither(Schedule.immediate.maxRepeats(5))(f)
val result = adaptive.retryEither(Schedule.immediate.maxRetries(5))(f)

// then
result.left.value shouldBe errorMessage
Expand All @@ -202,7 +202,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi

val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
val retryConfig =
RetryConfig(Schedule.immediate.maxRepeats(5)).copy(resultPolicy = ResultPolicy.successfulWhen[String, String](_ => false))
RetryConfig(Schedule.immediate.maxRetries(5)).copy(resultPolicy = ResultPolicy.successfulWhen[String, String](_ => false))
// when
val result = adaptive.retryEither(retryConfig, _ => false)(f)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class ScheduleFallingBackRetryTest extends AnyFlatSpec with Matchers with Elapse
counter += 1
throw new RuntimeException("boom")

val schedule = Schedule.immediate.maxRepeats(immediateRetries).andThen(Schedule.fixedInterval(sleep).maxRepeats(delayedRetries))
val schedule =
Schedule.immediate.maxRetries(immediateRetries).andThen(Schedule.fixedInterval(sleep).maxRetries(delayedRetries))

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig(schedule))(f))
Expand All @@ -41,7 +42,7 @@ class ScheduleFallingBackRetryTest extends AnyFlatSpec with Matchers with Elapse
counter += 1
if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult

val schedule = Schedule.immediate.maxRepeats(100).andThen(Schedule.fixedInterval(2.millis))
val schedule = Schedule.immediate.maxRetries(100).andThen(Schedule.fixedInterval(2.millis))

// when
val result = retry(RetryConfig(schedule))(f)
Expand Down
24 changes: 12 additions & 12 deletions core/src/test/scala/ox/scheduling/FixedRateRepeatTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class FixedRateRepeatTest extends AnyFlatSpec with Matchers with EitherValues wi

it should "repeat a function at fixed rate" in:
// given
val repeats = 3
val attempts = 3
val funcSleepTime = 30.millis
val interval = 100.millis
var counter = 0
Expand All @@ -24,17 +24,17 @@ class FixedRateRepeatTest extends AnyFlatSpec with Matchers with EitherValues wi
counter

// when
val (result, elapsedTime) = measure(repeat(Schedule.fixedInterval(interval).maxRepeats(repeats))(f))
val (result, elapsedTime) = measure(repeat(Schedule.fixedInterval(interval).maxAttempts(attempts))(f))

// then
elapsedTime.toMillis should be >= 3 * interval.toMillis + funcSleepTime.toMillis - 5 // tolerance
elapsedTime.toMillis should be < 4 * interval.toMillis
result shouldBe 4
counter shouldBe 4
elapsedTime.toMillis should be >= 2 * interval.toMillis + funcSleepTime.toMillis - 5 // tolerance
elapsedTime.toMillis should be < 3 * interval.toMillis
result shouldBe 3
counter shouldBe 3

it should "repeat a function at fixed rate with initial delay" in:
// given
val repeats = 3
val attempts = 3
val initialDelay = 50.millis
val interval = 100.millis
var counter = 0
Expand All @@ -44,13 +44,13 @@ class FixedRateRepeatTest extends AnyFlatSpec with Matchers with EitherValues wi
counter

// when
val (result, elapsedTime) = measure(repeat(Schedule.fixedInterval(interval).maxRepeats(repeats).withInitialDelay(initialDelay))(f))
val (result, elapsedTime) = measure(repeat(Schedule.fixedInterval(interval).maxAttempts(attempts).withInitialDelay(initialDelay))(f))

// then
elapsedTime.toMillis should be >= 3 * interval.toMillis + initialDelay.toMillis - 5 // tolerance
elapsedTime.toMillis should be < 4 * interval.toMillis
result shouldBe 4
counter shouldBe 4
elapsedTime.toMillis should be >= 2 * interval.toMillis + initialDelay.toMillis - 5 // tolerance
elapsedTime.toMillis should be < 3 * interval.toMillis
result shouldBe 3
counter shouldBe 3

it should "repeat a function forever at fixed rate" in:
// given
Expand Down
Loading