From f43df4caa83b54e49ab26c1a53f56eeac525f687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 20 Aug 2025 12:11:23 +0200 Subject: [PATCH] Fail RPC requests in case of asynchronous failure --- .../java/com/rabbitmq/stream/impl/Client.java | 76 +++++++++++++------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 56d02862dc..892e7553fb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -70,6 +70,8 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; @@ -488,7 +490,7 @@ private Map peerProperties() { OutstandingRequest> request = outstandingRequest(); LOGGER.debug("Peer properties request has correlation ID {}", correlationId); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); if (request.error() == null) { return request.response.get(); @@ -568,7 +570,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate( } OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -593,7 +595,7 @@ private Map open(String virtualHost) { bb.writeBytes(virtualHost.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); if (!request.response.get().isOk()) { throw new StreamException( @@ -635,7 +637,7 @@ private void sendClose(short code, String reason) { bb.writeBytes(reason.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); if (!request.response.get().isOk()) { LOGGER.warn( @@ -665,7 +667,7 @@ private List getSaslMechanisms() { bb.writeInt(correlationId); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -695,7 +697,7 @@ public Response create(String stream, Map arguments) { writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -746,7 +748,7 @@ Response createSuperStream( writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -772,7 +774,7 @@ Response deleteSuperStream(String superStream) { bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -856,7 +858,7 @@ public Response delete(String stream) { bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -887,7 +889,7 @@ public Map metadata(String... streams) { writeArray(bb, streams); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -926,7 +928,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -951,7 +953,7 @@ public Response deletePublisher(byte publisherId) { bb.writeByte(publisherId); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1289,7 +1291,7 @@ public Response subscribe( subscriptionOffsets.add( new SubscriptionOffset(subscriptionId, offsetSpecification.getOffset())); } - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1346,10 +1348,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) { bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); - QueryOffsetResponse response = request.response.get(); - return response; + return request.response.get(); } catch (StreamException e) { this.handleRpcError(correlationId, e); throw e; @@ -1387,7 +1388,7 @@ public long queryPublisherSequence(String publisherReference, String stream) { bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); QueryPublisherSequenceResponse response = request.response.get(); if (!response.isOk()) { @@ -1421,7 +1422,7 @@ public Response unsubscribe(byte subscriptionId) { bb.writeByte(subscriptionId); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1586,7 +1587,7 @@ public List route(String routingKey, String superStream) { bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1619,7 +1620,7 @@ public List partitions(String superStream) { bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1651,7 +1652,7 @@ List exchangeCommandVersions() { } OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -1680,7 +1681,7 @@ StreamStatsResponse streamStats(String stream) { bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); - channel.writeAndFlush(bb); + channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId)); request.block(); return request.response.get(); } catch (StreamException e) { @@ -2972,10 +2973,37 @@ private void debug(Supplier format, Object... args) { } } - private void handleRpcError(int correlationId, Exception e) { - OutstandingRequest request = this.outstandingRequests.remove(correlationId); + private GenericFutureListener> maybeFailRpc(int correlationId) { + return new FailRpcFuture(this.outstandingRequests, correlationId); + } + + private void handleRpcError(int correlationId, Throwable e) { + handleRpcError(this.outstandingRequests, correlationId, e); + } + + private static void handleRpcError( + Map> requests, int correlationId, Throwable e) { + OutstandingRequest request = requests.remove(correlationId); if (request != null) { request.completeExceptionally(e); } } + + private static final class FailRpcFuture implements GenericFutureListener> { + + private final Map> requests; + private final int correlationId; + + private FailRpcFuture(Map> requests, int correlationId) { + this.requests = requests; + this.correlationId = correlationId; + } + + @Override + public void operationComplete(Future f) { + if (!f.isSuccess()) { + handleRpcError(requests, correlationId, f.cause()); + } + } + } }