Skip to content

Commit 00e0d28

Browse files
committed
[broker-23] add splitTopic method, rework subscribeAck handler
1 parent 66fd5d3 commit 00e0d28

File tree

4 files changed

+37
-19
lines changed

4 files changed

+37
-19
lines changed

src/main/java/com/ss/mqtt/broker/handler/packet/in/SubscribeInPacketHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED;
44
import static com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED;
5+
import static java.lang.Byte.toUnsignedInt;
56
import com.ss.mqtt.broker.model.reason.code.DisconnectReasonCode;
67
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode;
78
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
@@ -10,26 +11,27 @@
1011
import lombok.RequiredArgsConstructor;
1112
import org.jetbrains.annotations.NotNull;
1213

14+
import java.util.Set;
15+
1316
@RequiredArgsConstructor
1417
public class SubscribeInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, SubscribeInPacket> {
1518

19+
private final static Set<SubscribeAckReasonCode> INVALID_ACK_CODE = Set.of(
20+
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED,
21+
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED
22+
);
23+
1624
private final @NotNull SubscriptionService subscriptionService;
1725

1826
@Override
1927
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull SubscribeInPacket packet) {
2028
var ackReasonCodes = subscriptionService.subscribe(client, packet.getTopicFilters());
2129
client.send(client.getPacketOutFactory().newSubscribeAck(packet.getPacketId(), ackReasonCodes));
22-
ackReasonCodes.findAny(client, SubscribeInPacketHandler::hasInvalidReason);
23-
}
24-
25-
private static boolean hasInvalidReason(@NotNull UnsafeMqttClient client, @NotNull SubscribeAckReasonCode reason) {
26-
if (reason == SHARED_SUBSCRIPTIONS_NOT_SUPPORTED || reason == WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED) {
27-
int reasonIndex = Byte.toUnsignedInt(reason.getValue());
28-
var disconnect = client.getPacketOutFactory().newDisconnect(client, DisconnectReasonCode.of(reasonIndex));
30+
var reason = ackReasonCodes.findAny(INVALID_ACK_CODE::contains);
31+
if (reason != null) {
32+
var disconnectReasonCode = DisconnectReasonCode.of(toUnsignedInt(reason.getValue()));
33+
var disconnect = client.getPacketOutFactory().newDisconnect(client, disconnectReasonCode);
2934
client.sendWithFeedback(disconnect).thenAccept(result -> client.getConnection().close());
30-
return true;
31-
} else {
32-
return false;
3335
}
3436
}
3537
}

src/main/java/com/ss/mqtt/broker/model/SingleSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.jetbrains.annotations.NotNull;
88

99
@ToString
10-
@EqualsAndHashCode(of = "mqttClient", callSuper = false)
10+
@EqualsAndHashCode(of = "mqttClient")
1111
public class SingleSubscriber implements Subscriber {
1212

1313
private final @Getter @NotNull MqttClient mqttClient;

src/main/java/com/ss/mqtt/broker/model/topic/AbstractTopic.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package com.ss.mqtt.broker.model.topic;
22

3-
import static com.ss.mqtt.broker.util.TopicUtils.DELIMITER;
4-
import static com.ss.rlib.common.util.ArrayUtils.EMPTY_STRING_ARRAY;
3+
import static com.ss.mqtt.broker.util.TopicUtils.splitTopic;
54
import static com.ss.rlib.common.util.StringUtils.EMPTY;
65
import com.ss.mqtt.broker.util.DebugUtils;
76
import lombok.EqualsAndHashCode;
87
import lombok.Getter;
98
import org.jetbrains.annotations.NotNull;
109

10+
import java.util.List;
11+
1112
@Getter
1213
@EqualsAndHashCode(of = "rawTopic")
1314
public abstract class AbstractTopic {
@@ -16,28 +17,28 @@ public abstract class AbstractTopic {
1617
DebugUtils.registerIncludedFields("rawTopic");
1718
}
1819

19-
private final @NotNull String[] segments;
20+
private final @NotNull List<String> segments;
2021
private final @NotNull String rawTopic;
2122
private final int length;
2223

2324
AbstractTopic() {
2425
length = 0;
25-
segments = EMPTY_STRING_ARRAY;
26+
segments = List.of();
2627
rawTopic = EMPTY;
2728
}
2829

2930
AbstractTopic(@NotNull String topicName) {
3031
length = topicName.length();
31-
segments = topicName.split(DELIMITER);
32+
segments = splitTopic(topicName);
3233
rawTopic = topicName;
3334
}
3435

3536
@NotNull String getSegment(int level) {
36-
return segments[level];
37+
return segments.get(level);
3738
}
3839

3940
int levelsCount() {
40-
return segments.length;
41+
return segments.size();
4142
}
4243

4344
@Override

src/main/java/com/ss/mqtt/broker/util/TopicUtils.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package com.ss.mqtt.broker.util;
22

3+
import static java.util.Collections.unmodifiableList;
34
import com.ss.mqtt.broker.model.topic.SharedTopicFilter;
45
import com.ss.mqtt.broker.model.topic.TopicFilter;
56
import com.ss.mqtt.broker.model.topic.TopicName;
67
import org.jetbrains.annotations.NotNull;
78

9+
import java.util.ArrayList;
10+
import java.util.List;
11+
812
public class TopicUtils {
913

1014
private static final TopicFilter INVALID_TOPIC_FILTER = new TopicFilter();
1115
private static final TopicName INVALID_TOPIC_NAME = new TopicName();
1216
public static final TopicName EMPTY_TOPIC_NAME = new TopicName();
1317

1418
private static final String SHARE_KEYWORD = "$share";
15-
public static final String DELIMITER = "/";
19+
private static final String DELIMITER = "/";
1620
public static final String MULTI_LEVEL_WILDCARD = "#";
1721
public static final String SINGLE_LEVEL_WILDCARD = "+";
1822

@@ -51,6 +55,17 @@ public static boolean hasWildcard(@NotNull TopicFilter topicFilter) {
5155
}
5256
}
5357

58+
public static @NotNull List<String> splitTopic(@NotNull String topic) {
59+
var segments = new ArrayList<String>();
60+
int pos = 0, end;
61+
while ((end = topic.indexOf(DELIMITER, pos)) >= 0) {
62+
segments.add(topic.substring(pos, end));
63+
pos = end + 1;
64+
}
65+
segments.add(topic.substring(pos));
66+
return unmodifiableList(segments);
67+
}
68+
5469
private static @NotNull TopicFilter newSharedTopicFilter(@NotNull String topicFilter) {
5570
int firstSlash = topicFilter.indexOf(DELIMITER) + 1;
5671
int secondSlash = topicFilter.indexOf(DELIMITER, firstSlash);

0 commit comments

Comments
 (0)