Skip to content

Commit 48a99f3

Browse files
committed
Return retry attempt count in withRetries (#143)
1 parent 77457b3 commit 48a99f3

File tree

2 files changed

+223
-13
lines changed

2 files changed

+223
-13
lines changed

modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
package com.snowplowanalytics.snowplow.runtime
1212

1313
import cats.{Applicative, Show}
14-
import cats.effect.Sync
14+
import cats.effect.{Ref, Sync}
1515
import cats.implicits._
1616
import retry._
1717
import io.circe.Decoder
@@ -58,19 +58,25 @@ object Retrying {
5858
toAlert: SetupExceptionMessages => Alert,
5959
setupErrorCheck: PartialFunction[Throwable, String]
6060
)(
61-
action: F[A]
61+
action: Int => F[A]
6262
): F[A] =
63-
action
64-
.retryingOnSomeErrors(
65-
isWorthRetrying = checkingNestedExceptions(setupErrorCheck, _).nonEmpty.pure[F],
66-
policy = policyForSetupErrors[F](configForSetup),
67-
onError = logErrorAndSendAlert(appHealth, setupErrorCheck, toAlert, _, _)
68-
)
69-
.retryingOnAllErrors(
70-
policy = policyForTransientErrors[F](configForTransient),
71-
onError = logErrorAndReportUnhealthy(appHealth, service, _, _)
72-
)
73-
.productL(appHealth.beHealthyForRuntimeService(service))
63+
Ref[F].of(0).flatMap { attemptRef =>
64+
(for {
65+
attempt <- attemptRef.get
66+
result <- action(attempt)
67+
} yield result)
68+
.retryingOnSomeErrors(
69+
isWorthRetrying = checkingNestedExceptions(setupErrorCheck, _).nonEmpty.pure[F],
70+
policy = policyForSetupErrors[F](configForSetup),
71+
onError =
72+
(error, details) => attemptRef.update(_ + 1) *> logErrorAndSendAlert(appHealth, setupErrorCheck, toAlert, error, details)
73+
)
74+
.retryingOnAllErrors(
75+
policy = policyForTransientErrors[F](configForTransient),
76+
onError = (error, details) => attemptRef.update(_ + 1) *> logErrorAndReportUnhealthy(appHealth, service, error, details)
77+
)
78+
.productL(appHealth.beHealthyForRuntimeService(service))
79+
}
7480

7581
private def policyForSetupErrors[F[_]: Applicative](config: Config.ForSetup): RetryPolicy[F] =
7682
RetryPolicies.exponentialBackoff[F](config.delay)
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
3+
*
4+
* This software is made available by Snowplow Analytics, Ltd.,
5+
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
6+
* located at https://docs.snowplow.io/limited-use-license-1.0
7+
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
8+
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
9+
*/
10+
package com.snowplowanalytics.snowplow.runtime
11+
12+
import cats.Show
13+
import cats.effect.testing.specs2.CatsEffect
14+
import cats.effect.{IO, Ref}
15+
import org.specs2.Specification
16+
import scala.concurrent.duration._
17+
18+
class RetryingSpec extends Specification with CatsEffect {
19+
import RetryingSpec._
20+
21+
def is = s2"""
22+
The Retrying.withRetries should:
23+
Pass attempt number 0 on first execution $test1
24+
Pass attempt number 1 after first transient retry $test2
25+
Pass attempt number 2 after second transient retry $test3
26+
Pass incremented attempt numbers through multiple retries $test4
27+
Pass incremented attempt numbers on setup errors $test5
28+
Track attempts correctly across both retry strategies $test6
29+
"""
30+
31+
def test1 = {
32+
val config = createConfig()
33+
for {
34+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
35+
attemptsRef <- Ref[IO].of(List.empty[Int])
36+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
37+
appHealth = appHealth,
38+
configForTransient = config.transient,
39+
configForSetup = config.setup,
40+
service = TestService1,
41+
toAlert = _ => TestAlert1,
42+
setupErrorCheck = PartialFunction.empty
43+
) { attempt =>
44+
attemptsRef.update(_ :+ attempt) *> IO.pure("success")
45+
}
46+
attempts <- attemptsRef.get
47+
} yield (result should beEqualTo("success")) and (attempts should beEqualTo(List(0)))
48+
}
49+
50+
def test2 = {
51+
val config = createConfig()
52+
for {
53+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
54+
attemptsRef <- Ref[IO].of(List.empty[Int])
55+
callCount <- Ref[IO].of(0)
56+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
57+
appHealth = appHealth,
58+
configForTransient = config.transient,
59+
configForSetup = config.setup,
60+
service = TestService1,
61+
toAlert = _ => TestAlert1,
62+
setupErrorCheck = PartialFunction.empty
63+
) { attempt =>
64+
attemptsRef.update(_ :+ attempt) *>
65+
callCount.getAndUpdate(_ + 1).flatMap { count =>
66+
if (count == 0) IO.raiseError(new RuntimeException("transient error"))
67+
else IO.pure("success")
68+
}
69+
}
70+
attempts <- attemptsRef.get
71+
} yield (result should beEqualTo("success")) and (attempts should beEqualTo(List(0, 1)))
72+
}
73+
74+
def test3 = {
75+
val config = createConfig()
76+
for {
77+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
78+
attemptsRef <- Ref[IO].of(List.empty[Int])
79+
callCount <- Ref[IO].of(0)
80+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
81+
appHealth = appHealth,
82+
configForTransient = config.transient,
83+
configForSetup = config.setup,
84+
service = TestService1,
85+
toAlert = _ => TestAlert1,
86+
setupErrorCheck = PartialFunction.empty
87+
) { attempt =>
88+
attemptsRef.update(_ :+ attempt) *>
89+
callCount.getAndUpdate(_ + 1).flatMap { count =>
90+
if (count < 2) IO.raiseError(new RuntimeException("transient error"))
91+
else IO.pure("success")
92+
}
93+
}
94+
attempts <- attemptsRef.get
95+
} yield (result should beEqualTo("success")) and (attempts should beEqualTo(List(0, 1, 2)))
96+
}
97+
98+
def test4 = {
99+
val config = createConfig(maxAttempts = 5)
100+
for {
101+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
102+
attemptsRef <- Ref[IO].of(List.empty[Int])
103+
callCount <- Ref[IO].of(0)
104+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
105+
appHealth = appHealth,
106+
configForTransient = config.transient,
107+
configForSetup = config.setup,
108+
service = TestService1,
109+
toAlert = _ => TestAlert1,
110+
setupErrorCheck = PartialFunction.empty
111+
) { attempt =>
112+
attemptsRef.update(_ :+ attempt) *>
113+
callCount.getAndUpdate(_ + 1).flatMap { count =>
114+
if (count < 3) IO.raiseError(new RuntimeException("transient error"))
115+
else IO.pure("success after 3 retries")
116+
}
117+
}
118+
attempts <- attemptsRef.get
119+
} yield (result should beEqualTo("success after 3 retries")) and (attempts should beEqualTo(List(0, 1, 2, 3)))
120+
}
121+
122+
def test5 = {
123+
val config = createConfig()
124+
val setupErrorCheck: PartialFunction[Throwable, String] = { case e: IllegalArgumentException =>
125+
e.getMessage
126+
}
127+
for {
128+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
129+
attemptsRef <- Ref[IO].of(List.empty[Int])
130+
callCount <- Ref[IO].of(0)
131+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
132+
appHealth = appHealth,
133+
configForTransient = config.transient,
134+
configForSetup = config.setup,
135+
service = TestService1,
136+
toAlert = _ => TestAlert1,
137+
setupErrorCheck = setupErrorCheck
138+
) { attempt =>
139+
attemptsRef.update(_ :+ attempt) *>
140+
callCount.getAndUpdate(_ + 1).flatMap { count =>
141+
if (count == 0) IO.raiseError(new IllegalArgumentException("setup error"))
142+
else IO.pure("success")
143+
}
144+
}
145+
attempts <- attemptsRef.get
146+
} yield (result should beEqualTo("success")) and (attempts should beEqualTo(List(0, 1)))
147+
}
148+
149+
def test6 = {
150+
val config = createConfig()
151+
val setupErrorCheck: PartialFunction[Throwable, String] = { case e: IllegalArgumentException =>
152+
e.getMessage
153+
}
154+
for {
155+
appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil)
156+
attemptsRef <- Ref[IO].of(List.empty[Int])
157+
callCount <- Ref[IO].of(0)
158+
result <- Retrying.withRetries[IO, TestAlert, TestService, String](
159+
appHealth = appHealth,
160+
configForTransient = config.transient,
161+
configForSetup = config.setup,
162+
service = TestService1,
163+
toAlert = _ => TestAlert1,
164+
setupErrorCheck = setupErrorCheck
165+
) { attempt =>
166+
attemptsRef.update(_ :+ attempt) *>
167+
callCount.getAndUpdate(_ + 1).flatMap {
168+
case 0 => IO.raiseError(new IllegalArgumentException("setup error"))
169+
case 1 => IO.raiseError(new RuntimeException("transient error"))
170+
case _ => IO.pure("success")
171+
}
172+
}
173+
attempts <- attemptsRef.get
174+
} yield (result should beEqualTo("success")) and (attempts should beEqualTo(List(0, 1, 2)))
175+
}
176+
}
177+
178+
object RetryingSpec {
179+
180+
sealed trait TestAlert
181+
case object TestAlert1 extends TestAlert
182+
183+
sealed trait TestService
184+
case object TestService1 extends TestService
185+
186+
implicit def showTestService: Show[TestService] = Show { case TestService1 =>
187+
"test service 1"
188+
}
189+
190+
case class TestConfig(
191+
transient: Retrying.Config.ForTransient,
192+
setup: Retrying.Config.ForSetup
193+
)
194+
195+
def createConfig(
196+
maxAttempts: Int = 5,
197+
transientDelay: FiniteDuration = 1.millis,
198+
setupDelay: FiniteDuration = 1.millis
199+
): TestConfig =
200+
TestConfig(
201+
transient = Retrying.Config.ForTransient(delay = transientDelay, attempts = maxAttempts),
202+
setup = Retrying.Config.ForSetup(delay = setupDelay)
203+
)
204+
}

0 commit comments

Comments
 (0)