Skip to content

Commit feae320

Browse files
authored
Merge pull request #42 from AvaPL/master
Add FailFast policy.
2 parents 4a077ad + e7eb7f7 commit feae320

File tree

3 files changed

+258
-0
lines changed

3 files changed

+258
-0
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,27 @@ policy(execptionalAttempt)
136136

137137
Note, The domain of the PartialFunction passed to When may cover both the exception thrown _or_ the successful result of the future.
138138

139+
#### FailFast
140+
141+
`retry.FailFast` allows you to wrap any of the above policies and define which failures should immediately stop the retries.
142+
143+
The difference between `retry.FailFast` and `retry.When` with a partial function for `Throwable`s is that `retry.When`
144+
passes the execution to another policy after the first retry, whereas `retry.FailFast` uses the inner policy logic
145+
for each retry. For instance, it allows using a policy that retries forever together with a fail fast logic
146+
on some irrecoverable exceptions.
147+
148+
```scala
149+
val innerPolicy = retry.Backoff.forever
150+
val policy = retry.FailFast(innerPolicy) {
151+
case e: FooException => true
152+
case e: RuntimeException => isFatal(e.getCause)
153+
}
154+
155+
policy(issueRequest)
156+
```
157+
158+
When the provided partial function is not defined at a particular `Throwable`, the retry logic is defined by the wrapped policy.
159+
139160
#### Suggested library usage
140161

141162
Since all retry modules now produce a generic interface, a `retry.Policy`, if you wish to write clients of services you may wish to make define

retry/src/main/scala/Policy.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import odelay.{Delay, Timer}
44

55
import scala.concurrent.duration.{Duration, FiniteDuration}
66
import scala.concurrent.{ExecutionContext, Future}
7+
import scala.util.Try
78
//import scala.language.implicitConversions
89
import scala.util.control.NonFatal
910

@@ -236,6 +237,43 @@ object When {
236237
}
237238
}
238239

240+
/** A retry policy that wraps another policy and defines which failures immediately
241+
* stop the retries.
242+
*
243+
* {{{
244+
* val innerPolicy = retry.Backoff.forever
245+
* val policy = retry.FailFast(innerPolicy) {
246+
* case e: FooException => true
247+
* case e: RuntimeException => isFatal(e.getCause)
248+
* }
249+
* val future = policy(issueRequest)
250+
* }}}
251+
*
252+
* When the provided partial function is not defined at a particular throwable,
253+
* the retry logic is defined by the wrapped policy.
254+
*/
255+
object FailFast {
256+
def apply(policy: Policy)(failFastOn: PartialFunction[Throwable, Boolean]): Policy =
257+
new Policy {
258+
def apply[T](promise: PromiseWrapper[T])(
259+
implicit success: Success[T],
260+
executor: ExecutionContext): Future[T] = {
261+
implicit val successWithFailFast = Success[Try[T]] {
262+
case scala.util.Success(res) => success.predicate(res)
263+
case scala.util.Failure(_) => true
264+
}
265+
policy.apply {
266+
promise()
267+
.map(scala.util.Success(_))
268+
.recover {
269+
case e: Throwable if failFastOn.lift(e).contains(true) =>
270+
scala.util.Failure(e)
271+
}
272+
}.map(_.get)
273+
}
274+
}
275+
}
276+
239277
/** Retry policy that incorporates a count */
240278
trait CountingPolicy extends Policy {
241279
protected def countdown[T](max: Int,

retry/src/test/scala/PolicySpec.scala

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,4 +418,203 @@ abstract class PolicySpec extends AsyncFunSpec with BeforeAndAfterAll {
418418
}
419419
}
420420
}
421+
422+
describe("retry.FailFast") {
423+
it("should not retry on success") {
424+
implicit val success = Success.always
425+
val innerPolicy = Directly()
426+
val counter = new AtomicInteger(0)
427+
val future = FailFast(innerPolicy) {
428+
case _ => false
429+
} {
430+
counter.incrementAndGet()
431+
Future.successful("yay!")
432+
}
433+
future.map(result => assert(counter.get() === 1 && result === "yay!"))
434+
}
435+
436+
it("should retry number of times specified in the inner policy") {
437+
implicit val success = Success[Int](_ == 3)
438+
val tries = forwardCountingFutureStream().iterator
439+
val innerPolicy = Directly(3)
440+
val future = FailFast(innerPolicy) {
441+
case _ => false
442+
} (tries.next)
443+
future.map(result => assert(success.predicate(result) === true))
444+
}
445+
446+
it("should fail when inner policy retries are exceeded") {
447+
implicit val success = Success.always
448+
val innerPolicy = Directly(3)
449+
val counter = new AtomicInteger(0)
450+
val future = FailFast(innerPolicy) {
451+
case _ => false
452+
} {
453+
counter.incrementAndGet()
454+
Future.failed(new RuntimeException("always failing"))
455+
}
456+
// expect failure after 1+3 tries
457+
future.failed.map { t =>
458+
assert(counter.get() === 4 && t.getMessage === "always failing")
459+
}
460+
}
461+
462+
it("should fail fast when predicate matches every throwable") {
463+
implicit val success = Success.always
464+
val innerPolicy = Directly.forever
465+
val counter = new AtomicInteger(0)
466+
val future = FailFast(innerPolicy) {
467+
case _ => true
468+
} {
469+
counter.incrementAndGet()
470+
Future.failed(new RuntimeException("always failing"))
471+
}
472+
future.failed.map { t =>
473+
assert(counter.get() === 1 && t.getMessage === "always failing")
474+
}
475+
}
476+
477+
it("should fail fast when predicate matches a specific throwable") {
478+
implicit val success = Success.always
479+
val innerPolicy = Directly.forever
480+
val counter = new AtomicInteger(0)
481+
val future = FailFast(innerPolicy) {
482+
case e => e.getMessage == "2"
483+
} {
484+
val counterValue = counter.getAndIncrement()
485+
Future.failed(new RuntimeException(counterValue.toString))
486+
}
487+
future.failed.map { t =>
488+
assert(counter.get() === 3 && t.getMessage === "2")
489+
}
490+
}
491+
492+
it("should repeat on failure until success") {
493+
implicit val success = Success[Boolean](identity)
494+
val retried = new AtomicInteger()
495+
val retriedUntilSuccess = 10000
496+
def run() =
497+
if (retried.get() < retriedUntilSuccess) {
498+
retried.incrementAndGet()
499+
Future.failed(new RuntimeException)
500+
} else {
501+
Future(true)
502+
}
503+
val innerPolicy = Directly.forever
504+
val policy = FailFast(innerPolicy) {
505+
case _ => false
506+
}
507+
policy(run()).map { result =>
508+
assert(result === true)
509+
assert(retried.get() == 10000)
510+
}
511+
}
512+
513+
it("should repeat on failure with pause until success") {
514+
implicit val success = Success[Boolean](identity)
515+
val retried = new AtomicInteger()
516+
val retriedUntilSuccess = 1000
517+
def run() =
518+
if (retried.get() < retriedUntilSuccess) {
519+
retried.incrementAndGet()
520+
Future.failed(new RuntimeException)
521+
} else {
522+
Future(true)
523+
}
524+
val innerPolicy = Pause.forever(1.millis)
525+
val policy = FailFast(innerPolicy) {
526+
case _ => false
527+
}
528+
policy(run()).map { result =>
529+
assert(result === true)
530+
assert(retried.get() == 1000)
531+
}
532+
}
533+
534+
it("should repeat on failure with backoff until success") {
535+
implicit val success = Success[Boolean](identity)
536+
val retried = new AtomicInteger()
537+
val retriedUntilSuccess = 5
538+
def run() =
539+
if (retried.get() < retriedUntilSuccess) {
540+
retried.incrementAndGet()
541+
Future.failed(new RuntimeException)
542+
} else {
543+
Future(true)
544+
}
545+
val innerPolicy = Backoff.forever(1.millis)
546+
val policy = FailFast(innerPolicy) {
547+
case _ => false
548+
}
549+
policy(run()).map { result =>
550+
assert(result === true)
551+
assert(retried.get() == 5)
552+
}
553+
}
554+
555+
it("should repeat on failure with jitter backoff until success") {
556+
implicit val success = Success[Boolean](identity)
557+
val retried = new AtomicInteger()
558+
val retriedUntilSuccess = 10
559+
def run() =
560+
if (retried.get() < retriedUntilSuccess) {
561+
retried.incrementAndGet()
562+
Future.failed(new RuntimeException)
563+
} else {
564+
Future(true)
565+
}
566+
val innerPolicy = JitterBackoff.forever(1.millis)
567+
val policy = FailFast(innerPolicy) {
568+
case _ => false
569+
}
570+
policy(run()).map { result =>
571+
assert(result === true)
572+
assert(retried.get() == 10)
573+
}
574+
}
575+
576+
it("should repeat on failure with when condition until success") {
577+
implicit val success = Success[Boolean](identity)
578+
class MyException extends RuntimeException
579+
val retried = new AtomicInteger()
580+
val retriedUntilSuccess = 10000
581+
def run() =
582+
if (retried.get() < retriedUntilSuccess) {
583+
retried.incrementAndGet()
584+
Future.failed(new MyException)
585+
} else {
586+
Future(true)
587+
}
588+
val innerPolicy = When {
589+
case _: MyException => Directly.forever
590+
}
591+
val policy = FailFast(innerPolicy) {
592+
case _ => false
593+
}
594+
policy(run()).map { result =>
595+
assert(result === true)
596+
assert(retried.get() == 10000)
597+
}
598+
}
599+
600+
it("should take precedence over when condition if it also matches fail fast condition") {
601+
implicit val success = Success[Boolean](identity)
602+
class MyException extends RuntimeException("my exception")
603+
val retried = new AtomicInteger()
604+
def run() = {
605+
retried.incrementAndGet()
606+
Future.failed(new MyException)
607+
}
608+
val innerPolicy = When {
609+
case _: MyException => Directly.forever
610+
}
611+
val policy = FailFast(innerPolicy) {
612+
case _ => true
613+
}
614+
policy(run()).failed.map { t =>
615+
assert(t.getMessage === "my exception")
616+
assert(retried.get() == 1)
617+
}
618+
}
619+
}
421620
}

0 commit comments

Comments
 (0)