@@ -20,8 +20,8 @@ import org.bouncycastle.util.encoders.Hex
2020import org .slf4j .{Logger , LoggerFactory }
2121
2222import java .security .MessageDigest
23- import java .util .concurrent .ConcurrentHashMap
2423import java .util .concurrent .atomic .AtomicInteger
24+ import java .util .concurrent .{ConcurrentHashMap , ThreadLocalRandom }
2525import scala .collection .parallel .CollectionConverters .ImmutableIterableIsParallelizable
2626import scala .concurrent .*
2727import scala .concurrent .duration .DurationInt
@@ -47,12 +47,11 @@ import scala.util.{Failure, Success}
4747 *
4848 * Remarks:
4949 * - The target server selection is via the "Host" HTTP header
50- * - Local/Remote target servers are designed to be flaky to show Retry/CircuitBreaker behavior
51- * e.g. for Local adjust [[responseCodes ]]
50+ * - Local/Remote target servers are designed to be faulty to show Retry/CircuitBreaker behavior
51+ * e.g. for mode Local adjust [[responseCodes ]]
5252 * - On top of the built-in client, you may also try other clients, see below
53- * - This PoC may not scale well, possible bottlenecks are:
54- * - Combination of CircuitBreaker which wraps Retry
55- * - Round robin impl. with `requestCounter` means shared state
53+ * - This PoC may not scale well, because the 'round robin' implementation
54+ * with `requestCounter` means shared state
5655 *
5756 * Gatling client: [[ReverseProxySimulation ]]
5857 *
@@ -77,8 +76,6 @@ object ReverseProxy extends App {
7776
7877 val http : HttpExt = Http (system)
7978
80- ReverseProxyMonitor .initializeWebUI(system)
81-
8279 val circuitBreakers = new ConcurrentHashMap [String , CircuitBreaker ]()
8380 val requestCounter = new AtomicInteger (0 )
8481
@@ -98,14 +95,19 @@ object ReverseProxy extends App {
9895 )
9996 )
10097
101- // For Mode.local: Adjust to provoke more retries on ReverseProxy
102- val responseCodes = List (200 , 200 , 200 , 200 , 200 , 200 , 200 , 200 , 500 , 503 )
98+ // For Mode.local: Add more failure response codes to provoke more retries on ReverseProxy
99+ // and thus provoke the CircuitBreaker to open
100+ // val responseCodes = List(200, 200, 200, 200, 200, 200, 200, 200, 500, 503)
101+ val responseCodes = List (200 , 200 , 500 , 500 , 500 , 500 , 503 , 503 , 503 , 503 )
103102
104103 localTargetServers(maxConnections = 100 ) // 1-1024
105104 reverseProxy()
106- // Switch Mode to let ReverseProxy forward client requests to local/remote target server(s)
105+
106+ // Switch mode to let ReverseProxy forward client requests to local/remote target server(s)
107107 // Note that the remote servers can not interpret the X-Correlation-ID header
108- clients(nbrOfClients = 10 , requestsPerClient = 10 , Mode .local)
108+ val mode = Mode .remote
109+ clients(nbrOfClients = 10 , requestsPerClient = 10 , mode)
110+ ReverseProxyMonitor .initializeWebUI(system, services(mode))
109111
110112 sys.addShutdownHook {
111113 ReverseProxyMonitor .shutdown()
@@ -131,7 +133,7 @@ object ReverseProxy extends App {
131133 }
132134
133135 Source (1 to nbrOfRequests)
134- .throttle(1 , 1 .second , 10 , ThrottleMode .shaping)
136+ .throttle(1 , 2 .seconds , 10 , ThrottleMode .shaping)
135137 .wireTap(each => logger.info(s " Client: $clientId about to send request with id: $clientId- $each... " ))
136138 .mapAsync(1 )(each => http.singleRequest(HttpRequest (uri = s " http:// $proxyHost: $proxyPort/ $fixedPath" )
137139 .withHeaders(Seq (RawHeader (" Host" , targetHost.toString), RawHeader (" X-Correlation-ID" , s " $clientId- $each" )))))
@@ -162,7 +164,6 @@ object ReverseProxy extends App {
162164 val mode = Mode .values.find(_.toString == host).getOrElse(Mode .local)
163165 val id = request.getHeader(" X-Correlation-ID" ).orElse(RawHeader (" X-Correlation-ID" , " N/A" )).value()
164166
165- val requestId = ReverseProxyMonitor .logRequest(request, host, id)
166167 val startTime = System .currentTimeMillis()
167168
168169 def headers (target : Target ) = {
@@ -205,20 +206,19 @@ object ReverseProxy extends App {
205206 val target = seq(index)
206207 logger.info(s " Forwarding request with id: $id to $mode target server: ${target.url}" )
207208
209+ val requestId = ReverseProxyMonitor .logRequest(request, target.url, id)
210+
208211 val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => {
209212 val cb = new CircuitBreaker (
210213 system.scheduler,
211- // Account for retry attempts
212- // A lower value opens the circuit breaker for subsequent requests (until resetTimeout)
213214 maxFailures = 5 ,
214215 // Needs to be shorter than pekko-http 'request-timeout' (20s)
215216 // If not, clients get 503 from pekko-http
216- callTimeout = 15 .seconds,
217- resetTimeout = 10 .seconds)
218- // For yet unknown reasons these callbacks are not executed
219- cb.onOpen { () => logger.info(s " CircuitBreaker OPENED for ${target.url}" ) }
220- cb.onClose { () => logger.info(s " CircuitBreaker CLOSED for ${target.url}" ) }
221- cb.onHalfOpen { () => logger.info(s " CircuitBreaker HALF-OPEN for ${target.url}" ) }
217+ callTimeout = 5 .seconds,
218+ resetTimeout = 5 .seconds)
219+ cb.onOpen(ReverseProxyMonitor .logCircuitBreakerEvent(target.url, " OPENED" ))
220+ cb.onClose(ReverseProxyMonitor .logCircuitBreakerEvent(target.url, " CLOSED" ))
221+ cb.onHalfOpen(ReverseProxyMonitor .logCircuitBreakerEvent(target.url, " HALF-OPENED" ))
222222 cb
223223 })
224224
@@ -248,17 +248,18 @@ object ReverseProxy extends App {
248248 case Failure (exception) =>
249249 val responseTime = System .currentTimeMillis() - startTime
250250 val errorResponse = exception match {
251- case _ : CircuitBreakerOpenException => BadGateway (id, " Circuit breaker opened " )
251+ case e : CircuitBreakerOpenException => BadGateway (id, e.getMessage )
252252 case _ : TimeoutException => GatewayTimeout (id)
253253 case e => BadGateway (id, e.getMessage)
254254 }
255255 ReverseProxyMonitor .logResponse(requestId, errorResponse, responseTime, id, Some (exception.getMessage))
256256 }.recover {
257- case _ : CircuitBreakerOpenException => BadGateway (id, " Circuit breaker opened " )
257+ case e : CircuitBreakerOpenException => BadGateway (id, e.getMessage )
258258 case _ : TimeoutException => GatewayTimeout (id)
259259 case e => BadGateway (id, e.getMessage)
260260 }
261261 case None =>
262+ val requestId = ReverseProxyMonitor .logRequest(request, host, id)
262263 val notFoundResponse = NotFound (id, host)
263264 val responseTime = System .currentTimeMillis() - startTime
264265 ReverseProxyMonitor .logResponse(requestId, notFoundResponse, responseTime, id, Some (" Host not found" ))
@@ -281,7 +282,7 @@ object ReverseProxy extends App {
281282 val echoRoute : Route =
282283 extractRequest { request =>
283284 complete {
284- Thread .sleep(500 )
285+ Thread .sleep(ThreadLocalRandom .current.nextInt( 1 , 20 ) * 100 )
285286 val id = request.getHeader(" X-Correlation-ID" ).orElse(RawHeader (" X-Correlation-ID" , " N/A" )).value()
286287
287288 val randomResponseCode = responseCodes(new scala.util.Random ().nextInt(responseCodes.length))
0 commit comments