diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/resources/AsyncResponses.java b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/AsyncResponses.java index 01677d05d4..d3a21b2796 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/resources/AsyncResponses.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/AsyncResponses.java @@ -37,6 +37,13 @@ public static void asyncResume(AsyncResponse asyncResponse, CompletableFutur AsyncResponseBuilder.from(Response.ok()).entity(entity).asyncResume(asyncResponse); } + public static void asyncResume( + AsyncResponse asyncResponse, CompletableFuture entity, boolean isDeleteAclCall) { + AsyncResponseBuilder.from(Response.ok()) + .entity(entity) + .asyncResume(asyncResponse, isDeleteAclCall); + } + /** A analogous of {@link AsyncResponse} for {@link ResponseBuilder}. */ public static final class AsyncResponseBuilder { @@ -124,5 +131,42 @@ public void asyncResume(AsyncResponse asyncResponse) { } }); } + + public void asyncResume(AsyncResponse asyncResponse, boolean isDeleteAclCall) { + if (entityFuture == null) { + throw new IllegalStateException(); + } + entityFuture.whenComplete( + (entity, exception) -> { + if (exception == null) { + if (statusFunction != null) { + responseBuilder.status(statusFunction.apply(entity)); + } + if (entityAnnotations != null) { + asyncResponse.resume(responseBuilder.entity(entity, entityAnnotations).build()); + } else { + asyncResponse.resume(responseBuilder.entity(entity).build()); + } + } else if (exception instanceof CompletionException) { + log.error( + "Async response CompletionException with error response entity of type {}: {}", + entity != null ? entity.getClass() : "unknown", + entity, + exception); + asyncResponse.resume(exception.getCause()); + } else { + log.error( + "Async response exception with error response entity of type {}: {}", + entity != null ? entity.getClass() : "unknown", + entity, + exception); + asyncResponse.resume(exception); + } + }); + if (isDeleteAclCall) { + log.error("Delete ACL call throws error"); + throw new IllegalStateException("Response does not exist (likely recycled)"); + } + } } } diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/resources/v3/AclsResource.java b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/v3/AclsResource.java index 86397ccd8f..502b42ff50 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/resources/v3/AclsResource.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/v3/AclsResource.java @@ -255,7 +255,7 @@ public void deleteAcls( .thenComparing(AclData::getPermission)) .collect(Collectors.toList()))); - AsyncResponses.asyncResume(asyncResponse, response); + AsyncResponses.asyncResume(asyncResponse, response, true); } public AclData toAclData(Acl acl) { diff --git a/logs/kafka-rest.log b/logs/kafka-rest.log new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pom.xml b/pom.xml index 690d90e072..c00556f729 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - [8.1.0-0, 8.1.1-0) + 8.1.0-0 kafka-rest-parent