Skip to content

Commit e78bcfa

Browse files
authored
Fix broker enable dedup cause client publish failed (#1530)
1 parent c2431a6 commit e78bcfa

File tree

4 files changed

+55
-4
lines changed

4 files changed

+55
-4
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MessagePublishContext.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public final class MessagePublishContext implements PublishContext {
3636
private Topic topic;
3737
private long startTimeNs;
3838
private CompletableFuture<Position> positionFuture;
39+
private long sequenceId;
3940

4041
/**
4142
* Executed from managed ledger thread when the message is persisted.
@@ -58,11 +59,12 @@ public void completed(Exception exception, long ledgerId, long entryId) {
5859

5960
// recycler
6061
public static MessagePublishContext get(CompletableFuture<Position> positionFuture, String producerName,
61-
Topic topic, long startTimeNs) {
62+
Topic topic, long sequenceId, long startTimeNs) {
6263
MessagePublishContext callback = RECYCLER.get();
6364
callback.positionFuture = positionFuture;
6465
callback.producerName = producerName;
6566
callback.topic = topic;
67+
callback.sequenceId = sequenceId;
6668
callback.startTimeNs = startTimeNs;
6769
return callback;
6870
}
@@ -77,6 +79,12 @@ public String getProducerName() {
7779
return producerName;
7880
}
7981

82+
@Override
83+
public long getSequenceId() {
84+
return this.sequenceId;
85+
}
86+
87+
8088
private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
8189
protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
8290
return new MessagePublishContext(handle);
@@ -87,19 +95,20 @@ public void recycle() {
8795
positionFuture = null;
8896
topic = null;
8997
startTimeNs = -1;
98+
sequenceId = -1;
9099
recyclerHandle.recycle(this);
91100
}
92101

93102
/**
94103
* publish mqtt message to pulsar topic, no batch.
95104
*/
96105
public static CompletableFuture<Position> publishMessages(String producerName, Message<byte[]> message,
97-
Topic topic) {
106+
long sequenceId, Topic topic) {
98107
CompletableFuture<Position> future = new CompletableFuture<>();
99108

100109
ByteBuf headerAndPayload = messageToByteBuf(message);
101110
topic.publishMessage(headerAndPayload,
102-
MessagePublishContext.get(future, producerName, topic, System.nanoTime()));
111+
MessagePublishContext.get(future, producerName, topic, sequenceId, System.nanoTime()));
103112
headerAndPayload.release();
104113
return future;
105114
}

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
3030
import java.util.Optional;
3131
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ConcurrentHashMap;
3233
import org.apache.bookkeeper.mledger.Position;
3334
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.pulsar.broker.PulsarService;
3536
import org.apache.pulsar.broker.service.BrokerServiceException;
3637
import org.apache.pulsar.broker.service.Topic;
38+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3739
import org.apache.pulsar.client.impl.MessageImpl;
3840
import org.apache.pulsar.common.util.FutureUtil;
3941

@@ -45,6 +47,8 @@ public abstract class AbstractQosPublishHandler implements QosPublishHandler {
4547
protected final PulsarService pulsarService;
4648
protected final RetainedMessageHandler retainedMessageHandler;
4749
protected final MQTTServerConfiguration configuration;
50+
private final ConcurrentHashMap<String, Long> sequenceIdMap = new ConcurrentHashMap<>();
51+
4852

4953
protected AbstractQosPublishHandler(MQTTService mqttService) {
5054
this.pulsarService = mqttService.getPulsarService();
@@ -104,9 +108,23 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
104108
mqttTopicName = msg.variableHeader().topicName();
105109
}
106110
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
111+
long lastPublishedSequenceId = -1;
112+
if (topic instanceof PersistentTopic) {
113+
final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
114+
lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> {
115+
long id;
116+
if (v == null) {
117+
id = lastPublishedId + 1;
118+
} else {
119+
id = Math.max(v, lastPublishedId) + 1;
120+
}
121+
return id;
122+
});
123+
}
107124
MessageImpl<byte[]> message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
108125
msg.payload().nioBuffer());
109-
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message, topic);
126+
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message,
127+
lastPublishedSequenceId, topic);
110128
message.recycle();
111129
return ret.thenApply(position -> {
112130
if (checkSubscription && topic.getSubscriptions().isEmpty()) {

tests/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
<version>${project.version}</version>
3535
<scope>test</scope>
3636
</dependency>
37+
<dependency>
38+
<groupId>io.streamnative.pulsar.handlers</groupId>
39+
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
40+
<version>${project.version}</version>
41+
<scope>test</scope>
42+
</dependency>
3743
<dependency>
3844
<groupId>io.streamnative</groupId>
3945
<artifactId>testmocks</artifactId>

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleBrokerEnableDedupTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,22 @@ public void testSendAndConsume() throws Exception {
5353
received.ack();
5454
connection.disconnect();
5555
}
56+
57+
@Test
58+
public void testDedup() throws Exception {
59+
MQTT mqtt = createMQTTClient();
60+
String topicName = "testDedup";
61+
BlockingConnection connection = mqtt.blockingConnection();
62+
connection.connect();
63+
Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) };
64+
connection.subscribe(topics);
65+
String message = "Hello MQTT";
66+
for (int i = 1; i <= 10; i++) {
67+
connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false);
68+
Message received = connection.receive();
69+
Assert.assertEquals(received.getTopic(), topicName);
70+
Assert.assertEquals(new String(received.getPayload()), message + i);
71+
}
72+
connection.disconnect();
73+
}
5674
}

0 commit comments

Comments
 (0)