|
16 | 16 | package com.arpnetworking.http;
|
17 | 17 |
|
18 | 18 | import akka.NotUsed;
|
| 19 | +import akka.actor.ActorNotFound; |
19 | 20 | import akka.actor.ActorRef;
|
20 | 21 | import akka.actor.ActorSystem;
|
21 | 22 | import akka.actor.PoisonPill;
|
|
31 | 32 | import akka.http.javadsl.model.ws.Message;
|
32 | 33 | import akka.japi.JavaPartialFunction;
|
33 | 34 | import akka.japi.function.Function;
|
34 |
| -import akka.pattern.Patterns; |
| 35 | +import akka.japi.pf.PFBuilder; |
| 36 | +import akka.pattern.PatternsCS; |
35 | 37 | import akka.stream.OverflowStrategy;
|
36 | 38 | import akka.stream.javadsl.Flow;
|
37 | 39 | import akka.stream.javadsl.Sink;
|
|
57 | 59 | import com.google.common.collect.ImmutableList;
|
58 | 60 | import com.google.common.io.Resources;
|
59 | 61 | import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
60 |
| -import scala.compat.java8.FutureConverters; |
61 |
| -import scala.concurrent.Future; |
62 | 62 | import scala.concurrent.duration.FiniteDuration;
|
63 | 63 |
|
64 | 64 | import java.util.Objects;
|
@@ -104,7 +104,11 @@ public Routes(
|
104 | 104 | */
|
105 | 105 | public Flow<HttpRequest, HttpResponse, NotUsed> flow() {
|
106 | 106 | return Flow.<HttpRequest>create()
|
107 |
| - .mapAsync(1, this); |
| 107 | + .mapAsync(1, this) |
| 108 | + .recover( |
| 109 | + new PFBuilder<Throwable, HttpResponse>() |
| 110 | + .match(Exception.class, e -> HttpResponse.create().withStatus(StatusCodes.INTERNAL_SERVER_ERROR)) |
| 111 | + .build()); |
108 | 112 | }
|
109 | 113 |
|
110 | 114 | @Override
|
@@ -204,21 +208,32 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
|
204 | 208 | }
|
205 | 209 | }
|
206 | 210 |
|
207 |
| - return CompletableFuture.completedFuture(HttpResponse.create().withStatus(404)); |
| 211 | + return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); |
208 | 212 | }
|
209 | 213 |
|
210 | 214 | private CompletionStage<HttpResponse> dispatchHttpRequest(final HttpRequest request, final String actorName) {
|
211 |
| - final Future<ActorRef> refFuture = _actorSystem.actorSelection(actorName) |
212 |
| - .resolveOne(FiniteDuration.create(1, TimeUnit.SECONDS)); |
213 |
| - return FutureConverters.toJava(refFuture).thenCompose( |
| 215 | + final CompletionStage<ActorRef> refFuture = _actorSystem.actorSelection(actorName) |
| 216 | + .resolveOneCS(FiniteDuration.create(1, TimeUnit.SECONDS)); |
| 217 | + return refFuture.thenCompose( |
214 | 218 | ref -> {
|
215 | 219 | final CompletableFuture<HttpResponse> response = new CompletableFuture<>();
|
216 | 220 | ref.tell(new RequestReply(request, response), ActorRef.noSender());
|
217 | 221 | return response;
|
218 | 222 | })
|
219 | 223 | // We return 404 here since actor startup is controlled by config and
|
220 | 224 | // the actors may not be running.
|
221 |
| - .exceptionally(err -> HttpResponse.create().withStatus(404)); |
| 225 | + .exceptionally(err -> { |
| 226 | + final Throwable cause = err.getCause(); |
| 227 | + if (cause instanceof ActorNotFound) { |
| 228 | + return HttpResponse.create().withStatus(StatusCodes.NOT_FOUND); |
| 229 | + } |
| 230 | + LOGGER.error() |
| 231 | + .setMessage("Unhandled exception when looking up actor for http request routing") |
| 232 | + .addData("actorName", actorName) |
| 233 | + .setThrowable(cause) |
| 234 | + .log(); |
| 235 | + return HttpResponse.create().withStatus(StatusCodes.INTERNAL_SERVER_ERROR); |
| 236 | + }); |
222 | 237 | }
|
223 | 238 |
|
224 | 239 | private CompletionStage<HttpResponse> getHttpResponseForTelemetry(
|
@@ -258,11 +273,10 @@ public Object apply(final ActorRef telemetry, final boolean isCheck) throws Exce
|
258 | 273 |
|
259 | 274 | @SuppressWarnings("unchecked")
|
260 | 275 | private <T> CompletionStage<T> ask(final String actorPath, final Object request, final T defaultValue) {
|
261 |
| - return FutureConverters.toJava( |
262 |
| - (Future<T>) Patterns.ask( |
| 276 | + return (CompletionStage<T>) PatternsCS.ask( |
263 | 277 | _actorSystem.actorSelection(actorPath),
|
264 | 278 | request,
|
265 |
| - Timeout.apply(1, TimeUnit.SECONDS))) |
| 279 | + Timeout.apply(1, TimeUnit.SECONDS)) |
266 | 280 | .exceptionally(throwable -> defaultValue);
|
267 | 281 | }
|
268 | 282 |
|
|
0 commit comments