Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public static <T> void asyncResume(AsyncResponse asyncResponse, CompletableFutur
AsyncResponseBuilder.<T>from(Response.ok()).entity(entity).asyncResume(asyncResponse);
}

public static <T> void asyncResume(
AsyncResponse asyncResponse, CompletableFuture<T> entity, boolean isDeleteAclCall) {
AsyncResponseBuilder.<T>from(Response.ok())
.entity(entity)
.asyncResume(asyncResponse, isDeleteAclCall);
}

/** A analogous of {@link AsyncResponse} for {@link ResponseBuilder}. */
public static final class AsyncResponseBuilder<T> {

Expand Down Expand Up @@ -124,5 +131,42 @@ public void asyncResume(AsyncResponse asyncResponse) {
}
});
}

public void asyncResume(AsyncResponse asyncResponse, boolean isDeleteAclCall) {
if (entityFuture == null) {
throw new IllegalStateException();
}
entityFuture.whenComplete(
(entity, exception) -> {
if (exception == null) {
if (statusFunction != null) {
responseBuilder.status(statusFunction.apply(entity));
}
if (entityAnnotations != null) {
asyncResponse.resume(responseBuilder.entity(entity, entityAnnotations).build());
} else {
asyncResponse.resume(responseBuilder.entity(entity).build());
}
} else if (exception instanceof CompletionException) {
log.error(
"Async response CompletionException with error response entity of type {}: {}",
entity != null ? entity.getClass() : "unknown",
entity,
exception);
asyncResponse.resume(exception.getCause());
} else {
log.error(
"Async response exception with error response entity of type {}: {}",
entity != null ? entity.getClass() : "unknown",
entity,
exception);
asyncResponse.resume(exception);
}
});
if (isDeleteAclCall) {
log.error("Delete ACL call throws error");
throw new IllegalStateException("Response does not exist (likely recycled)");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void deleteAcls(
.thenComparing(AclData::getPermission))
.collect(Collectors.toList())));

AsyncResponses.asyncResume(asyncResponse, response);
AsyncResponses.asyncResume(asyncResponse, response, true);
}

public AclData toAclData(Acl acl) {
Expand Down
Empty file added logs/kafka-rest.log
Empty file.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-parent</artifactId>
<version>[8.1.0-0, 8.1.1-0)</version>
<version>8.1.0-0</version>
</parent>

<artifactId>kafka-rest-parent</artifactId>
Expand Down