Skip to content

Commit b121a6a

Browse files
committed
Switch to CircuitBreaker wraps Retry
1 parent 5a68b7b commit b121a6a

File tree

1 file changed

+47
-32
lines changed

1 file changed

+47
-32
lines changed

src/main/scala/akkahttp/ReverseProxy.scala

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ import scala.util.{Failure, Success}
4848
* Remarks:
4949
* - The target server selection is via the "Host" HTTP header
5050
* - Local/Remote target servers are designed to be flaky to show Retry/CircuitBreaker behavior
51-
* eg for Local adjust [[responseCodes]]
51+
* e.g. for Local adjust [[responseCodes]]
5252
* - On top of the built-in client, you may also try other clients, see below
5353
* - This PoC may not scale well, possible bottlenecks are:
54-
* - Combination of Retry/CircuitBreaker
54+
* - Combination of CircuitBreaker which wraps Retry
5555
* - Round robin impl. with `requestCounter` means shared state
5656
*
5757
* Gatling client: [[ReverseProxySimulation]]
@@ -75,7 +75,7 @@ object ReverseProxy extends App {
7575

7676
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
7777

78-
implicit val http: HttpExt = Http(system)
78+
val http: HttpExt = Http(system)
7979

8080
ReverseProxyMonitor.initializeWebUI(system)
8181

@@ -201,37 +201,46 @@ object ReverseProxy extends App {
201201
services.get(mode) match {
202202
case Some(rawSeq) =>
203203
val seq = rawSeq.flatMap(t => (1 to t.weight).map(_ => t))
204-
Retry.retry[HttpResponse](times = 3) {
205-
val index = requestCounter.incrementAndGet() % (if (seq.isEmpty) 1 else seq.size)
206-
val target = seq(index)
207-
logger.info(s"Forwarding request with id: $id to $mode target server: ${target.url}")
208-
val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => {
209-
new CircuitBreaker(
210-
system.scheduler,
211-
// A low value opens the circuit breaker for subsequent requests (until resetTimeout)
212-
maxFailures = 2,
213-
// Needs to be shorter than pekko-http 'request-timeout' (20s)
214-
// If not, clients get 503 from pekko-http
215-
callTimeout = 10.seconds,
216-
resetTimeout = 10.seconds)
217-
})
218-
219-
// Example of an on-the-fly processing scenario
220-
val hashFuture = request.entity.dataBytes
221-
.via(computeHashFromPayloadAndPayloadLength)
222-
.runWith(Sink.head)
223-
.map { accumulator =>
224-
RawHeader("X-Content-Hash", Hex.toHexString(accumulator.digest.digest()))
225-
}
204+
val index = requestCounter.incrementAndGet() % (if (seq.isEmpty) 1 else seq.size)
205+
val target = seq(index)
206+
logger.info(s"Forwarding request with id: $id to $mode target server: ${target.url}")
207+
208+
val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => {
209+
val cb = new CircuitBreaker(
210+
system.scheduler,
211+
// Account for retry attempts
212+
// A lower value opens the circuit breaker for subsequent requests (until resetTimeout)
213+
maxFailures = 5,
214+
// Needs to be shorter than pekko-http 'request-timeout' (20s)
215+
// If not, clients get 503 from pekko-http
216+
callTimeout = 15.seconds,
217+
resetTimeout = 10.seconds)
218+
// For yet unknown reasons these callbacks are not executed
219+
cb.onOpen { () => logger.info(s"CircuitBreaker OPENED for ${target.url}") }
220+
cb.onClose { () => logger.info(s"CircuitBreaker CLOSED for ${target.url}") }
221+
cb.onHalfOpen { () => logger.info(s"CircuitBreaker HALF-OPEN for ${target.url}") }
222+
cb
223+
})
224+
225+
// Example of an on-the-fly processing scenario
226+
val hashFuture = request.entity.dataBytes
227+
.via(computeHashFromPayloadAndPayloadLength)
228+
.runWith(Sink.head)
229+
.map { accumulator =>
230+
RawHeader("X-Content-Hash", Hex.toHexString(accumulator.digest.digest()))
231+
}
226232

233+
hashFuture.flatMap { hashHeader =>
234+
val proxyReq = request
235+
.withUri(uri(target))
236+
.withHeaders(headers(target) :+ hashHeader)
227237

228-
hashFuture.flatMap { hashHeader =>
229-
val proxyReq = request
230-
.withUri(uri(target))
231-
.withHeaders(headers(target) :+ hashHeader)
232-
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
238+
// CircuitBreaker wraps the retry logic
239+
circuitBreaker.withCircuitBreaker {
240+
Retry.retry[HttpResponse](times = 3) {
241+
http.singleRequest(proxyReq)
242+
}
233243
}
234-
235244
}.andThen {
236245
case Success(response) =>
237246
val responseTime = System.currentTimeMillis() - startTime
@@ -349,7 +358,13 @@ object Retry {
349358
case Success(httpResponse: HttpResponse) if httpResponse.status.intValue() >= 500 =>
350359
val id = httpResponse.getHeader("X-Correlation-ID").orElse(RawHeader("X-Correlation-ID", "N/A")).value()
351360
logger.info(s"ReverseProxy got 5xx server error for id: $id. Retries left: ${times - 1}")
352-
retryPromise[T](times - 1, promise, Some(new RuntimeException(s"Received: ${httpResponse.status.intValue()} from target server")), f)
361+
val exception = new RuntimeException(s"Received: ${httpResponse.status.intValue()} from target server")
362+
if (times == 1) {
363+
// Last retry failed - propagate as failure to CircuitBreaker
364+
promise.tryFailure(exception)
365+
} else {
366+
retryPromise[T](times - 1, promise, Some(exception), f)
367+
}
353368
case Success(t) => promise.trySuccess(t)
354369
case Failure(e) => retryPromise[T](times - 1, promise, Some(e), f)
355370
}

0 commit comments

Comments
 (0)