Skip to content

Commit 7087f48

Browse files
committed
Fixed sending PUBACK/REC when a new connection has been established
1 parent c1b83c8 commit 7087f48

File tree

8 files changed

+132
-51
lines changed

8 files changed

+132
-51
lines changed

src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.hivemq.client.internal.mqtt.codec.decoder.MqttMessageDecoder;
2222
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
2323
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
24-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
24+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
2525
import com.hivemq.client.internal.mqtt.message.publish.mqtt3.Mqtt3PublishView;
2626
import com.hivemq.client.internal.util.ByteBufferUtil;
2727
import com.hivemq.client.mqtt.datatypes.MqttQos;
@@ -47,7 +47,7 @@ public class Mqtt3PublishDecoder implements MqttMessageDecoder {
4747
Mqtt3PublishDecoder() {}
4848

4949
@Override
50-
public @NotNull MqttStatefulPublish decode(
50+
public @NotNull MqttStatefulIncomingPublish decode(
5151
final int flags, final @NotNull ByteBuf in, final @NotNull MqttDecoderContext context)
5252
throws MqttDecoderException {
5353

src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.hivemq.client.internal.mqtt.codec.decoder.MqttMessageDecoder;
2222
import com.hivemq.client.internal.mqtt.datatypes.*;
2323
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
24-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
24+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
2525
import com.hivemq.client.internal.util.ByteBufferUtil;
2626
import com.hivemq.client.internal.util.Utf8Util;
2727
import com.hivemq.client.internal.util.collections.ImmutableIntList;
@@ -54,7 +54,7 @@ public class Mqtt5PublishDecoder implements MqttMessageDecoder {
5454
Mqtt5PublishDecoder() {}
5555

5656
@Override
57-
public @NotNull MqttStatefulPublish decode(
57+
public @NotNull MqttStatefulIncomingPublish decode(
5858
final int flags, final @NotNull ByteBuf in, final @NotNull MqttDecoderContext context)
5959
throws MqttDecoderException {
6060

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishService.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.hivemq.client.internal.logging.InternalLoggerFactory;
2323
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
2424
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
25-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
25+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
2626
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
2727
import com.hivemq.client.internal.util.collections.HandleList.Handle;
2828
import com.hivemq.client.mqtt.datatypes.MqttQos;
@@ -61,7 +61,7 @@ class MqttIncomingPublishService {
6161
}
6262

6363
@CallByThread("Netty EventLoop")
64-
void onPublishQos0(final @NotNull MqttStatefulPublish publish, final int receiveMaximum) {
64+
void onPublishQos0(final @NotNull MqttStatefulIncomingPublish publish, final int receiveMaximum) {
6565
if (qos0Queue.size() >= (2 * receiveMaximum)) { // TODO receiveMaximum
6666
LOGGER.warn("QoS 0 publish message dropped.");
6767
if (QOS_0_DROP_OLDEST) {
@@ -86,7 +86,7 @@ void onPublishQos0(final @NotNull MqttStatefulPublish publish, final int receive
8686
}
8787

8888
@CallByThread("Netty EventLoop")
89-
boolean onPublishQos1Or2(final @NotNull MqttStatefulPublish publish, final int receiveMaximum) {
89+
boolean onPublishQos1Or2(final @NotNull MqttStatefulIncomingPublish publish, final int receiveMaximum) {
9090
if (qos1Or2Queue.size() >= (2 * receiveMaximum)) {
9191
return false; // flow control error
9292
}
@@ -102,7 +102,7 @@ boolean onPublishQos1Or2(final @NotNull MqttStatefulPublish publish, final int r
102102
}
103103

104104
@CallByThread("Netty EventLoop")
105-
private @NotNull MqttMatchingPublishFlows onPublish(final @NotNull MqttStatefulPublish publish) {
105+
private @NotNull MqttMatchingPublishFlows onPublish(final @NotNull MqttStatefulIncomingPublish publish) {
106106
final MqttMatchingPublishFlows flows = incomingPublishFlows.findMatching(publish);
107107
if (flows.isEmpty()) {
108108
LOGGER.warn("No publish flow registered for {}.", publish);
@@ -124,7 +124,7 @@ void drain() {
124124

125125
qos1Or2It.reset();
126126
while (qos1Or2It.hasNext()) {
127-
final MqttStatefulPublish publish = (MqttStatefulPublish) qos1Or2It.next();
127+
final MqttStatefulIncomingPublish publish = (MqttStatefulIncomingPublish) qos1Or2It.next();
128128
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos1Or2It.next();
129129
emit(publish, flows);
130130
if ((qos1Or2It.getIterated() == 2) && flows.isEmpty() && flows.areAcknowledged()) {
@@ -136,7 +136,7 @@ void drain() {
136136
}
137137
qos0It.reset();
138138
while (qos0It.hasNext()) {
139-
final MqttStatefulPublish publish = (MqttStatefulPublish) qos0It.next();
139+
final MqttStatefulIncomingPublish publish = (MqttStatefulIncomingPublish) qos0It.next();
140140
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos0It.next();
141141
emit(publish, flows);
142142
if ((qos0It.getIterated() == 2) && flows.isEmpty()) {
@@ -149,7 +149,7 @@ void drain() {
149149

150150
@CallByThread("Netty EventLoop")
151151
private void emit(
152-
final @NotNull MqttStatefulPublish statefulPublish, final @NotNull MqttMatchingPublishFlows flows) {
152+
final @NotNull MqttStatefulIncomingPublish statefulPublish, final @NotNull MqttMatchingPublishFlows flows) {
153153

154154
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
155155
final MqttIncomingPublishFlow flow = h.getElement();

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingQosHandler.java

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
2525
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
2626
import com.hivemq.client.internal.mqtt.message.MqttMessage;
27-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
27+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
2828
import com.hivemq.client.internal.mqtt.message.publish.puback.MqttPubAck;
2929
import com.hivemq.client.internal.mqtt.message.publish.puback.MqttPubAckBuilder;
3030
import com.hivemq.client.internal.mqtt.message.publish.pubcomp.MqttPubComp;
@@ -62,10 +62,11 @@ public class MqttIncomingQosHandler extends MqttSessionAwareHandler
6262

6363
// valid for session
6464
private final @NotNull IntIndex<MqttMessage.WithId> messages = new IntIndex<>(INDEX_SPEC);
65-
// contains StatefulPublish with AT_LEAST_ONCE/EXACTLY_ONCE, MqttPubAck or MqttPubRec
65+
// contains MqttStatefulIncomingPublish with AT_LEAST_ONCE/EXACTLY_ONCE, MqttPubAck or MqttPubRec
6666

6767
// valid for connection
6868
private int receiveMaximum;
69+
private long connectionIndex;
6970

7071
@Inject
7172
MqttIncomingQosHandler(
@@ -81,21 +82,24 @@ public void onSessionStartOrResume(
8182
final @NotNull MqttClientConnectionConfig connectionConfig, final @NotNull EventLoop eventLoop) {
8283

8384
receiveMaximum = connectionConfig.getReceiveMaximum();
85+
connectionIndex++;
8486
super.onSessionStartOrResume(connectionConfig, eventLoop);
8587
}
8688

8789
@Override
8890
public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull Object msg) {
89-
if (msg instanceof MqttStatefulPublish) {
90-
readPublish(ctx, (MqttStatefulPublish) msg);
91+
if (msg instanceof MqttStatefulIncomingPublish) {
92+
readPublish(ctx, (MqttStatefulIncomingPublish) msg);
9193
} else if (msg instanceof MqttPubRel) {
9294
readPubRel(ctx, (MqttPubRel) msg);
9395
} else {
9496
ctx.fireChannelRead(msg);
9597
}
9698
}
9799

98-
private void readPublish(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulPublish publish) {
100+
private void readPublish(
101+
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulIncomingPublish publish) {
102+
99103
switch (publish.stateless().getQos()) {
100104
case AT_MOST_ONCE:
101105
readPublishQos0(publish);
@@ -109,46 +113,62 @@ private void readPublish(final @NotNull ChannelHandlerContext ctx, final @NotNul
109113
}
110114
}
111115

112-
private void readPublishQos0(final @NotNull MqttStatefulPublish publish) {
116+
private void readPublishQos0(final @NotNull MqttStatefulIncomingPublish publish) {
113117
incomingPublishService.onPublishQos0(publish, receiveMaximum);
114118
}
115119

116-
private void readPublishQos1(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulPublish publish) {
120+
private void readPublishQos1(
121+
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulIncomingPublish publish) {
122+
117123
final MqttMessage.WithId prevMessage = messages.putIfAbsent(publish);
118-
if (prevMessage == null) { // new message
124+
if (prevMessage == null) {
125+
// new message
126+
publish.setConnectionIndex(connectionIndex);
119127
readNewPublishQos1Or2(ctx, publish);
120-
} else if ((prevMessage instanceof MqttStatefulPublish) &&
121-
(((MqttStatefulPublish) prevMessage).stateless().getQos() == MqttQos.AT_LEAST_ONCE)) { // resent message
128+
} else if ((prevMessage instanceof MqttStatefulIncomingPublish) &&
129+
(((MqttStatefulIncomingPublish) prevMessage).stateless().getQos() == MqttQos.AT_LEAST_ONCE)) {
130+
// resent message
131+
((MqttStatefulIncomingPublish) prevMessage).setConnectionIndex(connectionIndex);
122132
checkDupFlagSet(ctx, publish);
123-
} else if (prevMessage instanceof MqttPubAck) { // resent message and already acknowledged
133+
} else if (prevMessage instanceof MqttPubAck) {
134+
// resent message and already acknowledged
124135
if (checkDupFlagSet(ctx, publish)) {
125136
writePubAck(ctx, (MqttPubAck) prevMessage);
126137
}
127-
} else { // EXACTLY_ONCE or MqttPubRec
138+
} else {
139+
// EXACTLY_ONCE or MqttPubRec
128140
MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
129141
"QoS 1 PUBLISH must not be received with the same packet identifier as a QoS 2 PUBLISH");
130142
}
131143
}
132144

133-
private void readPublishQos2(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulPublish publish) {
145+
private void readPublishQos2(
146+
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulIncomingPublish publish) {
147+
134148
final MqttMessage.WithId prevMessage = messages.putIfAbsent(publish);
135-
if (prevMessage == null) { // new message
149+
if (prevMessage == null) {
150+
// new message
151+
publish.setConnectionIndex(connectionIndex);
136152
readNewPublishQos1Or2(ctx, publish);
137-
} else if ((prevMessage instanceof MqttStatefulPublish) &&
138-
(((MqttStatefulPublish) prevMessage).stateless().getQos() == MqttQos.EXACTLY_ONCE)) { // resent message
153+
} else if ((prevMessage instanceof MqttStatefulIncomingPublish) &&
154+
(((MqttStatefulIncomingPublish) prevMessage).stateless().getQos() == MqttQos.EXACTLY_ONCE)) {
155+
// resent message
156+
((MqttStatefulIncomingPublish) prevMessage).setConnectionIndex(connectionIndex);
139157
checkDupFlagSet(ctx, publish);
140-
} else if (prevMessage instanceof MqttPubRec) { // resent message and already acknowledged
158+
} else if (prevMessage instanceof MqttPubRec) {
159+
// resent message and already acknowledged
141160
if (checkDupFlagSet(ctx, publish)) {
142161
writePubRec(ctx, (MqttPubRec) prevMessage);
143162
}
144-
} else { // AT_LEAST_ONCE or MqttPubAck
163+
} else {
164+
// AT_LEAST_ONCE or MqttPubAck
145165
MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
146166
"QoS 2 PUBLISH must not be received with the same packet identifier as a QoS 1 PUBLISH");
147167
}
148168
}
149169

150170
private void readNewPublishQos1Or2(
151-
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulPublish publish) {
171+
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulIncomingPublish publish) {
152172

153173
if (!incomingPublishService.onPublishQos1Or2(publish, receiveMaximum)) {
154174
MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.RECEIVE_MAXIMUM_EXCEEDED,
@@ -157,7 +177,7 @@ private void readNewPublishQos1Or2(
157177
}
158178

159179
private boolean checkDupFlagSet(
160-
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulPublish publish) {
180+
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttStatefulIncomingPublish publish) {
161181

162182
if (!publish.isDup()) {
163183
MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,
@@ -168,25 +188,39 @@ private boolean checkDupFlagSet(
168188
}
169189

170190
@CallByThread("Netty EventLoop")
171-
void ack(final @NotNull MqttStatefulPublish publish) {
191+
void ack(final @NotNull MqttStatefulIncomingPublish publish) {
172192
switch (publish.stateless().getQos()) {
173193
case AT_LEAST_ONCE:
174194
final MqttPubAck pubAck = buildPubAck(new MqttPubAckBuilder(publish));
175-
messages.put(pubAck);
176-
if (ctx != null) {
195+
if (ack(publish, pubAck) && (ctx != null)) {
177196
writePubAck(ctx, pubAck);
178197
}
179198
break;
180199
case EXACTLY_ONCE:
181200
final MqttPubRec pubRec = buildPubRec(new MqttPubRecBuilder(publish));
182-
messages.put(pubRec);
183-
if (ctx != null) {
201+
if (ack(publish, pubRec) && (ctx != null)) {
184202
writePubRec(ctx, pubRec);
185203
}
186204
break;
187205
}
188206
}
189207

208+
private boolean ack(
209+
final @NotNull MqttStatefulIncomingPublish publish, final @NotNull MqttMessage.WithId pubAckOrRec) {
210+
211+
final MqttMessage.WithId prevMessage = messages.put(pubAckOrRec);
212+
if (prevMessage != publish) {
213+
// message has been overwritten by a new message because session state on server differs
214+
if (prevMessage == null) {
215+
messages.remove(pubAckOrRec.getPacketIdentifier());
216+
} else {
217+
messages.put(prevMessage);
218+
}
219+
return false;
220+
}
221+
return publish.getConnectionIndex() == connectionIndex;
222+
}
223+
190224
private void writePubAck(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttPubAck pubAck) {
191225
ctx.writeAndFlush(pubAck, new DefaultContextPromise<>(ctx.channel(), pubAck)).addListener(this);
192226
}
@@ -216,8 +250,8 @@ private void readPubRel(final @NotNull ChannelHandlerContext ctx, final @NotNull
216250
writePubComp(
217251
ctx, buildPubComp(new MqttPubCompBuilder(pubRel).reasonCode(
218252
Mqtt5PubCompReasonCode.PACKET_IDENTIFIER_NOT_FOUND)));
219-
} else if ((prevMessage instanceof MqttStatefulPublish) &&
220-
(((MqttStatefulPublish) prevMessage).stateless().getQos() ==
253+
} else if ((prevMessage instanceof MqttStatefulIncomingPublish) &&
254+
(((MqttStatefulIncomingPublish) prevMessage).stateless().getQos() ==
221255
MqttQos.EXACTLY_ONCE)) { // PubRec not sent yet
222256
messages.put(prevMessage); // revert
223257
MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR,

src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,21 +185,21 @@ public void acknowledge() {
185185
return new MqttPublishBuilder.Default(this);
186186
}
187187

188-
public @NotNull MqttStatefulPublish createStateful(
188+
public @NotNull MqttStatefulIncomingPublish createStateful(
189189
final int packetIdentifier,
190190
final boolean dup,
191191
final int topicAlias,
192192
final @NotNull ImmutableIntList subscriptionIdentifiers) {
193193

194-
return new MqttStatefulPublish(this, packetIdentifier, dup, topicAlias, subscriptionIdentifiers);
194+
return new MqttStatefulIncomingPublish(this, packetIdentifier, dup, topicAlias, subscriptionIdentifiers);
195195
}
196196

197197
public @NotNull MqttStatefulPublish createStateful(
198198
final int packetIdentifier, final boolean dup, final @Nullable MqttTopicAliasMapping topicAliasMapping) {
199199

200200
final int topicAlias =
201201
(topicAliasMapping == null) ? DEFAULT_NO_TOPIC_ALIAS : topicAliasMapping.onPublish(topic);
202-
return createStateful(packetIdentifier, dup, topicAlias, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS);
202+
return new MqttStatefulPublish(this, packetIdentifier, dup, topicAlias, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS);
203203
}
204204

205205
public @NotNull MqttPublish withConfirmable(final @NotNull Confirmable confirmable) {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2018-present HiveMQ and the HiveMQ Community
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.client.internal.mqtt.message.publish;
18+
19+
import com.hivemq.client.internal.util.collections.ImmutableIntList;
20+
import org.jetbrains.annotations.NotNull;
21+
22+
/**
23+
* @author Silvio Giebl
24+
*/
25+
public class MqttStatefulIncomingPublish extends MqttStatefulPublish {
26+
27+
private long id;
28+
private long connectionIndex;
29+
30+
MqttStatefulIncomingPublish(
31+
final @NotNull MqttPublish publish,
32+
final int packetIdentifier,
33+
final boolean dup,
34+
final int topicAlias,
35+
final @NotNull ImmutableIntList subscriptionIdentifiers) {
36+
37+
super(publish, packetIdentifier, dup, topicAlias, subscriptionIdentifiers);
38+
}
39+
40+
public long getId() {
41+
return id;
42+
}
43+
44+
public void setId(final long id) {
45+
this.id = id;
46+
}
47+
48+
public long getConnectionIndex() {
49+
return connectionIndex;
50+
}
51+
52+
public void setConnectionIndex(final long connectionIndex) {
53+
this.connectionIndex = connectionIndex;
54+
}
55+
}

src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttStatefulPublish.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public class MqttStatefulPublish extends MqttStatefulMessage.WithId<MqttPublish>
3737
private final boolean dup;
3838
private final int topicAlias;
3939
private final @NotNull ImmutableIntList subscriptionIdentifiers;
40-
private long id;
4140

4241
MqttStatefulPublish(
4342
final @NotNull MqttPublish publish,
@@ -68,14 +67,6 @@ public boolean isNewTopicAlias() {
6867
return subscriptionIdentifiers;
6968
}
7069

71-
public long getId() {
72-
return id;
73-
}
74-
75-
public void setId(final long id) {
76-
this.id = id;
77-
}
78-
7970
@Override
8071
protected @NotNull String toAttributeString() {
8172
return super.toAttributeString() + ", dup=" + dup + ", topicAlias=" + topicAlias +

0 commit comments

Comments
 (0)