7070import io .netty .handler .timeout .IdleState ;
7171import io .netty .handler .timeout .IdleStateEvent ;
7272import io .netty .handler .timeout .IdleStateHandler ;
73+ import io .netty .util .concurrent .Future ;
74+ import io .netty .util .concurrent .GenericFutureListener ;
7375import java .io .IOException ;
7476import java .io .OutputStream ;
7577import java .lang .reflect .Field ;
@@ -488,7 +490,7 @@ private Map<String, String> peerProperties() {
488490 OutstandingRequest <Map <String , String >> request = outstandingRequest ();
489491 LOGGER .debug ("Peer properties request has correlation ID {}" , correlationId );
490492 outstandingRequests .put (correlationId , request );
491- channel .writeAndFlush (bb );
493+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
492494 request .block ();
493495 if (request .error () == null ) {
494496 return request .response .get ();
@@ -568,7 +570,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
568570 }
569571 OutstandingRequest <SaslAuthenticateResponse > request = outstandingRequest ();
570572 outstandingRequests .put (correlationId , request );
571- channel .writeAndFlush (bb );
573+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
572574 request .block ();
573575 return request .response .get ();
574576 } catch (StreamException e ) {
@@ -593,7 +595,7 @@ private Map<String, String> open(String virtualHost) {
593595 bb .writeBytes (virtualHost .getBytes (CHARSET ));
594596 OutstandingRequest <OpenResponse > request = outstandingRequest ();
595597 outstandingRequests .put (correlationId , request );
596- channel .writeAndFlush (bb );
598+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
597599 request .block ();
598600 if (!request .response .get ().isOk ()) {
599601 throw new StreamException (
@@ -635,7 +637,7 @@ private void sendClose(short code, String reason) {
635637 bb .writeBytes (reason .getBytes (CHARSET ));
636638 OutstandingRequest <Response > request = outstandingRequest ();
637639 outstandingRequests .put (correlationId , request );
638- channel .writeAndFlush (bb );
640+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
639641 request .block ();
640642 if (!request .response .get ().isOk ()) {
641643 LOGGER .warn (
@@ -665,7 +667,7 @@ private List<String> getSaslMechanisms() {
665667 bb .writeInt (correlationId );
666668 OutstandingRequest <List <String >> request = outstandingRequest ();
667669 outstandingRequests .put (correlationId , request );
668- channel .writeAndFlush (bb );
670+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
669671 request .block ();
670672 return request .response .get ();
671673 } catch (StreamException e ) {
@@ -695,7 +697,7 @@ public Response create(String stream, Map<String, String> arguments) {
695697 writeMap (bb , arguments );
696698 OutstandingRequest <Response > request = outstandingRequest ();
697699 outstandingRequests .put (correlationId , request );
698- channel .writeAndFlush (bb );
700+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
699701 request .block ();
700702 return request .response .get ();
701703 } catch (StreamException e ) {
@@ -746,7 +748,7 @@ Response createSuperStream(
746748 writeMap (bb , arguments );
747749 OutstandingRequest <Response > request = outstandingRequest ();
748750 outstandingRequests .put (correlationId , request );
749- channel .writeAndFlush (bb );
751+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
750752 request .block ();
751753 return request .response .get ();
752754 } catch (StreamException e ) {
@@ -772,7 +774,7 @@ Response deleteSuperStream(String superStream) {
772774 bb .writeBytes (superStream .getBytes (CHARSET ));
773775 OutstandingRequest <Response > request = outstandingRequest ();
774776 outstandingRequests .put (correlationId , request );
775- channel .writeAndFlush (bb );
777+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
776778 request .block ();
777779 return request .response .get ();
778780 } catch (StreamException e ) {
@@ -856,7 +858,7 @@ public Response delete(String stream) {
856858 bb .writeBytes (stream .getBytes (CHARSET ));
857859 OutstandingRequest <Response > request = outstandingRequest ();
858860 outstandingRequests .put (correlationId , request );
859- channel .writeAndFlush (bb );
861+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
860862 request .block ();
861863 return request .response .get ();
862864 } catch (StreamException e ) {
@@ -887,7 +889,7 @@ public Map<String, StreamMetadata> metadata(String... streams) {
887889 writeArray (bb , streams );
888890 OutstandingRequest <Map <String , StreamMetadata >> request = outstandingRequest ();
889891 outstandingRequests .put (correlationId , request );
890- channel .writeAndFlush (bb );
892+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
891893 request .block ();
892894 return request .response .get ();
893895 } catch (StreamException e ) {
@@ -926,7 +928,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
926928 bb .writeBytes (stream .getBytes (CHARSET ));
927929 OutstandingRequest <Response > request = outstandingRequest ();
928930 outstandingRequests .put (correlationId , request );
929- channel .writeAndFlush (bb );
931+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
930932 request .block ();
931933 return request .response .get ();
932934 } catch (StreamException e ) {
@@ -951,7 +953,7 @@ public Response deletePublisher(byte publisherId) {
951953 bb .writeByte (publisherId );
952954 OutstandingRequest <Response > request = outstandingRequest ();
953955 outstandingRequests .put (correlationId , request );
954- channel .writeAndFlush (bb );
956+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
955957 request .block ();
956958 return request .response .get ();
957959 } catch (StreamException e ) {
@@ -1289,7 +1291,7 @@ public Response subscribe(
12891291 subscriptionOffsets .add (
12901292 new SubscriptionOffset (subscriptionId , offsetSpecification .getOffset ()));
12911293 }
1292- channel .writeAndFlush (bb );
1294+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
12931295 request .block ();
12941296 return request .response .get ();
12951297 } catch (StreamException e ) {
@@ -1346,10 +1348,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
13461348 bb .writeBytes (stream .getBytes (CHARSET ));
13471349 OutstandingRequest <QueryOffsetResponse > request = outstandingRequest ();
13481350 outstandingRequests .put (correlationId , request );
1349- channel .writeAndFlush (bb );
1351+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
13501352 request .block ();
1351- QueryOffsetResponse response = request .response .get ();
1352- return response ;
1353+ return request .response .get ();
13531354 } catch (StreamException e ) {
13541355 this .handleRpcError (correlationId , e );
13551356 throw e ;
@@ -1387,7 +1388,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13871388 bb .writeBytes (stream .getBytes (CHARSET ));
13881389 OutstandingRequest <QueryPublisherSequenceResponse > request = outstandingRequest ();
13891390 outstandingRequests .put (correlationId , request );
1390- channel .writeAndFlush (bb );
1391+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
13911392 request .block ();
13921393 QueryPublisherSequenceResponse response = request .response .get ();
13931394 if (!response .isOk ()) {
@@ -1421,7 +1422,7 @@ public Response unsubscribe(byte subscriptionId) {
14211422 bb .writeByte (subscriptionId );
14221423 OutstandingRequest <Response > request = outstandingRequest ();
14231424 outstandingRequests .put (correlationId , request );
1424- channel .writeAndFlush (bb );
1425+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
14251426 request .block ();
14261427 return request .response .get ();
14271428 } catch (StreamException e ) {
@@ -1582,7 +1583,7 @@ public List<String> route(String routingKey, String superStream) {
15821583 bb .writeBytes (superStream .getBytes (CHARSET ));
15831584 OutstandingRequest <List <String >> request = outstandingRequest ();
15841585 outstandingRequests .put (correlationId , request );
1585- channel .writeAndFlush (bb );
1586+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
15861587 request .block ();
15871588 return request .response .get ();
15881589 } catch (StreamException e ) {
@@ -1615,7 +1616,7 @@ public List<String> partitions(String superStream) {
16151616 bb .writeBytes (superStream .getBytes (CHARSET ));
16161617 OutstandingRequest <List <String >> request = outstandingRequest ();
16171618 outstandingRequests .put (correlationId , request );
1618- channel .writeAndFlush (bb );
1619+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
16191620 request .block ();
16201621 return request .response .get ();
16211622 } catch (StreamException e ) {
@@ -1647,7 +1648,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
16471648 }
16481649 OutstandingRequest <List <FrameHandlerInfo >> request = outstandingRequest ();
16491650 outstandingRequests .put (correlationId , request );
1650- channel .writeAndFlush (bb );
1651+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
16511652 request .block ();
16521653 return request .response .get ();
16531654 } catch (StreamException e ) {
@@ -1676,7 +1677,7 @@ StreamStatsResponse streamStats(String stream) {
16761677 bb .writeBytes (stream .getBytes (CHARSET ));
16771678 OutstandingRequest <StreamStatsResponse > request = outstandingRequest ();
16781679 outstandingRequests .put (correlationId , request );
1679- channel .writeAndFlush (bb );
1680+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
16801681 request .block ();
16811682 return request .response .get ();
16821683 } catch (StreamException e ) {
@@ -2970,10 +2971,37 @@ private void debug(Supplier<String> format, Object... args) {
29702971 }
29712972 }
29722973
2973- private void handleRpcError (int correlationId , Exception e ) {
2974- OutstandingRequest <?> request = this .outstandingRequests .remove (correlationId );
2974+ private GenericFutureListener <Future <? super Void >> maybeFailRpc (int correlationId ) {
2975+ return new FailRpcFuture (this .outstandingRequests , correlationId );
2976+ }
2977+
2978+ private void handleRpcError (int correlationId , Throwable e ) {
2979+ handleRpcError (this .outstandingRequests , correlationId , e );
2980+ }
2981+
2982+ private static void handleRpcError (
2983+ Map <Integer , OutstandingRequest <?>> requests , int correlationId , Throwable e ) {
2984+ OutstandingRequest <?> request = requests .remove (correlationId );
29752985 if (request != null ) {
29762986 request .completeExceptionally (e );
29772987 }
29782988 }
2989+
2990+ private static final class FailRpcFuture implements GenericFutureListener <Future <? super Void >> {
2991+
2992+ private final Map <Integer , OutstandingRequest <?>> requests ;
2993+ private final int correlationId ;
2994+
2995+ private FailRpcFuture (Map <Integer , OutstandingRequest <?>> requests , int correlationId ) {
2996+ this .requests = requests ;
2997+ this .correlationId = correlationId ;
2998+ }
2999+
3000+ @ Override
3001+ public void operationComplete (Future <? super Void > f ) {
3002+ if (!f .isSuccess ()) {
3003+ handleRpcError (requests , correlationId , f .cause ());
3004+ }
3005+ }
3006+ }
29793007}
0 commit comments