Skip to content

Commit c8d9c34

Browse files
committed
Add FailFast policy.
1 parent 4a077ad commit c8d9c34

File tree

3 files changed

+236
-0
lines changed

3 files changed

+236
-0
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,27 @@ policy(execptionalAttempt)
136136

137137
Note, The domain of the PartialFunction passed to When may cover both the exception thrown _or_ the successful result of the future.
138138

139+
#### FailFast
140+
141+
`retry.FailFast` allows you to wrap any of the above policies and define which failures should immediately stop the retries.
142+
143+
The difference between `retry.FailFast` and `retry.When` with a partial function for `Throwable`s is that `retry.When`
144+
passes the execution to another policy after the first retry, whereas `retry.FailFast` uses the inner policy logic
145+
for each retry. For instance, it allows using a policy that retries forever together with a fail fast logic
146+
on some irrecoverable exceptions.
147+
148+
```scala
149+
val innerPolicy = retry.Backoff.forever
150+
val policy = retry.FailFast(innerPolicy) {
151+
case e: FooException => true
152+
case e: RuntimeException => isFatal(e.getCause)
153+
}
154+
155+
policy(issueRequest)
156+
```
157+
158+
When the provided partial function is not defined at a particular `Throwable`, the retry logic is defined by the wrapped policy.
159+
139160
#### Suggested library usage
140161

141162
Since all retry modules now produce a generic interface, a `retry.Policy`, if you wish to write clients of services you may wish to make define

retry/src/main/scala/Policy.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import odelay.{Delay, Timer}
44

55
import scala.concurrent.duration.{Duration, FiniteDuration}
66
import scala.concurrent.{ExecutionContext, Future}
7+
import scala.util.Try
78
//import scala.language.implicitConversions
89
import scala.util.control.NonFatal
910

@@ -236,6 +237,43 @@ object When {
236237
}
237238
}
238239

240+
/** A retry policy that wraps another policy and defines which failures immediately
241+
* stop the retries.
242+
*
243+
* {{{
244+
* val innerPolicy = retry.Backoff.forever
245+
* val policy = retry.FailFast(innerPolicy) {
246+
* case e: FooException => true
247+
* case e: RuntimeException => isFatal(e.getCause)
248+
* }
249+
* val future = policy(issueRequest)
250+
* }}}
251+
*
252+
* When the provided partial function is not defined at a particular throwable,
253+
* the retry logic is defined by the wrapped policy.
254+
*/
255+
object FailFast {
256+
def apply(policy: Policy)(failFastOn: PartialFunction[Throwable, Boolean]): Policy =
257+
new Policy {
258+
def apply[T](promise: PromiseWrapper[T])(
259+
implicit success: Success[T],
260+
executor: ExecutionContext): Future[T] = {
261+
implicit val successWithFailFast = Success[Try[T]] {
262+
case scala.util.Success(res) => success.predicate(res)
263+
case scala.util.Failure(_) => true
264+
}
265+
policy.apply {
266+
promise()
267+
.map(scala.util.Success(_))
268+
.recover {
269+
case e: Throwable if failFastOn.lift(e).contains(true) =>
270+
scala.util.Failure(e)
271+
}
272+
}.map(_.get)
273+
}
274+
}
275+
}
276+
239277
/** Retry policy that incorporates a count */
240278
trait CountingPolicy extends Policy {
241279
protected def countdown[T](max: Int,

retry/src/test/scala/PolicySpec.scala

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,4 +418,181 @@ abstract class PolicySpec extends AsyncFunSpec with BeforeAndAfterAll {
418418
}
419419
}
420420
}
421+
422+
describe("retry.FailFast") {
423+
it("should not retry on success") {
424+
implicit val success = Success.always
425+
val innerPolicy = Directly()
426+
val counter = new AtomicInteger(0)
427+
val future = FailFast(innerPolicy)(_ => false) {
428+
counter.incrementAndGet()
429+
Future.successful("yay!")
430+
}
431+
future.map(result => assert(counter.get() === 1 && result === "yay!"))
432+
}
433+
434+
it("should retry number of times specified in the inner policy") {
435+
implicit val success = Success[Int](_ == 3)
436+
val tries = forwardCountingFutureStream().iterator
437+
val innerPolicy = Directly(3)
438+
val future = FailFast(innerPolicy)(_ => false)(tries.next)
439+
future.map(result => assert(success.predicate(result) === true))
440+
}
441+
442+
it("should fail when inner policy retries are exceeded") {
443+
implicit val success = Success.always
444+
val innerPolicy = Directly(3)
445+
val counter = new AtomicInteger(0)
446+
val future = FailFast(innerPolicy)(_ => false) {
447+
counter.incrementAndGet()
448+
Future.failed(new RuntimeException("always failing"))
449+
}
450+
// expect failure after 1+3 tries
451+
future.failed.map { t =>
452+
assert(counter.get() === 4 && t.getMessage === "always failing")
453+
}
454+
}
455+
456+
it("should fail fast when predicate matches every throwable") {
457+
implicit val success = Success.always
458+
val innerPolicy = Directly.forever
459+
val counter = new AtomicInteger(0)
460+
val future = FailFast(innerPolicy)(_ => true) {
461+
counter.incrementAndGet()
462+
Future.failed(new RuntimeException("always failing"))
463+
}
464+
future.failed.map { t =>
465+
assert(counter.get() === 1 && t.getMessage === "always failing")
466+
}
467+
}
468+
469+
it("should fail fast when predicate matches a specific throwable") {
470+
implicit val success = Success.always
471+
val innerPolicy = Directly.forever
472+
val counter = new AtomicInteger(0)
473+
val future = FailFast(innerPolicy)(_.getMessage == "2") {
474+
val counterValue = counter.getAndIncrement()
475+
Future.failed(new RuntimeException(counterValue.toString))
476+
}
477+
future.failed.map { t =>
478+
assert(counter.get() === 3 && t.getMessage === "2")
479+
}
480+
}
481+
482+
it("should repeat on failure until success") {
483+
implicit val success = Success[Boolean](identity)
484+
val retried = new AtomicInteger()
485+
val retriedUntilSuccess = 10000
486+
def run() =
487+
if (retried.get() < retriedUntilSuccess) {
488+
retried.incrementAndGet()
489+
Future.failed(new RuntimeException)
490+
} else {
491+
Future(true)
492+
}
493+
val innerPolicy = Directly.forever
494+
val policy = FailFast(innerPolicy)(_ => false)
495+
policy(run()).map { result =>
496+
assert(result === true)
497+
assert(retried.get() == 10000)
498+
}
499+
}
500+
501+
it("should repeat on failure with pause until success") {
502+
implicit val success = Success[Boolean](identity)
503+
val retried = new AtomicInteger()
504+
val retriedUntilSuccess = 1000
505+
def run() =
506+
if (retried.get() < retriedUntilSuccess) {
507+
retried.incrementAndGet()
508+
Future.failed(new RuntimeException)
509+
} else {
510+
Future(true)
511+
}
512+
val innerPolicy = Pause.forever(1.millis)
513+
val policy = FailFast(innerPolicy)(_ => false)
514+
policy(run()).map { result =>
515+
assert(result === true)
516+
assert(retried.get() == 1000)
517+
}
518+
}
519+
520+
it("should repeat on failure with backoff until success") {
521+
implicit val success = Success[Boolean](identity)
522+
val retried = new AtomicInteger()
523+
val retriedUntilSuccess = 5
524+
def run() =
525+
if (retried.get() < retriedUntilSuccess) {
526+
retried.incrementAndGet()
527+
Future.failed(new RuntimeException)
528+
} else {
529+
Future(true)
530+
}
531+
val innerPolicy = Backoff.forever(1.millis)
532+
val policy = FailFast(innerPolicy)(_ => false)
533+
policy(run()).map { result =>
534+
assert(result === true)
535+
assert(retried.get() == 5)
536+
}
537+
}
538+
539+
it("should repeat on failure with jitter backoff until success") {
540+
implicit val success = Success[Boolean](identity)
541+
val retried = new AtomicInteger()
542+
val retriedUntilSuccess = 10
543+
def run() =
544+
if (retried.get() < retriedUntilSuccess) {
545+
retried.incrementAndGet()
546+
Future.failed(new RuntimeException)
547+
} else {
548+
Future(true)
549+
}
550+
val innerPolicy = JitterBackoff.forever(1.millis)
551+
val policy = FailFast(innerPolicy)(_ => false)
552+
policy(run()).map { result =>
553+
assert(result === true)
554+
assert(retried.get() == 10)
555+
}
556+
}
557+
558+
it("should repeat on failure with when condition until success") {
559+
implicit val success = Success[Boolean](identity)
560+
class MyException extends RuntimeException
561+
val retried = new AtomicInteger()
562+
val retriedUntilSuccess = 10000
563+
def run() =
564+
if (retried.get() < retriedUntilSuccess) {
565+
retried.incrementAndGet()
566+
Future.failed(new MyException)
567+
} else {
568+
Future(true)
569+
}
570+
val innerPolicy = When {
571+
case _: MyException => Directly.forever
572+
}
573+
val policy = FailFast(innerPolicy)(_ => false)
574+
policy(run()).map { result =>
575+
assert(result === true)
576+
assert(retried.get() == 10000)
577+
}
578+
}
579+
580+
it("should take precedence over when condition if it also matches fail fast condition") {
581+
implicit val success = Success[Boolean](identity)
582+
class MyException extends RuntimeException("my exception")
583+
val retried = new AtomicInteger()
584+
def run() = {
585+
retried.incrementAndGet()
586+
Future.failed(new MyException)
587+
}
588+
val innerPolicy = When {
589+
case _: MyException => Directly.forever
590+
}
591+
val policy = FailFast(innerPolicy)(_ => true)
592+
policy(run()).failed.map { t =>
593+
assert(t.getMessage === "my exception")
594+
assert(retried.get() == 1)
595+
}
596+
}
597+
}
421598
}

0 commit comments

Comments
 (0)