Skip to content

Commit c8fe4f4

Browse files
committed
Beautify
1 parent 1e1489a commit c8fe4f4

File tree

3 files changed

+22
-23
lines changed

3 files changed

+22
-23
lines changed

src/main/scala/alpakka/env/FileServer.scala

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.github.blemale.scaffeine.{Cache, Scaffeine}
44
import org.apache.pekko.actor.ActorSystem
55
import org.apache.pekko.http.scaladsl.Http
66
import org.apache.pekko.http.scaladsl.model.StatusCodes.*
7-
import org.apache.pekko.http.scaladsl.model.{HttpResponse, MediaTypes, StatusCodes}
7+
import org.apache.pekko.http.scaladsl.model.{HttpResponse, MediaTypes, StatusCode, StatusCodes}
88
import org.apache.pekko.http.scaladsl.server.Directives.{logRequestResult, path, *}
99
import org.apache.pekko.http.scaladsl.server.{ExceptionHandler, Route}
1010
import org.slf4j.{Logger, LoggerFactory}
@@ -20,13 +20,14 @@ import scala.util.{Failure, Success}
2020
*
2121
* The client can request these types of response:
2222
* - HTTP 200 response: /download/[id]
23-
* - Flaky response: /downloadflaky/[id]
2423
* - Non-idempotent response: /downloadni/[id]
2524
* Allows only one download file request per id, answer with HTTP 404 on subsequent requests
25+
* - Flaky response: /downloadflaky/[id]
26+
* Reply with additional random failures on requests with certain IDs
2627
*
2728
* Uses a cache to remember the "one download per id" behaviour
28-
* Note that akka-http also supports server-side caching (by wrapping caffeine in caching directives):
29-
* https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/caching-directives/index.html
29+
* Note that pekko-http would also support server-side caching (by wrapping caffeine in caching directives):
30+
* https://pekko.apache.org/docs/pekko-http/current/common/caching.html
3031
*/
3132
object FileServer extends App {
3233
val logger: Logger = LoggerFactory.getLogger(this.getClass)
@@ -116,14 +117,10 @@ object FileServer extends App {
116117
Thread.sleep(sleepTime.toLong)
117118
}
118119

119-
def randomErrorHttpStatusCode = {
120-
val statusCodes = Seq(StatusCodes.InternalServerError, StatusCodes.BadRequest, StatusCodes.ServiceUnavailable)
121-
val start = 0
122-
val end = statusCodes.size - 1
123-
val rnd = new scala.util.Random
124-
val finalRnd = start + rnd.nextInt((end - start) + 1)
125-
val statusCode = statusCodes(finalRnd)
120+
def randomErrorHttpStatusCode: StatusCode = {
121+
val statusCodes = Seq(StatusCodes.NotFound, StatusCodes.InternalServerError, StatusCodes.ServiceUnavailable)
122+
val statusCode = statusCodes(scala.util.Random.nextInt(statusCodes.size))
126123
logger.info(s" -> Complete with HTTP status code: $statusCode")
127-
statusCodes(finalRnd)
124+
statusCode
128125
}
129126
}

src/main/scala/sample/stream_shared_state/DownloaderRetry.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,19 @@ public boolean retryRequest(HttpRequest request, IOException exception, int exec
101101
} else if (rootCause instanceof SocketException
102102
|| rootCause instanceof InterruptedIOException
103103
|| exception instanceof SSLException) {
104-
try {
105-
Thread.sleep(DELAY_TO_RETRY_SECONDS * 1000);
106-
} catch (InterruptedException e) {
107-
e.printStackTrace(); // do nothing
108-
}
104+
// Don't sleep here; let getRetryInterval handle the delay
109105
return true;
110-
} else
106+
} else {
111107
return false;
108+
}
112109
}
113110

114111
@Override
115112
public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
116113
int httpStatusCode = response.getCode();
117-
if (httpStatusCode != HttpStatus.SC_SERVICE_UNAVAILABLE)
118-
return false; // retry only on HTTP 503
114+
if (httpStatusCode != HttpStatus.SC_SERVICE_UNAVAILABLE
115+
&& httpStatusCode != HttpStatus.SC_INTERNAL_SERVER_ERROR)
116+
return false; // retry only on HTTP 500 and 503
119117

120118
if (execCount >= maxRetriesCount) {
121119
logger.warn("File downloading failed after {} retries in {} minute(s)",
@@ -130,7 +128,10 @@ public boolean retryRequest(HttpResponse response, int execCount, HttpContext co
130128

131129
@Override
132130
public TimeValue getRetryInterval(org.apache.hc.core5.http.HttpResponse response, int execCount, HttpContext context) {
133-
return TimeValue.ofSeconds(DELAY_TO_RETRY_SECONDS);
131+
// Exponential backoff: 2s, 4s, 8s
132+
int initialDelaySeconds = 2;
133+
long delaySeconds = initialDelaySeconds * (1L << (execCount - 1));
134+
return TimeValue.ofSeconds(Math.min(delaySeconds, 10)); // cap at 10 seconds
134135
}
135136
}
136137

src/main/scala/sample/stream_shared_state/LocalFileCacheCaffeine.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.util.control.NonFatal
2828
* - On downstream error: the path needs to be kept longer in the cache
2929
* - On restart: populate cache from local filesystem
3030
*
31-
* Before running this class: start [[alpakka.env.FileServer]] to simulate non-idempotent responses
31+
* Before running this class: start [[alpakka.env.FileServer]] to simulate non-idempotent (or flaky) responses
3232
* Monitor `localFileCache` dir with cmd: watch ls -ltr
3333
*
3434
* Doc:
@@ -58,7 +58,7 @@ object LocalFileCacheCaffeine {
5858
FileUtils.forceMkdir(localFileCache.toFile)
5959
// Comment out to start with empty local file storage
6060
// Note that this may provoke "CACHE miss" cases, when we try to scavenge during recoverWith
61-
//FileUtils.cleanDirectory(localFileCache.toFile)
61+
// FileUtils.cleanDirectory(localFileCache.toFile)
6262

6363

6464
def deleteFromFileStore(key: Int, value: Path, cause: caffeine.cache.RemovalCause): Unit = {
@@ -90,6 +90,7 @@ object LocalFileCacheCaffeine {
9090
.mapAsyncUnordered(5) { message =>
9191
def processNext(message: Message): Message = {
9292
val key = message.id
93+
// switch to /downloadflaky for "more confusion"
9394
val url = new URI("http://127.0.0.1:6001/downloadni/" + key.toString)
9495
val destinationFile = localFileCache.resolve(Paths.get(message.id.toString + ".zip"))
9596

0 commit comments

Comments
 (0)