1515 */
1616package com .hivemq .bootstrap ;
1717
18- import com .google .common .annotations .VisibleForTesting ;
1918import com .google .common .util .concurrent .SettableFuture ;
2019import com .hivemq .configuration .service .entity .Listener ;
21- import org .jetbrains .annotations .NotNull ;
22- import org .jetbrains .annotations .Nullable ;
2320import com .hivemq .extension .sdk .api .client .parameter .ClientInformation ;
2421import com .hivemq .extension .sdk .api .client .parameter .ConnectionInformation ;
2522import com .hivemq .extension .sdk .api .packets .auth .ModifiableDefaultPermissions ;
3734import com .hivemq .security .auth .SslClientCertificate ;
3835import io .netty .channel .Channel ;
3936import io .netty .util .AttributeKey ;
37+ import org .jetbrains .annotations .NotNull ;
38+ import org .jetbrains .annotations .Nullable ;
4039
4140import java .net .InetAddress ;
4241import java .net .InetSocketAddress ;
4746import java .util .Optional ;
4847import java .util .concurrent .ScheduledFuture ;
4948import java .util .concurrent .atomic .AtomicInteger ;
49+ import java .util .concurrent .atomic .AtomicReference ;
5050
5151public class ClientConnection {
5252
5353 /**
5454 * The name of the {@link Channel} attribute which the client connection information is stored in.
5555 */
56- public static final AttributeKey <ClientConnection > CHANNEL_ATTRIBUTE_NAME =
56+ public static final @ NotNull AttributeKey <ClientConnection > CHANNEL_ATTRIBUTE_NAME =
5757 AttributeKey .valueOf ("Client.Connection" );
5858
5959 private final @ NotNull Channel channel ;
6060 private final @ NotNull PublishFlushHandler publishFlushHandler ;
61- private volatile @ NotNull ClientState clientState = ClientState .CONNECTING ;
61+ private final @ NotNull FreePacketIdRanges messageIDPool ;
62+ private final @ NotNull AtomicReference <ClientState > clientState ;
63+ private final @ NotNull HashMap <String , Object > additionalInformation ;
6264 private @ Nullable ProtocolVersion protocolVersion ;
6365 private @ Nullable String clientId ;
64- private boolean cleanStart ;
6566 private @ Nullable ModifiableDefaultPermissions authPermissions ;
6667 private @ Nullable Listener connectedListener ;
6768 private @ Nullable MqttWillPublish willPublish ;
@@ -72,22 +73,19 @@ public class ClientConnection {
7273 private @ Nullable Long clientSessionExpiryInterval ;
7374 private @ Nullable Long connectReceivedTimestamp ;
7475 private @ Nullable Long maxPacketSizeSend ;
75- private @ Nullable String [] topicAliasMapping ;
76+ private @ Nullable String @ Nullable [] topicAliasMapping ;
77+ private @ Nullable Boolean requestProblemInformation ;
78+ private @ Nullable SettableFuture <Void > disconnectFuture ;
79+ private @ Nullable ConnectionAttributes connectionAttributes ;
7680 private boolean noSharedSubscription ;
7781 private boolean clientIdAssigned ;
7882 private boolean incomingPublishesSkipRest ;
7983 private boolean incomingPublishesDefaultFailedSkipRest ;
8084 private boolean requestResponseInformation ;
81- private @ Nullable Boolean requestProblemInformation ;
82- private @ Nullable SettableFuture <Void > disconnectFuture ;
83- private final @ NotNull FreePacketIdRanges messageIDPool ;
84-
85- private @ Nullable ConnectionAttributes connectionAttributes ;
86-
87- private boolean sendWill = true ;
85+ private boolean cleanStart ;
86+ private boolean sendWill ;
8887 private boolean preventLwt ;
8988 private boolean inFlightMessagesSent ;
90-
9189 private @ Nullable SslClientCertificate authCertificate ;
9290 private @ Nullable String authSniHostname ;
9391 private @ Nullable String authCipherSuite ;
@@ -100,20 +98,24 @@ public class ClientConnection {
10098 private @ Nullable Mqtt5UserProperties authUserProperties ;
10199 private @ Nullable ScheduledFuture <?> authFuture ;
102100 private @ Nullable Boolean clearPasswordAfterAuth ;
103-
104101 private @ Nullable ClientContextImpl extensionClientContext ;
105102 private @ Nullable ClientEventListeners extensionClientEventListeners ;
106103 private @ Nullable ClientAuthenticators extensionClientAuthenticators ;
107104 private @ Nullable ClientAuthorizers extensionClientAuthorizers ;
108105 private @ Nullable ClientInformation extensionClientInformation ;
109106 private @ Nullable ConnectionInformation extensionConnectionInformation ;
110- private @ NotNull HashMap <String , Object > additionalInformation ;
111107
112108 public ClientConnection (final @ NotNull Channel channel , final @ NotNull PublishFlushHandler publishFlushHandler ) {
113109 this .channel = channel ;
114110 this .publishFlushHandler = publishFlushHandler ;
115- messageIDPool = new FreePacketIdRanges ();
111+ this . messageIDPool = new FreePacketIdRanges ();
116112 this .additionalInformation = new HashMap <>();
113+ this .clientState = new AtomicReference <>(ClientState .CONNECTING );
114+ this .sendWill = true ;
115+ }
116+
117+ public static @ NotNull ClientConnection fromChannel (final @ NotNull Channel channel ) {
118+ return channel .attr (ClientConnection .CHANNEL_ATTRIBUTE_NAME ).get ();
117119 }
118120
119121 public @ NotNull Channel getChannel () {
@@ -125,20 +127,11 @@ public ClientConnection(final @NotNull Channel channel, final @NotNull PublishFl
125127 }
126128
127129 public @ NotNull ClientState getClientState () {
128- return clientState ;
130+ return clientState . get () ;
129131 }
130132
131- public void proposeClientState (final @ NotNull ClientState clientState ) {
132- if (!this .clientState .disconnected ()) {
133- this .clientState = clientState ;
134- }
135- }
136-
137- // ONLY VISIBLE FOR TESTING !!!
138- // DO NOT USE IN PROD !!!
139- @ VisibleForTesting ()
140- public void setClientStateUnsafe (final @ NotNull ClientState clientState ) {
141- this .clientState = clientState ;
133+ public void proposeClientState (final @ NotNull ClientState proposed ) {
134+ clientState .updateAndGet (current -> current .disconnected () ? current : proposed );
142135 }
143136
144137 public @ Nullable ProtocolVersion getProtocolVersion () {
@@ -255,7 +248,7 @@ public int incrementInFlightCount() {
255248 return inFlightMessageCount .incrementAndGet ();
256249 }
257250
258- public int incrementInFlightCount (int count ) {
251+ public int incrementInFlightCount (final int count ) {
259252 if (inFlightMessageCount == null ) {
260253 inFlightMessageCount = new AtomicInteger (0 );
261254 }
@@ -292,11 +285,11 @@ public void setMaxPacketSizeSend(final @Nullable Long maxPacketSizeSend) {
292285 this .maxPacketSizeSend = maxPacketSizeSend ;
293286 }
294287
295- public @ Nullable String [] getTopicAliasMapping () {
288+ public @ Nullable String @ Nullable [] getTopicAliasMapping () {
296289 return topicAliasMapping ;
297290 }
298291
299- public void setTopicAliasMapping (final @ Nullable String [] topicAliasMapping ) {
292+ public void setTopicAliasMapping (final @ Nullable String @ Nullable [] topicAliasMapping ) {
300293 this .topicAliasMapping = topicAliasMapping ;
301294 }
302295
@@ -592,8 +585,8 @@ public void setClearPasswordAfterAuth(final @Nullable Boolean clearPasswordAfter
592585 return Optional .ofNullable (clearPasswordAfterAuth );
593586 }
594587
595- public void clearPassword (){
596- if (authPassword == null ) {
588+ public void clearPassword () {
589+ if (authPassword == null ) {
597590 return ;
598591 }
599592 Arrays .fill (authPassword , (byte ) 0 );
@@ -603,8 +596,4 @@ public void clearPassword(){
603596 public @ NotNull HashMap <String , Object > getAdditionalInformation () {
604597 return additionalInformation ;
605598 }
606-
607- public static @ NotNull ClientConnection fromChannel (Channel channel ) {
608- return channel .attr (ClientConnection .CHANNEL_ATTRIBUTE_NAME ).get ();
609- }
610599}
0 commit comments