@@ -251,6 +251,13 @@ public void handle(Client client, int frameSize, ChannelHandlerContext ctx, Byte
251251 }
252252
253253 abstract int doHandle (Client client , ChannelHandlerContext ctx , ByteBuf message );
254+
255+ protected void logMissingOutstandingRequest (int correlationId ) {
256+ LOGGER .warn (
257+ "Could not find outstanding request with correlation ID {} ({})" ,
258+ correlationId ,
259+ this .getClass ().getSimpleName ());
260+ }
254261 }
255262
256263 private static class ConfirmFrameHandler extends BaseFrameHandler {
@@ -670,7 +677,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
670677 OutstandingRequest <QueryPublisherSequenceResponse > outstandingRequest =
671678 remove (client .outstandingRequests , correlationId , QueryPublisherSequenceResponse .class );
672679 if (outstandingRequest == null ) {
673- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
680+ logMissingOutstandingRequest ( correlationId );
674681 } else {
675682 QueryPublisherSequenceResponse response =
676683 new QueryPublisherSequenceResponse (responseCode , sequence );
@@ -695,7 +702,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
695702 OutstandingRequest <QueryOffsetResponse > outstandingRequest =
696703 remove (client .outstandingRequests , correlationId , QueryOffsetResponse .class );
697704 if (outstandingRequest == null ) {
698- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
705+ logMissingOutstandingRequest ( correlationId );
699706 } else {
700707 QueryOffsetResponse response = new QueryOffsetResponse (responseCode , offset );
701708 outstandingRequest .response ().set (response );
@@ -744,7 +751,13 @@ private static class PeerPropertiesFrameHandler extends BaseFrameHandler {
744751
745752 @ Override
746753 int doHandle (Client client , ChannelHandlerContext ctx , ByteBuf message ) {
754+ LOGGER .debug (
755+ "Handling peer properties response for connection {}" , client .clientConnectionName ());
747756 int correlationId = message .readInt ();
757+ LOGGER .debug (
758+ "Handling peer properties response for connection {}, correlation ID is {}" ,
759+ client .clientConnectionName (),
760+ correlationId );
748761 int read = 4 ;
749762
750763 short responseCode = message .readShort ();
@@ -773,7 +786,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
773786 OutstandingRequest <Map <String , String >> outstandingRequest =
774787 remove (client .outstandingRequests , correlationId , new ParameterizedTypeReference <>() {});
775788 if (outstandingRequest == null ) {
776- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
789+ logMissingOutstandingRequest ( correlationId );
777790 } else {
778791 outstandingRequest .response ().set (Collections .unmodifiableMap (serverProperties ));
779792 outstandingRequest .countDown ();
@@ -811,7 +824,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
811824 OutstandingRequest <OpenResponse > outstandingRequest =
812825 remove (client .outstandingRequests , correlationId , OpenResponse .class );
813826 if (outstandingRequest == null ) {
814- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
827+ logMissingOutstandingRequest ( correlationId );
815828 } else {
816829 outstandingRequest .response ().set (new OpenResponse (responseCode , connectionProperties ));
817830 outstandingRequest .countDown ();
@@ -900,7 +913,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
900913 OutstandingRequest <SaslAuthenticateResponse > outstandingRequest =
901914 remove (client .outstandingRequests , correlationId , SaslAuthenticateResponse .class );
902915 if (outstandingRequest == null ) {
903- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
916+ logMissingOutstandingRequest ( correlationId );
904917 } else {
905918 outstandingRequest .response ().set (response );
906919 outstandingRequest .countDown ();
@@ -943,7 +956,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
943956 correlationId ,
944957 new ParameterizedTypeReference <List <String >>() {});
945958 if (outstandingRequest == null ) {
946- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
959+ logMissingOutstandingRequest ( correlationId );
947960 } else {
948961 outstandingRequest .response ().set (mechanisms );
949962 outstandingRequest .countDown ();
@@ -1005,7 +1018,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10051018 correlationId ,
10061019 new ParameterizedTypeReference <Map <String , StreamMetadata >>() {});
10071020 if (outstandingRequest == null ) {
1008- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1021+ logMissingOutstandingRequest ( correlationId );
10091022 } else {
10101023 outstandingRequest .response ().set (results );
10111024 outstandingRequest .countDown ();
@@ -1026,7 +1039,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10261039 OutstandingRequest <Response > outstandingRequest =
10271040 remove (client .outstandingRequests , correlationId , Response .class );
10281041 if (outstandingRequest == null ) {
1029- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1042+ logMissingOutstandingRequest ( correlationId );
10301043 } else {
10311044 Response response = new Response (responseCode );
10321045 outstandingRequest .response ().set (response );
@@ -1063,12 +1076,9 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
10631076 }
10641077
10651078 OutstandingRequest <List <String >> outstandingRequest =
1066- remove (
1067- client .outstandingRequests ,
1068- correlationId ,
1069- new ParameterizedTypeReference <List <String >>() {});
1079+ remove (client .outstandingRequests , correlationId , new ParameterizedTypeReference <>() {});
10701080 if (outstandingRequest == null ) {
1071- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1081+ logMissingOutstandingRequest ( correlationId );
10721082 } else {
10731083 outstandingRequest .response ().set (streams );
10741084 outstandingRequest .countDown ();
@@ -1110,7 +1120,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11101120 correlationId ,
11111121 new ParameterizedTypeReference <List <String >>() {});
11121122 if (outstandingRequest == null ) {
1113- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1123+ logMissingOutstandingRequest ( correlationId );
11141124 } else {
11151125 outstandingRequest .response ().set (streams );
11161126 outstandingRequest .countDown ();
@@ -1155,7 +1165,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11551165 correlationId ,
11561166 new ParameterizedTypeReference <List <FrameHandlerInfo >>() {});
11571167 if (outstandingRequest == null ) {
1158- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1168+ logMissingOutstandingRequest ( correlationId );
11591169 } else {
11601170 outstandingRequest .response ().set (commandVersions );
11611171 outstandingRequest .countDown ();
@@ -1189,7 +1199,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11891199 OutstandingRequest <StreamStatsResponse > outstandingRequest =
11901200 remove (client .outstandingRequests , correlationId , StreamStatsResponse .class );
11911201 if (outstandingRequest == null ) {
1192- LOGGER . warn ( "Could not find outstanding request with correlation ID {}" , correlationId );
1202+ logMissingOutstandingRequest ( correlationId );
11931203 } else {
11941204 outstandingRequest .response ().set (new StreamStatsResponse (responseCode , info ));
11951205 outstandingRequest .countDown ();
0 commit comments