Skip to content

Commit 2eb2d71

Browse files
committed
Fixed MqttIncomingPublishFlowsWithId.findMatching
1 parent a3054bb commit 2eb2d71

File tree

2 files changed

+12
-19
lines changed

2 files changed

+12
-19
lines changed

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.hivemq.client.internal.annotations.NotThreadSafe;
2121
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
22-
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
2322
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
2423
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
2524
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
@@ -75,8 +74,9 @@ public void subAck(
7574

7675
final ImmutableList<MqttSubscription> subscriptions = subscribe.stateless().getSubscriptions();
7776
final ImmutableList<Mqtt5SubAckReasonCode> reasonCodes = subAck.getReasonCodes();
77+
final boolean countNotMatching = subscriptions.size() > reasonCodes.size();
7878
for (int i = 0; i < subscriptions.size(); i++) {
79-
if (reasonCodes.get(i).isError()) {
79+
if (countNotMatching || reasonCodes.get(i).isError()) {
8080
remove(subscriptions.get(i).getTopicFilter(), flow);
8181
}
8282
}
@@ -89,9 +89,9 @@ void remove(final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable Mqtt
8989
public void unsubscribe(final @NotNull MqttStatefulUnsubscribe unsubscribe, final @NotNull MqttUnsubAck unsubAck) {
9090
final ImmutableList<MqttTopicFilterImpl> topicFilters = unsubscribe.stateless().getTopicFilters();
9191
final ImmutableList<Mqtt5UnsubAckReasonCode> reasonCodes = unsubAck.getReasonCodes();
92-
final boolean areAllSuccess = reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS;
92+
final boolean allSuccess = reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS;
9393
for (int i = 0; i < topicFilters.size(); i++) {
94-
if (areAllSuccess || !reasonCodes.get(i).isError()) {
94+
if (allSuccess || !reasonCodes.get(i).isError()) {
9595
unsubscribe(topicFilters.get(i));
9696
}
9797
}
@@ -114,8 +114,7 @@ void cancel(final @NotNull MqttSubscribedPublishFlow flow) {
114114
void findMatching(
115115
final @NotNull MqttStatefulPublish publish, final @NotNull MqttMatchingPublishFlows matchingFlows) {
116116

117-
final MqttTopicImpl topic = publish.stateless().getTopic();
118-
subscriptionFlows.findMatching(topic, matchingFlows);
117+
subscriptionFlows.findMatching(publish.stateless().getTopic(), matchingFlows);
119118
if (matchingFlows.subscriptionFound) {
120119
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
121120
} else {

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlowsWithId.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,10 @@
2929

3030
import javax.inject.Inject;
3131
import java.util.HashMap;
32-
import java.util.function.Consumer;
3332

3433
import static com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe.DEFAULT_NO_SUBSCRIPTION_IDENTIFIER;
3534

3635
/**
37-
* single threaded, in channel eventloop
38-
*
3936
* @author Silvio Giebl
4037
*/
4138
@ClientScope
@@ -44,7 +41,6 @@ public class MqttIncomingPublishFlowsWithId extends MqttIncomingPublishFlows {
4441

4542
private final @NotNull HashMap<Integer, MqttSubscribedPublishFlow> flowsWithIdsMap = new HashMap<>();
4643
private final @NotNull MqttSubscriptionFlows flowsWithIds;
47-
private final @NotNull Consumer<MqttSubscribedPublishFlow> flowWithIdUnsubscribedCallback = this::unsubscribed;
4844

4945
@Inject
5046
MqttIncomingPublishFlowsWithId(
@@ -61,8 +57,8 @@ public void subscribe(
6157
if (flow != null) {
6258
final int subscriptionIdentifier = subscribe.getSubscriptionIdentifier();
6359
if (subscriptionIdentifier != DEFAULT_NO_SUBSCRIPTION_IDENTIFIER) {
64-
flowsWithIdsMap.put(subscriptionIdentifier, flow);
6560
flow.setSubscriptionIdentifier(subscriptionIdentifier);
61+
flowsWithIdsMap.put(subscriptionIdentifier, flow);
6662
}
6763
}
6864
super.subscribe(subscribe, flow);
@@ -102,15 +98,12 @@ void remove(final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable Mqtt
10298

10399
@Override
104100
void unsubscribe(final @NotNull MqttTopicFilterImpl topicFilter) {
105-
flowsWithIds.unsubscribe(topicFilter, flowWithIdUnsubscribedCallback);
101+
flowsWithIds.unsubscribe(topicFilter, this::unsubscribed);
106102
super.unsubscribe(topicFilter);
107103
}
108104

109105
private void unsubscribed(final @NotNull MqttSubscribedPublishFlow flow) {
110-
final int subscriptionIdentifier = flow.getSubscriptionIdentifier();
111-
if (subscriptionIdentifier != DEFAULT_NO_SUBSCRIPTION_IDENTIFIER) {
112-
flowsWithIdsMap.remove(subscriptionIdentifier);
113-
}
106+
flowsWithIdsMap.remove(flow.getSubscriptionIdentifier());
114107
}
115108

116109
@Override
@@ -131,13 +124,14 @@ void findMatching(
131124
final ImmutableIntList subscriptionIdentifiers = publish.getSubscriptionIdentifiers();
132125
if (!subscriptionIdentifiers.isEmpty()) {
133126
for (int i = 0; i < subscriptionIdentifiers.size(); i++) {
134-
final int subscriptionIdentifier = subscriptionIdentifiers.get(i);
135-
final MqttSubscribedPublishFlow flow = flowsWithIdsMap.get(subscriptionIdentifier);
127+
final MqttSubscribedPublishFlow flow = flowsWithIdsMap.get(subscriptionIdentifiers.get(i));
136128
if (flow != null) {
137129
matchingFlows.add(flow);
138130
}
139131
}
140-
if (!matchingFlows.isEmpty()) {
132+
if (matchingFlows.isEmpty()) {
133+
flowsWithIds.findMatching(publish.stateless().getTopic(), matchingFlows);
134+
} else {
141135
matchingFlows.subscriptionFound = true;
142136
}
143137
}

0 commit comments

Comments
 (0)