Skip to content

Commit d0a299a

Browse files
committed
[broker-23] refactor TopicSubscribers
1 parent e782cd8 commit d0a299a

File tree

2 files changed

+41
-29
lines changed

2 files changed

+41
-29
lines changed

src/main/java/com/ss/mqtt/broker/model/topic/TopicSubscribers.java

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
77
import com.ss.mqtt.broker.model.Subscriber;
88
import com.ss.mqtt.broker.network.client.MqttClient;
9+
import com.ss.mqtt.broker.util.SubscriberUtils;
910
import com.ss.rlib.common.function.NotNullSupplier;
1011
import com.ss.rlib.common.util.array.Array;
1112
import com.ss.rlib.common.util.array.ConcurrentArray;
@@ -19,14 +20,6 @@ public class TopicSubscribers {
1920

2021
private final static NotNullSupplier<TopicSubscribers> TOPIC_SUBSCRIBER_SUPPLIER = TopicSubscribers::new;
2122

22-
private static boolean isSharedSubscriber(@NotNull Subscriber subscriber){
23-
return subscriber instanceof SharedSubscriber;
24-
}
25-
26-
private static boolean isSingleSubscriber(@NotNull Subscriber subscriber){
27-
return subscriber instanceof SingleSubscriber;
28-
}
29-
3023
private static void addSubscriber(
3124
@NotNull ConcurrentArray<Subscriber> subscribers,
3225
@NotNull MqttClient client,
@@ -56,7 +49,7 @@ private static void addSharedSubscriber(
5649

5750
var sharedSubscriber = (SharedSubscriber) subscribers.findAny(
5851
group,
59-
TopicSubscribers::sharedSubscriberWithGroup
52+
SubscriberUtils::sharedSubscriberWithGroup
6053
);
6154

6255
if (sharedSubscriber == null) {
@@ -68,14 +61,6 @@ private static void addSharedSubscriber(
6861
sharedSubscriber.addSubscriber(singleSubscriber);
6962
}
7063

71-
private static boolean sharedSubscriberWithGroup(@NotNull String group, @NotNull Subscriber subscriber) {
72-
return isSharedSubscriber(subscriber) && group.equals(((SharedSubscriber) subscriber).getGroup());
73-
}
74-
75-
private static @Nullable MqttClient singleSubscriberToMqttClient(@NotNull Subscriber subscriber) {
76-
return isSingleSubscriber(subscriber) ? ((SingleSubscriber) subscriber).getMqttClient() : null;
77-
}
78-
7964
private static boolean removeSubscriber(
8065
@NotNull ConcurrentArray<Subscriber> subscribers,
8166
@NotNull TopicFilter topic,
@@ -93,7 +78,7 @@ private static boolean removeSingleSubscriber(
9378
//noinspection NullableProblems
9479
return subscribers.removeIfConverted(
9580
client,
96-
TopicSubscribers::singleSubscriberToMqttClient,
81+
SubscriberUtils::singleSubscriberToMqttClient,
9782
Object::equals
9883
);
9984
}
@@ -106,7 +91,7 @@ private static boolean removeSharedSubscriber(
10691
boolean removed = false;
10792
var sharedSubscriber = (SharedSubscriber) subscribers.findAny(
10893
group,
109-
TopicSubscribers::sharedSubscriberWithGroup
94+
SubscriberUtils::sharedSubscriberWithGroup
11095
);
11196

11297
if (sharedSubscriber != null) {
@@ -138,7 +123,7 @@ private static boolean removeDuplicateWithLowerQoS(
138123
}
139124
}
140125

141-
private static void addSubscriber(@NotNull Array<SingleSubscriber> result, @NotNull Subscriber subscriber) {
126+
private static void addToResultArray(@NotNull Array<SingleSubscriber> result, @NotNull Subscriber subscriber) {
142127
if (subscriber instanceof SharedSubscriber) {
143128
result.add(((SharedSubscriber) subscriber).getSubscriber());
144129
} else {
@@ -163,7 +148,7 @@ private static void addSubscriber(@NotNull Array<SingleSubscriber> result, @NotN
163148
subscribers.forEachFiltered(
164149
result,
165150
TopicSubscribers::removeDuplicateWithLowerQoS,
166-
TopicSubscribers::addSubscriber
151+
TopicSubscribers::addToResultArray
167152
);
168153
} finally {
169154
subscribers.readUnlock(stamp);
@@ -176,10 +161,10 @@ private static void addSubscriber(@NotNull Array<SingleSubscriber> result, @NotN
176161
private volatile @Getter @Nullable ConcurrentArray<Subscriber> subscribers;
177162

178163
public void addSubscriber(@NotNull MqttClient client, @NotNull SubscribeTopicFilter subscribe) {
179-
addSubscriber(0, subscribe.getTopicFilter(), client, subscribe);
164+
searchPlaceForSubscriber(0, subscribe.getTopicFilter(), client, subscribe);
180165
}
181166

182-
private void addSubscriber(
167+
private void searchPlaceForSubscriber(
183168
int level,
184169
@NotNull TopicFilter topicFilter,
185170
@NotNull MqttClient client,
@@ -198,7 +183,7 @@ private void addSubscriber(
198183
ObjectDictionary::getOrCompute
199184
);
200185
//noinspection ConstantConditions
201-
topicSubscriber.addSubscriber(level + 1, topicFilter, client, subscribe);
186+
topicSubscriber.searchPlaceForSubscriber(level + 1, topicFilter, client, subscribe);
202187
}
203188
}
204189

@@ -207,27 +192,27 @@ public void removeSubscriber(@NotNull MqttClient client, @NotNull SubscribeTopic
207192
}
208193

209194
public boolean removeSubscriber(@NotNull MqttClient client, @NotNull TopicFilter topicFilter) {
210-
return removeSubscriber(0, topicFilter, client);
195+
return searchSubscriberToRemove(0, topicFilter, client);
211196
}
212197

213-
private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
198+
private boolean searchSubscriberToRemove(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
214199
var removed = false;
215200
var topicSubscribers = getTopicSubscribers();
216201
if (level == topicFilter.levelsCount()) {
217-
removed = removeSubscriber(topicFilter, mqttClient);
202+
removed = tryToRemoveSubscriber(topicFilter, mqttClient);
218203
} else if (topicSubscribers != null) {
219204
var topicSubscriber = topicSubscribers.getInReadLock(
220205
topicFilter.getSegment(level),
221206
ObjectDictionary::get
222207
);
223208
if (topicSubscriber != null) {
224-
removed = topicSubscriber.removeSubscriber(level + 1, topicFilter, mqttClient);
209+
removed = topicSubscriber.searchSubscriberToRemove(level + 1, topicFilter, mqttClient);
225210
}
226211
}
227212
return removed;
228213
}
229214

230-
private boolean removeSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
215+
private boolean tryToRemoveSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
231216
var removed = false;
232217
var subscribers = getSubscribers();
233218
if (subscribers != null) {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.ss.mqtt.broker.util;
2+
3+
import com.ss.mqtt.broker.model.SharedSubscriber;
4+
import com.ss.mqtt.broker.model.SingleSubscriber;
5+
import com.ss.mqtt.broker.model.Subscriber;
6+
import com.ss.mqtt.broker.network.client.MqttClient;
7+
import org.jetbrains.annotations.NotNull;
8+
import org.jetbrains.annotations.Nullable;
9+
10+
public class SubscriberUtils {
11+
12+
private static boolean isSharedSubscriber(@NotNull Subscriber subscriber) {
13+
return subscriber instanceof SharedSubscriber;
14+
}
15+
16+
private static boolean isSingleSubscriber(@NotNull Subscriber subscriber) {
17+
return subscriber instanceof SingleSubscriber;
18+
}
19+
20+
public static boolean sharedSubscriberWithGroup(@NotNull String group, @NotNull Subscriber subscriber) {
21+
return isSharedSubscriber(subscriber) && group.equals(((SharedSubscriber) subscriber).getGroup());
22+
}
23+
24+
public static @Nullable MqttClient singleSubscriberToMqttClient(@NotNull Subscriber subscriber) {
25+
return isSingleSubscriber(subscriber) ? ((SingleSubscriber) subscriber).getMqttClient() : null;
26+
}
27+
}

0 commit comments

Comments
 (0)