Skip to content

Commit 59cb7ba

Browse files
committed
spline #1078 AQL query is not re-attempted on DatabaseException
1 parent 60b8aab commit 59cb7ba

File tree

6 files changed

+168
-61
lines changed

6 files changed

+168
-61
lines changed

persistence/src/main/scala/za/co/absa/spline/persistence/Persister.scala renamed to commons/src/main/scala/za/co/absa/spline/common/AsyncCallRetryer.scala

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,44 +14,36 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.spline.persistence
17+
package za.co.absa.spline.common
1818

19-
import java.util.concurrent.CompletionException
20-
21-
import com.arangodb.ArangoDBException
2219
import org.slf4s.Logging
20+
import za.co.absa.spline.common.AsyncCallRetryer._
2321

24-
import scala.concurrent.Future
25-
26-
object Persister extends Logging {
27-
28-
import scala.concurrent.ExecutionContext.Implicits._
22+
import scala.concurrent.{ExecutionContext, Future}
2923

30-
private[persistence] val MaxRetries = 5
24+
class AsyncCallRetryer(isRetryable: Throwable => Boolean, maxRetries: Int) extends Logging {
3125

32-
def execute[R](fn: => Future[R]): Future[R] = {
26+
def execute[R](fn: => Future[R])(implicit ex: ExecutionContext): Future[R] = {
3327
executeWithRetry(fn, None)
3428
}
3529

36-
@throws(classOf[IllegalArgumentException])
37-
@throws(classOf[ArangoDBException])
38-
@throws(classOf[CompletionException])
39-
private def executeWithRetry[R](fn: => Future[R], lastFailure: Option[FailedAttempt]): Future[R] = {
30+
private def executeWithRetry[R](fn: => Future[R], lastFailure: Option[FailedAttempt])(implicit ex: ExecutionContext): Future[R] = {
4031
val eventualResult = fn
4132
val attemptsUsed = lastFailure.map(_.count).getOrElse(0)
4233

4334
for (failure <- lastFailure) eventualResult.foreach { _ =>
4435
log.warn(s"Succeeded after ${failure.count + 1} attempts. Previous message was: ${failure.error.getMessage}")
4536
}
4637

47-
if (attemptsUsed >= MaxRetries)
38+
if (attemptsUsed >= maxRetries)
4839
eventualResult
4940
else
5041
eventualResult.recoverWith {
51-
case RetryableException(e) => executeWithRetry(fn, Some(FailedAttempt(attemptsUsed + 1, e)))
42+
case e if isRetryable(e) => executeWithRetry(fn, Some(FailedAttempt(attemptsUsed + 1, e)))
5243
}
5344
}
45+
}
5446

55-
case class FailedAttempt(count: Int, error: Exception)
56-
47+
object AsyncCallRetryer {
48+
case class FailedAttempt(count: Int, error: Throwable)
5749
}

persistence/src/main/scala/za/co/absa/spline/persistence/RetryableException.scala renamed to persistence/src/main/scala/za/co/absa/spline/persistence/RetryableExceptionUtils.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package za.co.absa.spline.persistence
1818

19-
import java.util.concurrent.CompletionException
20-
2119
import com.arangodb.ArangoDBException
2220
import za.co.absa.spline.persistence.ArangoCode._
2321

24-
object RetryableException {
22+
import scala.annotation.tailrec
23+
24+
object RetryableExceptionUtils {
2525

2626
private[persistence] val RetryableCodes = Set(
2727
ArangoConflict,
@@ -33,10 +33,16 @@ object RetryableException {
3333
ClusterTimeout)
3434
.map(_.code)
3535

36-
def unapply(exception: Throwable): Option[RuntimeException] = exception match {
37-
case e: ArangoDBException if RetryableCodes(e.getResponseCode) => Some(e)
38-
case e: CompletionException => Option(e.getCause).flatMap(unapply).map(_ => e)
39-
case _ => None
36+
def isRetryable(exception: Throwable): Boolean = {
37+
@tailrec def loop(ex: Throwable, visited: Set[Throwable]): Boolean = {
38+
(!visited.contains(ex)) && (ex match {
39+
case e: ArangoDBException if RetryableCodes.contains(e.getErrorNum) => true
40+
case e: Exception => loop(e.getCause, visited + e)
41+
case _ => false
42+
})
43+
}
44+
45+
loop(exception, Set.empty)
4046
}
4147

4248
}

persistence/src/test/scala/za/co/absa/spline/persistence/PersisterSpec.scala renamed to persistence/src/test/scala/za/co/absa/spline/persistence/AsyncCallRetryerSpec.scala

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,64 @@
1616

1717
package za.co.absa.spline.persistence
1818

19-
import java.util.concurrent.CompletionException
20-
21-
import com.arangodb.ArangoDBException
2219
import org.mockito.Mockito._
2320
import org.scalatest.flatspec.AsyncFlatSpec
2421
import org.scalatest.matchers.should.Matchers
2522
import org.scalatestplus.mockito.MockitoSugar
26-
import za.co.absa.spline.persistence.PersisterSpec.ARecoverableArangoErrorCode
23+
import za.co.absa.spline.common.AsyncCallRetryer
2724

2825
import scala.concurrent.Future
2926
import scala.concurrent.Future._
3027
import scala.language.implicitConversions
3128

32-
object PersisterSpec {
33-
private val ARecoverableArangoErrorCode = ArangoCode.ClusterTimeout.code
34-
}
35-
36-
class PersisterSpec
29+
class AsyncCallRetryerSpec
3730
extends AsyncFlatSpec
3831
with Matchers
3932
with MockitoSugar {
4033

41-
behavior of "Persister"
34+
behavior of "isRetryable()"
4235

4336
it should "call an underlying method and return a result" in {
37+
val retryer = new AsyncCallRetryer(mock[Throwable => Boolean], 5)
4438
val spy = mock[() => Future[String]]
4539
when(spy()) thenReturn successful("result")
46-
for (result <- Persister.execute(spy()))
40+
for (result <- retryer.execute(spy()))
4741
yield {
4842
verify(spy, times(1))()
4943
result should equal("result")
5044
}
5145
}
5246

5347
it should "retry after a recoverable failure" in {
54-
Future.traverse(RetryableException.RetryableCodes) {
55-
errorCode => {
56-
val spy = mock[() => Future[String]]
57-
(when(spy())
58-
thenReturn failed(new ArangoDBException("1st call failed", errorCode))
59-
thenReturn failed(new CompletionException(new ArangoDBException("2st call failed", errorCode)))
60-
thenReturn successful("3rd call succeeded"))
61-
for (result <- Persister.execute(spy()))
62-
yield {
63-
verify(spy, times(3))()
64-
result should equal("3rd call succeeded")
65-
}
48+
val retryer = new AsyncCallRetryer(_ => true, 5)
49+
val spy = mock[() => Future[String]]
50+
(when(spy())
51+
thenReturn failed(new Exception("1st call failed"))
52+
thenReturn failed(new Exception("2nd call failed"))
53+
thenReturn successful("3rd call succeeded"))
54+
for (result <- retryer.execute(spy()))
55+
yield {
56+
verify(spy, times(3))()
57+
result should equal("3rd call succeeded")
6658
}
67-
}.map(_.head)
6859
}
6960

7061
it should "only retry up to the maximum number of retries" in {
62+
val retryer = new AsyncCallRetryer(_ => true, 2)
7163
val spy = mock[() => Future[String]]
72-
when(spy()) thenReturn failed(new ArangoDBException("oops", ARecoverableArangoErrorCode))
73-
for (thrown <- recoverToExceptionIf[ArangoDBException](Persister.execute(spy())))
64+
when(spy()) thenReturn failed(new Exception("oops"))
65+
for (thrown <- recoverToExceptionIf[Exception](retryer.execute(spy())))
7466
yield {
75-
verify(spy, times(Persister.MaxRetries + 1))()
67+
verify(spy, times(3))() // 2 retries + 1 initial call
7668
thrown.getMessage should equal("oops")
7769
}
7870
}
7971

8072
it should "not retry on a non-recoverable error" in {
73+
val retryer = new AsyncCallRetryer(_ => false, 5)
8174
val spy = mock[() => Future[String]]
8275
when(spy()) thenReturn failed(new RuntimeException("boom"))
83-
for (thrown <- recoverToExceptionIf[Exception](Persister.execute(spy())))
76+
for (thrown <- recoverToExceptionIf[Exception](retryer.execute(spy())))
8477
yield {
8578
verify(spy, times(1))()
8679
thrown.getMessage should equal("boom")
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2022 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.persistence
18+
19+
import com.arangodb.ArangoDBException
20+
import com.arangodb.entity.ErrorEntity
21+
import com.arangodb.velocypack.{VPack, VPackBuilder, ValueType}
22+
import org.scalatest.flatspec.AnyFlatSpec
23+
import org.scalatest.matchers.should.Matchers
24+
import org.scalatest.{BeforeAndAfterEach, OptionValues}
25+
import za.co.absa.spline.persistence.RetryableExceptionUtils.RetryableCodes
26+
import za.co.absa.spline.persistence.RetryableExceptionUtilsSpec._
27+
28+
class RetryableExceptionUtilsSpec
29+
extends AnyFlatSpec
30+
with BeforeAndAfterEach
31+
with Matchers
32+
with OptionValues {
33+
34+
35+
behavior of "RetryableException.unapply()"
36+
37+
RetryableCodes.foreach { errNum =>
38+
it should s"return Some(ex) when ex is ArangoDBException with error $errNum" in {
39+
val ex =
40+
new ArangoDBException(
41+
new ErrorEntity().copy(errorNum = errNum)
42+
)
43+
RetryableExceptionUtils.isRetryable(ex) shouldBe true
44+
}
45+
}
46+
47+
it should s"return Some(ex) when ex is ArangoDBException wrapped with another exception" in {
48+
val ex =
49+
new Exception(
50+
new RuntimeException(
51+
new ArangoDBException(
52+
new ErrorEntity().copy(errorNum = RetryableCodes.head)
53+
)
54+
)
55+
)
56+
RetryableExceptionUtils.isRetryable(ex) shouldBe true
57+
}
58+
59+
it should "return None when neither ex nor any of its causes is ArangoDBException with retryable error codes" in {
60+
RetryableExceptionUtils.isRetryable(new Exception()) shouldBe false
61+
RetryableExceptionUtils.isRetryable(new ArangoDBException(new ErrorEntity().copy(errorNum = ArangoCode.Internal.code))) shouldBe false
62+
}
63+
64+
it should "gracefully handle nulls" in {
65+
RetryableExceptionUtils.isRetryable(null) shouldBe false
66+
}
67+
68+
it should "gracefully handle exceptions with null cause" in {
69+
RetryableExceptionUtils.isRetryable(new Exception) shouldBe false
70+
}
71+
72+
it should "gracefully handle exceptions with looped cause" in {
73+
lazy val loopedEx: Exception =
74+
new Exception {
75+
override def getCause: Throwable =
76+
new Exception {
77+
override def getCause: Throwable = loopedEx
78+
}
79+
}
80+
RetryableExceptionUtils.isRetryable(loopedEx) shouldBe false
81+
}
82+
83+
}
84+
85+
object RetryableExceptionUtilsSpec {
86+
87+
private val vpack = new VPack.Builder().build
88+
89+
implicit class ErrorEntityOps(val ee: ErrorEntity) {
90+
def copy(
91+
errorMessage: String = ee.getErrorMessage,
92+
exception: String = ee.getException,
93+
code: Int = ee.getCode,
94+
errorNum: Int = ee.getErrorNum
95+
): ErrorEntity = {
96+
vpack.deserialize[ErrorEntity](
97+
new VPackBuilder()
98+
.add(ValueType.OBJECT)
99+
.add("errorMessage", errorMessage)
100+
.add("exception", exception)
101+
.add("code", Int.box(code))
102+
.add("errorNum", Int.box(errorNum))
103+
.close()
104+
.slice(),
105+
classOf[ErrorEntity]
106+
)
107+
}
108+
}
109+
}

producer-services/src/main/scala/za/co/absa/spline/producer/service/ProducerServicesConfig.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@
1616

1717
package za.co.absa.spline.producer.service
1818

19-
import org.springframework.context.annotation.{ComponentScan, Configuration}
19+
import org.springframework.context.annotation.{Bean, ComponentScan, Configuration}
20+
import za.co.absa.spline.common.AsyncCallRetryer
21+
import za.co.absa.spline.persistence.RetryableExceptionUtils
2022

2123
@Configuration
2224
@ComponentScan(basePackageClasses = Array(classOf[repo._package]))
23-
class ProducerServicesConfig
25+
class ProducerServicesConfig {
26+
27+
@Bean def repeater: AsyncCallRetryer = new AsyncCallRetryer(isRetryable = RetryableExceptionUtils.isRetryable, 5)
28+
29+
}

producer-services/src/main/scala/za/co/absa/spline/producer/service/repo/ExecutionProducerRepositoryImpl.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ import com.arangodb.async.ArangoDatabaseAsync
2020
import org.slf4s.Logging
2121
import org.springframework.beans.factory.annotation.Autowired
2222
import org.springframework.stereotype.Repository
23+
import za.co.absa.spline.common.AsyncCallRetryer
2324
import za.co.absa.spline.persistence.model._
2425
import za.co.absa.spline.persistence.tx.{ArangoTx, InsertQuery, TxBuilder}
25-
import za.co.absa.spline.persistence.{ArangoImplicits, Persister}
26+
import za.co.absa.spline.persistence.ArangoImplicits
2627
import za.co.absa.spline.producer.model.v1_1.ExecutionEvent._
2728
import za.co.absa.spline.producer.model.{v1_1 => apiModel}
2829
import za.co.absa.spline.producer.service.model.{ExecutionEventKeyCreator, ExecutionPlanPersistentModel, ExecutionPlanPersistentModelBuilder}
29-
import za.co.absa.spline.producer.service.{UUIDCollisionDetectedException, InconsistentEntityException}
30+
import za.co.absa.spline.producer.service.{InconsistentEntityException, UUIDCollisionDetectedException}
3031

3132
import java.util.UUID
3233
import scala.compat.java8.FutureConverters._
@@ -35,13 +36,13 @@ import scala.concurrent.{ExecutionContext, Future}
3536
import scala.util.control.NonFatal
3637

3738
@Repository
38-
class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends ExecutionProducerRepository
39+
class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, repeater: AsyncCallRetryer) extends ExecutionProducerRepository
3940
with Logging {
4041

4142
import ArangoImplicits._
4243
import ExecutionProducerRepositoryImpl._
4344

44-
override def insertExecutionPlan(executionPlan: apiModel.ExecutionPlan)(implicit ec: ExecutionContext): Future[Unit] = Persister.execute({
45+
override def insertExecutionPlan(executionPlan: apiModel.ExecutionPlan)(implicit ec: ExecutionContext): Future[Unit] = repeater.execute({
4546
// Here I have to use the type parameter `Any` and cast to `String` later due to ArangoDb Java driver issue.
4647
// See https://github.com/arangodb/arangodb-java-driver/issues/389
4748
val eventualMaybeExistingDiscriminatorOpt: Future[Option[String]] = db.queryOptional[Any](
@@ -80,7 +81,7 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte
8081
} yield Unit
8182
})
8283

83-
override def insertExecutionEvents(events: Array[apiModel.ExecutionEvent])(implicit ec: ExecutionContext): Future[Unit] = Persister.execute({
84+
override def insertExecutionEvents(events: Array[apiModel.ExecutionEvent])(implicit ec: ExecutionContext): Future[Unit] = repeater.execute({
8485
val eventualExecPlanInfos: Future[Seq[ExecPlanInfo]] = db.queryStream[ExecPlanInfo](
8586
s"""
8687
|WITH executionPlan, executes, operation, dataSource

0 commit comments

Comments
 (0)