Skip to content

Commit 66345e0

Browse files
committed
Fixed MqttSubscribedPublishFlowTree.suback and unsubscribe if used with same subscription id, same topic filter, but different prefixes
1 parent 18fc441 commit 66345e0

File tree

1 file changed

+29
-18
lines changed

1 file changed

+29
-18
lines changed

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscribedPublishFlowTree.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void suback(
6666
final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
6767
TopicTreeNode node = rootNode;
6868
while (node != null) {
69-
node = node.suback(topicIterator, subscriptionIdentifier, error);
69+
node = node.suback(topicIterator, topicFilter, subscriptionIdentifier, error);
7070
}
7171
compact();
7272
}
@@ -76,7 +76,7 @@ public void unsubscribe(final @NotNull MqttTopicFilterImpl topicFilter) {
7676
final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
7777
TopicTreeNode node = rootNode;
7878
while (node != null) {
79-
node = node.unsubscribe(topicIterator);
79+
node = node.unsubscribe(topicIterator, topicFilter);
8080
}
8181
compact();
8282
}
@@ -210,17 +210,20 @@ private static class TopicTreeNode {
210210
}
211211

212212
@Nullable TopicTreeNode suback(
213-
final @NotNull MqttTopicIterator topicIterator, final int subscriptionIdentifier, final boolean error) {
213+
final @NotNull MqttTopicIterator topicIterator,
214+
final @NotNull MqttTopicFilterImpl topicFilter,
215+
final int subscriptionIdentifier,
216+
final boolean error) {
214217

215218
if (topicIterator.hasNext()) {
216219
return traverseNext(topicIterator);
217220
}
218221
if (topicIterator.hasMultiLevelWildcard()) {
219-
if (suback(multiLevelEntries, subscriptionIdentifier, error)) {
222+
if (suback(multiLevelEntries, topicFilter, subscriptionIdentifier, error)) {
220223
multiLevelEntries = null;
221224
}
222225
} else {
223-
if (suback(entries, subscriptionIdentifier, error)) {
226+
if (suback(entries, topicFilter, subscriptionIdentifier, error)) {
224227
entries = null;
225228
}
226229
}
@@ -230,49 +233,57 @@ private static class TopicTreeNode {
230233

231234
private static boolean suback(
232235
final @Nullable NodeList<TopicTreeEntry> entries,
236+
final @NotNull MqttTopicFilterImpl topicFilter,
233237
final int subscriptionIdentifier,
234238
final boolean error) {
235239

236240
if (entries != null) {
241+
final byte[] topicFilterPrefix = topicFilter.getPrefix();
237242
for (TopicTreeEntry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
238-
if (entry.subscriptionIdentifier == subscriptionIdentifier) {
243+
if ((entry.subscriptionIdentifier == subscriptionIdentifier) &&
244+
Arrays.equals(topicFilterPrefix, entry.topicFilterPrefix)) {
239245
if (!error) {
240246
entry.acknowledged = true;
241-
return false;
242-
}
243-
if (entry.flow != null) {
244-
assert entry.handle != null : "entry.flow != null -> entry.handle != null";
245-
entry.flow.getTopicFilters().remove(entry.handle);
247+
} else {
248+
if (entry.flow != null) {
249+
assert entry.handle != null : "entry.flow != null -> entry.handle != null";
250+
entry.flow.getTopicFilters().remove(entry.handle);
251+
}
252+
entries.remove(entry);
246253
}
247-
entries.remove(entry);
248-
return entries.isEmpty();
249254
}
250255
}
256+
return entries.isEmpty();
251257
}
252258
return false;
253259
}
254260

255-
@Nullable TopicTreeNode unsubscribe(final @NotNull MqttTopicIterator topicIterator) {
261+
@Nullable TopicTreeNode unsubscribe(
262+
final @NotNull MqttTopicIterator topicIterator, final @NotNull MqttTopicFilterImpl topicFilter) {
263+
256264
if (topicIterator.hasNext()) {
257265
return traverseNext(topicIterator);
258266
}
259267
if (topicIterator.hasMultiLevelWildcard()) {
260-
if (unsubscribe(multiLevelEntries)) {
268+
if (unsubscribe(multiLevelEntries, topicFilter)) {
261269
multiLevelEntries = null;
262270
}
263271
} else {
264-
if (unsubscribe(entries)) {
272+
if (unsubscribe(entries, topicFilter)) {
265273
entries = null;
266274
}
267275
}
268276
compact();
269277
return null;
270278
}
271279

272-
private static boolean unsubscribe(final @Nullable NodeList<TopicTreeEntry> entries) {
280+
private static boolean unsubscribe(
281+
final @Nullable NodeList<TopicTreeEntry> entries, final @NotNull MqttTopicFilterImpl topicFilter) {
282+
273283
if (entries != null) {
284+
final byte[] topicFilterPrefix = topicFilter.getPrefix();
274285
for (TopicTreeEntry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
275-
if (entry.acknowledged) {
286+
if (Arrays.equals(topicFilterPrefix, entry.topicFilterPrefix) && entry.acknowledged) {
276287
if (entry.flow != null) {
277288
assert entry.handle != null : "entry.flow != null -> entry.handle != null";
278289
entry.flow.getTopicFilters().remove(entry.handle);

0 commit comments

Comments
 (0)