Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -488,7 +490,7 @@ private Map<String, String> peerProperties() {
OutstandingRequest<Map<String, String>> request = outstandingRequest();
LOGGER.debug("Peer properties request has correlation ID {}", correlationId);
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
if (request.error() == null) {
return request.response.get();
Expand Down Expand Up @@ -568,7 +570,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
}
OutstandingRequest<SaslAuthenticateResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand All @@ -593,7 +595,7 @@ private Map<String, String> open(String virtualHost) {
bb.writeBytes(virtualHost.getBytes(CHARSET));
OutstandingRequest<OpenResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
if (!request.response.get().isOk()) {
throw new StreamException(
Expand Down Expand Up @@ -635,7 +637,7 @@ private void sendClose(short code, String reason) {
bb.writeBytes(reason.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
if (!request.response.get().isOk()) {
LOGGER.warn(
Expand Down Expand Up @@ -665,7 +667,7 @@ private List<String> getSaslMechanisms() {
bb.writeInt(correlationId);
OutstandingRequest<List<String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -695,7 +697,7 @@ public Response create(String stream, Map<String, String> arguments) {
writeMap(bb, arguments);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -746,7 +748,7 @@ Response createSuperStream(
writeMap(bb, arguments);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand All @@ -772,7 +774,7 @@ Response deleteSuperStream(String superStream) {
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -856,7 +858,7 @@ public Response delete(String stream) {
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -887,7 +889,7 @@ public Map<String, StreamMetadata> metadata(String... streams) {
writeArray(bb, streams);
OutstandingRequest<Map<String, StreamMetadata>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -926,7 +928,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand All @@ -951,7 +953,7 @@ public Response deletePublisher(byte publisherId) {
bb.writeByte(publisherId);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1289,7 +1291,7 @@ public Response subscribe(
subscriptionOffsets.add(
new SubscriptionOffset(subscriptionId, offsetSpecification.getOffset()));
}
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1346,10 +1348,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<QueryOffsetResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
QueryOffsetResponse response = request.response.get();
return response;
return request.response.get();
} catch (StreamException e) {
this.handleRpcError(correlationId, e);
throw e;
Expand Down Expand Up @@ -1387,7 +1388,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<QueryPublisherSequenceResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
QueryPublisherSequenceResponse response = request.response.get();
if (!response.isOk()) {
Expand Down Expand Up @@ -1421,7 +1422,7 @@ public Response unsubscribe(byte subscriptionId) {
bb.writeByte(subscriptionId);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1586,7 +1587,7 @@ public List<String> route(String routingKey, String superStream) {
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<List<String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1619,7 +1620,7 @@ public List<String> partitions(String superStream) {
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<List<String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1651,7 +1652,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
}
OutstandingRequest<List<FrameHandlerInfo>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -1680,7 +1681,7 @@ StreamStatsResponse streamStats(String stream) {
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<StreamStatsResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
channel.writeAndFlush(bb).addListener(maybeFailRpc(correlationId));
request.block();
return request.response.get();
} catch (StreamException e) {
Expand Down Expand Up @@ -2972,10 +2973,37 @@ private void debug(Supplier<String> format, Object... args) {
}
}

private void handleRpcError(int correlationId, Exception e) {
OutstandingRequest<?> request = this.outstandingRequests.remove(correlationId);
private GenericFutureListener<Future<? super Void>> maybeFailRpc(int correlationId) {
return new FailRpcFuture(this.outstandingRequests, correlationId);
}

private void handleRpcError(int correlationId, Throwable e) {
handleRpcError(this.outstandingRequests, correlationId, e);
}

private static void handleRpcError(
Map<Integer, OutstandingRequest<?>> requests, int correlationId, Throwable e) {
OutstandingRequest<?> request = requests.remove(correlationId);
if (request != null) {
request.completeExceptionally(e);
}
}

private static final class FailRpcFuture implements GenericFutureListener<Future<? super Void>> {

private final Map<Integer, OutstandingRequest<?>> requests;
private final int correlationId;

private FailRpcFuture(Map<Integer, OutstandingRequest<?>> requests, int correlationId) {
this.requests = requests;
this.correlationId = correlationId;
}

@Override
public void operationComplete(Future<? super Void> f) {
if (!f.isSuccess()) {
handleRpcError(requests, correlationId, f.cause());
}
}
}
}
Loading