104104import java .util .concurrent .atomic .AtomicLong ;
105105import java .util .concurrent .atomic .AtomicReference ;
106106import java .util .function .Consumer ;
107+ import java .util .function .Supplier ;
107108import java .util .function .ToLongFunction ;
108109import javax .net .ssl .SSLEngine ;
109110import javax .net .ssl .SSLHandshakeException ;
@@ -178,7 +179,7 @@ public long applyAsLong(Object value) {
178179 private final CredentialsProvider credentialsProvider ;
179180 private final Runnable nettyClosing ;
180181 private final int maxFrameSize ;
181- private final boolean frameSizeCopped ;
182+ private final boolean frameSizeCapped ;
182183 private final EventLoopGroup eventLoopGroup ;
183184 private final Map <String , String > clientProperties ;
184185 private final String NETTY_HANDLER_FLUSH_CONSOLIDATION =
@@ -373,12 +374,18 @@ public void initChannel(SocketChannel ch) {
373374 new TuneState (
374375 parameters .requestedMaxFrameSize , (int ) parameters .requestedHeartbeat .getSeconds ());
375376 this .clientProperties = clientProperties (parameters .clientProperties );
377+ debug (() -> "exchanging peer properties" );
376378 this .serverProperties = peerProperties ();
379+ debug (() -> "peer properties exchanged" );
380+ debug (() -> "starting SASL handshake" );
377381 this .saslMechanisms = getSaslMechanisms ();
382+ debug (() -> "SASL mechanisms supported by server ({})" , this .saslMechanisms );
383+ debug (() -> "starting authentication" );
378384 authenticate (this .credentialsProvider );
385+ debug (() -> "authenticated" );
379386 this .tuneState .await (Duration .ofSeconds (10 ));
380387 this .maxFrameSize = this .tuneState .getMaxFrameSize ();
381- this .frameSizeCopped = this .maxFrameSize () > 0 ;
388+ this .frameSizeCapped = this .maxFrameSize () > 0 ;
382389 LOGGER .debug (
383390 "Connection tuned with max frame size {} and heartbeat {}" ,
384391 this .maxFrameSize (),
@@ -418,6 +425,8 @@ public void initChannel(SocketChannel ch) {
418425 started .set (true );
419426 this .metricsCollector .openConnection ();
420427 } catch (RuntimeException e ) {
428+ LOGGER .debug (
429+ "Error while opening connection {}: {}" , this .clientConnectionName , e .getMessage ());
421430 this .closingSequence (null );
422431 throw e ;
423432 }
@@ -462,10 +471,14 @@ int maxFrameSize() {
462471 return this .maxFrameSize ;
463472 }
464473
474+ private int nextCorrelationId () {
475+ return this .correlationSequence .getAndIncrement ();
476+ }
477+
465478 private Map <String , String > peerProperties () {
466479 int clientPropertiesSize = mapSize (this .clientProperties );
467480 int length = 2 + 2 + 4 + clientPropertiesSize ;
468- int correlationId = correlationSequence . incrementAndGet ();
481+ int correlationId = nextCorrelationId ();
469482 try {
470483 ByteBuf bb = allocateNoCheck (length + 4 );
471484 bb .writeInt (length );
@@ -474,6 +487,7 @@ private Map<String, String> peerProperties() {
474487 bb .writeInt (correlationId );
475488 writeMap (bb , this .clientProperties );
476489 OutstandingRequest <Map <String , String >> request = outstandingRequest ();
490+ LOGGER .debug ("Peer properties request has correlation ID {}" , correlationId );
477491 outstandingRequests .put (correlationId , request );
478492 channel .writeAndFlush (bb );
479493 request .block ();
@@ -539,7 +553,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
539553 + saslMechanism .getName ().length ()
540554 + 4
541555 + (challengeResponse == null ? 0 : challengeResponse .length );
542- int correlationId = correlationSequence . incrementAndGet ();
556+ int correlationId = nextCorrelationId ();
543557 try {
544558 ByteBuf bb = allocateNoCheck (length + 4 );
545559 bb .writeInt (length );
@@ -569,7 +583,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
569583
570584 private Map <String , String > open (String virtualHost ) {
571585 int length = 2 + 2 + 4 + 2 + virtualHost .length ();
572- int correlationId = correlationSequence . incrementAndGet ();
586+ int correlationId = nextCorrelationId ();
573587 try {
574588 ByteBuf bb = allocate (length + 4 );
575589 bb .writeInt (length );
@@ -610,7 +624,7 @@ void send(byte[] content) {
610624
611625 private void sendClose (short code , String reason ) {
612626 int length = 2 + 2 + 4 + 2 + 2 + reason .length ();
613- int correlationId = correlationSequence . incrementAndGet ();
627+ int correlationId = nextCorrelationId ();
614628 try {
615629 ByteBuf bb = allocate (length + 4 );
616630 bb .writeInt (length );
@@ -643,7 +657,7 @@ private void sendClose(short code, String reason) {
643657
644658 private List <String > getSaslMechanisms () {
645659 int length = 2 + 2 + 4 ;
646- int correlationId = correlationSequence . incrementAndGet ();
660+ int correlationId = nextCorrelationId ();
647661 try {
648662 ByteBuf bb = allocateNoCheck (length + 4 );
649663 bb .writeInt (length );
@@ -670,7 +684,7 @@ public Response create(String stream) {
670684
671685 public Response create (String stream , Map <String , String > arguments ) {
672686 int length = 2 + 2 + 4 + 2 + stream .length () + mapSize (arguments );
673- int correlationId = correlationSequence . incrementAndGet ();
687+ int correlationId = nextCorrelationId ();
674688 try {
675689 ByteBuf bb = allocate (length + 4 );
676690 bb .writeInt (length );
@@ -719,7 +733,7 @@ Response createSuperStream(
719733 + collectionSize (partitions )
720734 + collectionSize (bindingKeys )
721735 + mapSize (arguments );
722- int correlationId = correlationSequence . incrementAndGet ();
736+ int correlationId = nextCorrelationId ();
723737 try {
724738 ByteBuf bb = allocate (length + 4 );
725739 bb .writeInt (length );
@@ -748,7 +762,7 @@ Response createSuperStream(
748762 Response deleteSuperStream (String superStream ) {
749763 this .superStreamManagementCommandVersionsCheck .run ();
750764 int length = 2 + 2 + 4 + 2 + superStream .length ();
751- int correlationId = correlationSequence . incrementAndGet ();
765+ int correlationId = nextCorrelationId ();
752766 try {
753767 ByteBuf bb = allocate (length + 4 );
754768 bb .writeInt (length );
@@ -808,7 +822,7 @@ private static ByteBuf writeMap(ByteBuf bb, Map<String, String> elements) {
808822 }
809823
810824 ByteBuf allocate (ByteBufAllocator allocator , int capacity ) {
811- if (frameSizeCopped && capacity > this .maxFrameSize ()) {
825+ if (frameSizeCapped && capacity > this .maxFrameSize ()) {
812826 throw new IllegalArgumentException (
813827 "Cannot allocate "
814828 + capacity
@@ -832,7 +846,7 @@ private ByteBuf allocateNoCheck(int capacity) {
832846
833847 public Response delete (String stream ) {
834848 int length = 2 + 2 + 4 + 2 + stream .length ();
835- int correlationId = correlationSequence . incrementAndGet ();
849+ int correlationId = nextCorrelationId ();
836850 try {
837851 ByteBuf bb = allocate (length + 4 );
838852 bb .writeInt (length );
@@ -864,7 +878,7 @@ public Map<String, StreamMetadata> metadata(String... streams) {
864878 throw new IllegalArgumentException ("At least one stream must be specified" );
865879 }
866880 int length = 2 + 2 + 4 + arraySize (streams ); // API code, version, correlation ID, array size
867- int correlationId = correlationSequence . incrementAndGet ();
881+ int correlationId = nextCorrelationId ();
868882 try {
869883 ByteBuf bb = allocate (length + 4 );
870884 bb .writeInt (length );
@@ -897,7 +911,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
897911 "If specified, publisher reference must less than 256 characters" );
898912 }
899913 int length = 2 + 2 + 4 + 1 + 2 + publisherReferenceSize + 2 + stream .length ();
900- int correlationId = correlationSequence . getAndIncrement ();
914+ int correlationId = nextCorrelationId ();
901915 try {
902916 ByteBuf bb = allocate (length + 4 );
903917 bb .writeInt (length );
@@ -928,7 +942,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
928942
929943 public Response deletePublisher (byte publisherId ) {
930944 int length = 2 + 2 + 4 + 1 ;
931- int correlationId = correlationSequence . getAndIncrement ();
945+ int correlationId = nextCorrelationId ();
932946 try {
933947 ByteBuf bb = allocate (length + 4 );
934948 bb .writeInt (length );
@@ -1252,7 +1266,7 @@ public Response subscribe(
12521266 propertiesSize = mapSize (properties );
12531267 }
12541268 length += propertiesSize ;
1255- int correlationId = correlationSequence . getAndIncrement ();
1269+ int correlationId = nextCorrelationId ();
12561270 try {
12571271 ByteBuf bb = allocate (length + 4 );
12581272 bb .writeInt (length );
@@ -1320,7 +1334,7 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
13201334 }
13211335
13221336 int length = 2 + 2 + 4 + 2 + reference .length () + 2 + stream .length ();
1323- int correlationId = correlationSequence . getAndIncrement ();
1337+ int correlationId = nextCorrelationId ();
13241338 try {
13251339 ByteBuf bb = allocate (length + 4 );
13261340 bb .writeInt (length );
@@ -1361,7 +1375,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13611375 }
13621376
13631377 int length = 2 + 2 + 4 + 2 + publisherReference .length () + 2 + stream .length ();
1364- int correlationId = correlationSequence . getAndIncrement ();
1378+ int correlationId = nextCorrelationId ();
13651379 try {
13661380 ByteBuf bb = allocate (length + 4 );
13671381 bb .writeInt (length );
@@ -1398,7 +1412,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13981412
13991413 public Response unsubscribe (byte subscriptionId ) {
14001414 int length = 2 + 2 + 4 + 1 ;
1401- int correlationId = correlationSequence . getAndIncrement ();
1415+ int correlationId = nextCorrelationId ();
14021416 try {
14031417 ByteBuf bb = allocate (length + 4 );
14041418 bb .writeInt (length );
@@ -1445,6 +1459,8 @@ private void closeNetty() {
14451459 if (this .channel != null && this .channel .isOpen ()) {
14461460 LOGGER .debug ("Closing Netty channel" );
14471461 this .channel .close ().get (10 , TimeUnit .SECONDS );
1462+ } else {
1463+ LOGGER .debug ("No Netty channel to close" );
14481464 }
14491465 } catch (InterruptedException e ) {
14501466 LOGGER .info ("Channel closing has been interrupted" );
@@ -1530,7 +1546,7 @@ public List<String> route(String routingKey, String superStream) {
15301546 + routingKey .length ()
15311547 + 2
15321548 + superStream .length (); // API code, version, correlation ID, 2 strings
1533- int correlationId = correlationSequence . incrementAndGet ();
1549+ int correlationId = nextCorrelationId ();
15341550 try {
15351551 ByteBuf bb = allocate (length + 4 );
15361552 bb .writeInt (length );
@@ -1565,7 +1581,7 @@ public List<String> partitions(String superStream) {
15651581 }
15661582 int length =
15671583 2 + 2 + 4 + 2 + superStream .length (); // API code, version, correlation ID, 1 string
1568- int correlationId = correlationSequence . incrementAndGet ();
1584+ int correlationId = nextCorrelationId ();
15691585 try {
15701586 ByteBuf bb = allocate (length + 4 );
15711587 bb .writeInt (length );
@@ -1593,7 +1609,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
15931609 List <FrameHandlerInfo > commandVersions = ServerFrameHandler .commandVersions ();
15941610 int length = 2 + 2 + 4 + 4 ; // API code, version, correlation ID, array size
15951611 length += commandVersions .size () * (2 + 2 + 2 );
1596- int correlationId = correlationSequence . incrementAndGet ();
1612+ int correlationId = nextCorrelationId ();
15971613 try {
15981614 ByteBuf bb = allocate (length + 4 );
15991615 bb .writeInt (length );
@@ -1626,7 +1642,7 @@ StreamStatsResponse streamStats(String stream) {
16261642 throw new IllegalArgumentException ("stream must not be null" );
16271643 }
16281644 int length = 2 + 2 + 4 + 2 + stream .length (); // API code, version, correlation ID, 1 string
1629- int correlationId = correlationSequence . incrementAndGet ();
1645+ int correlationId = nextCorrelationId ();
16301646 try {
16311647 ByteBuf bb = allocate (length + 4 );
16321648 bb .writeInt (length );
@@ -2583,6 +2599,10 @@ public ClientParameters bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomi
25832599 return this ;
25842600 }
25852601
2602+ Duration rpcTimeout () {
2603+ return this .rpcTimeout ;
2604+ }
2605+
25862606 ClientParameters duplicate () {
25872607 ClientParameters duplicate = new ClientParameters ();
25882608 for (Field field : ClientParameters .class .getDeclaredFields ()) {
@@ -2926,4 +2946,10 @@ private <T> OutstandingRequest<T> outstandingRequest() {
29262946 public String toString () {
29272947 return "Client{connectionName='" + connectionName () + "'}" ;
29282948 }
2949+
2950+ private void debug (Supplier <String > format , Object ... args ) {
2951+ if (LOGGER .isDebugEnabled ()) {
2952+ LOGGER .debug ("Connection '" + this .clientConnectionName + "': " + format .get (), args );
2953+ }
2954+ }
29292955}
0 commit comments