Skip to content

Commit 18fc441

Browse files
committed
Improved MqttIncomingQosHandler pubrec error code handling
1 parent ac0fe41 commit 18fc441

File tree

1 file changed

+4
-18
lines changed

1 file changed

+4
-18
lines changed

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingQosHandler.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import com.hivemq.client.internal.mqtt.message.publish.pubrec.MqttPubRec;
3434
import com.hivemq.client.internal.mqtt.message.publish.pubrec.MqttPubRecBuilder;
3535
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRel;
36-
import com.hivemq.client.internal.netty.ContextFuture;
37-
import com.hivemq.client.internal.netty.DefaultContextPromise;
3836
import com.hivemq.client.internal.util.collections.IntIndex;
3937
import com.hivemq.client.mqtt.MqttVersion;
4038
import com.hivemq.client.mqtt.datatypes.MqttQos;
@@ -53,7 +51,7 @@
5351
* @author Silvio Giebl
5452
*/
5553
@ClientScope
56-
public class MqttIncomingQosHandler extends MqttSessionAwareHandler implements ContextFuture.Listener<MqttPubRec> {
54+
public class MqttIncomingQosHandler extends MqttSessionAwareHandler {
5755

5856
public static final @NotNull String NAME = "qos.incoming";
5957
private static final @NotNull InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttIncomingQosHandler.class);
@@ -246,7 +244,8 @@ void ack(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
246244
}
247245
case EXACTLY_ONCE: {
248246
final MqttPubRec pubRec = buildPubRec(new MqttPubRecBuilder(publishWithFlows.publish));
249-
final Object prevMessage = messages.put(pubRec);
247+
final Object prevMessage = !pubRec.getReasonCode().isError() ? messages.put(pubRec) :
248+
messages.remove(pubRec.getPacketIdentifier());
250249
if (ack(prevMessage, publishWithFlows) && (ctx != null)) {
251250
writePubRec(ctx, pubRec);
252251
}
@@ -276,20 +275,7 @@ private void writePubAck(final @NotNull ChannelHandlerContext ctx, final @NotNul
276275
}
277276

278277
private void writePubRec(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttPubRec pubRec) {
279-
if (pubRec.getReasonCode().isError()) {
280-
ctx.writeAndFlush(pubRec, new DefaultContextPromise<>(ctx.channel(), pubRec)).addListener(this);
281-
} else {
282-
ctx.writeAndFlush(pubRec, ctx.voidPromise());
283-
}
284-
}
285-
286-
@Override
287-
public void operationComplete(final @NotNull ContextFuture<? extends MqttPubRec> future) {
288-
if (future.isSuccess()) {
289-
messages.remove(future.getContext().getPacketIdentifier());
290-
} else {
291-
future.channel().pipeline().fireExceptionCaught(future.cause());
292-
}
278+
ctx.writeAndFlush(pubRec, ctx.voidPromise());
293279
}
294280

295281
private void readPubRel(final @NotNull ChannelHandlerContext ctx, final @NotNull MqttPubRel pubRel) {

0 commit comments

Comments
 (0)