Skip to content

Commit 85bcfb3

Browse files
committed
feat(cluster): 优化集群创建器并完善集群消息广播功能
- 新增 MqttBroker 入口类简化 Broker 创建过程 - MqttClusterConfig 支持链式调用,提升配置便捷性 - 优化 MqttClusterBrokerCreator 支持 start() 直接启动集群和 MQTT 服务 - 集群管理器新增 publish 方法实现集群广播消息的智能转发 - 调整 ClusterTest 集群测试用例,增加定时广播消息示例 - 集群消息类型处理代码简化,提升可读性和一致性 - MqttClusterIntegrationTest 补充广播消息接收的断言验证 - 优化导入包,去除冗余并统一消息常量引用方式
1 parent 1d68cb5 commit 85bcfb3

File tree

8 files changed

+244
-111
lines changed

8 files changed

+244
-111
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.dromara.mica.mqtt.broker;
18+
19+
import org.dromara.mica.mqtt.broker.cluster.MqttClusterBrokerCreator;
20+
import org.dromara.mica.mqtt.core.server.MqttServer;
21+
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
22+
23+
/**
24+
* Mqtt Broker
25+
* <p>
26+
* Entry point for creating MQTT broker instances, including cluster support.
27+
* </p>
28+
*
29+
* @author L.cm
30+
* @author opencode
31+
*/
32+
public class MqttBroker {
33+
34+
/**
35+
* Create a new MqttClusterBrokerCreator using an existing MqttServerCreator
36+
*
37+
* @param serverCreator the underlying MqttServerCreator
38+
* @return MqttClusterBrokerCreator
39+
*/
40+
public static MqttClusterBrokerCreator create(MqttServerCreator serverCreator) {
41+
return new MqttClusterBrokerCreator(serverCreator);
42+
}
43+
44+
/**
45+
* Create a new MqttClusterBrokerCreator using a new MqttServerCreator
46+
*
47+
* @return MqttClusterBrokerCreator
48+
*/
49+
public static MqttClusterBrokerCreator create() {
50+
return new MqttClusterBrokerCreator(MqttServer.create());
51+
}
52+
53+
}

mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/MqttClusterBrokerCreator.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package org.dromara.mica.mqtt.broker.cluster;
1818

19+
import org.dromara.mica.mqtt.broker.cluster.dispatcher.ClusterMessageDispatcher;
1920
import org.dromara.mica.mqtt.core.server.MqttServer;
2021
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
2122
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
2223
import org.dromara.mica.mqtt.core.server.session.InMemoryMqttSessionManager;
23-
import org.dromara.mica.mqtt.broker.cluster.dispatcher.ClusterMessageDispatcher;
2424

2525
/**
2626
* 集群模式的 Broker 创建器
@@ -40,7 +40,7 @@ public MqttClusterBrokerCreator clusterConfig(MqttClusterConfig config) {
4040
return this;
4141
}
4242

43-
public MqttServer build() throws Exception {
43+
public MqttServer build() {
4444
if (clusterConfig == null || !clusterConfig.isEnabled()) {
4545
return serverCreator.build();
4646
}
@@ -71,12 +71,27 @@ public MqttServer build() throws Exception {
7171
ClusterMessageDispatcher dispatcher = new ClusterMessageDispatcher(mqttServer, clusterManager, clusterSessionManager);
7272
serverCreator.getMessagePipeline().addHandler(dispatcher);
7373

74-
// 6. 启动集群管理器(会自动启动 MQTT 服务)
75-
clusterManager.start();
74+
return mqttServer;
75+
}
7676

77+
public MqttServer start() {
78+
MqttServer mqttServer = this.build();
79+
try {
80+
if (clusterManager != null) {
81+
clusterManager.start();
82+
} else {
83+
mqttServer.start();
84+
}
85+
} catch (Exception e) {
86+
throw new RuntimeException("Failed to start MqttClusterBroker", e);
87+
}
7788
return mqttServer;
7889
}
7990

91+
public MqttServerCreator getServerCreator() {
92+
return serverCreator;
93+
}
94+
8095
public MqttClusterManager getClusterManager() {
8196
return clusterManager;
8297
}

mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/MqttClusterConfig.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,39 @@ public long getNodeTimeout() {
8181
public void setNodeTimeout(long nodeTimeout) {
8282
this.nodeTimeout = nodeTimeout;
8383
}
84+
85+
public MqttClusterConfig enabled(boolean enabled) {
86+
this.enabled = enabled;
87+
return this;
88+
}
89+
90+
public MqttClusterConfig clusterHost(String clusterHost) {
91+
this.clusterHost = clusterHost;
92+
return this;
93+
}
94+
95+
public MqttClusterConfig clusterPort(int clusterPort) {
96+
this.clusterPort = clusterPort;
97+
return this;
98+
}
99+
100+
public MqttClusterConfig seedMembers(List<String> seedMembers) {
101+
this.seedMembers = seedMembers;
102+
return this;
103+
}
104+
105+
public MqttClusterConfig clusterName(String clusterName) {
106+
this.clusterName = clusterName;
107+
return this;
108+
}
109+
110+
public MqttClusterConfig heartbeatInterval(long heartbeatInterval) {
111+
this.heartbeatInterval = heartbeatInterval;
112+
return this;
113+
}
114+
115+
public MqttClusterConfig nodeTimeout(long nodeTimeout) {
116+
this.nodeTimeout = nodeTimeout;
117+
return this;
118+
}
84119
}

mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/MqttClusterManager.java

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package org.dromara.mica.mqtt.broker.cluster;
22

3-
import org.dromara.mica.mqtt.broker.cluster.message.ClusterMessage;
4-
import org.dromara.mica.mqtt.broker.cluster.message.GenericClusterMessage;
5-
import org.dromara.mica.mqtt.broker.cluster.message.StateSyncResponseMessage;
3+
import org.dromara.mica.mqtt.broker.cluster.message.*;
4+
import org.dromara.mica.mqtt.codec.MqttQoS;
65
import org.dromara.mica.mqtt.core.server.MqttServer;
76
import org.dromara.mica.mqtt.core.server.model.Subscribe;
87
import org.slf4j.Logger;
@@ -12,11 +11,9 @@
1211
import org.tio.server.cluster.core.ClusterConfig;
1312
import org.tio.server.cluster.core.ClusterImpl;
1413
import org.tio.server.cluster.message.ClusterDataMessage;
14+
1515
import java.io.*;
16-
import java.util.Collection;
17-
import java.util.HashMap;
18-
import java.util.List;
19-
import java.util.Map;
16+
import java.util.*;
2017

2118
public class MqttClusterManager {
2219
private static final Logger logger = LoggerFactory.getLogger(MqttClusterManager.class);
@@ -78,7 +75,7 @@ private void requestStateSync() {
7875
if (!nodeToString(seed).equals(localNodeId)) {
7976
try {
8077
GenericClusterMessage syncRequest = new GenericClusterMessage();
81-
syncRequest.setType(org.dromara.mica.mqtt.broker.cluster.message.MessageType.STATE_SYNC_REQUEST);
78+
syncRequest.setType(MessageType.STATE_SYNC_REQUEST);
8279
syncRequest.setSourceNode(localNodeId);
8380
cluster.send(seed, serialize(syncRequest));
8481
logger.info("Sent state sync request to seed node: {}", seed);
@@ -109,33 +106,28 @@ private void handleClusterMessage(ClusterDataMessage message) {
109106
private void handleClusterMessageInternal(ClusterMessage clusterMsg, ClusterDataMessage rawMessage) {
110107
ClusterMqttSessionManager sessionManager = (ClusterMqttSessionManager) mqttServer.getServerCreator().getSessionManager();
111108

112-
if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.PUBLISH_FORWARD) {
113-
org.dromara.mica.mqtt.broker.cluster.message.PublishForwardMessage pfm =
114-
(org.dromara.mica.mqtt.broker.cluster.message.PublishForwardMessage) clusterMsg;
115-
mqttServer.publishAll(pfm.getTopic(), pfm.getPayload(), org.dromara.mica.mqtt.codec.MqttQoS.valueOf(pfm.getQos()), pfm.isRetain());
116-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.SUBSCRIBE_NOTIFY) {
117-
org.dromara.mica.mqtt.broker.cluster.message.SubscribeNotifyMessage snm =
118-
(org.dromara.mica.mqtt.broker.cluster.message.SubscribeNotifyMessage) clusterMsg;
109+
if (clusterMsg.getType() == MessageType.PUBLISH_FORWARD) {
110+
PublishForwardMessage pfm = (PublishForwardMessage) clusterMsg;
111+
mqttServer.publishAll(pfm.getTopic(), pfm.getPayload(), MqttQoS.valueOf(pfm.getQos()), pfm.isRetain());
112+
} else if (clusterMsg.getType() == MessageType.SUBSCRIBE_NOTIFY) {
113+
SubscribeNotifyMessage snm = (SubscribeNotifyMessage) clusterMsg;
119114
sessionManager.syncRemoteSubscriptions(snm.getClientId(), snm.getNodeId(), snm.getSubscriptions());
120-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.UNSUBSCRIBE_NOTIFY) {
121-
org.dromara.mica.mqtt.broker.cluster.message.UnsubscribeNotifyMessage unm =
122-
(org.dromara.mica.mqtt.broker.cluster.message.UnsubscribeNotifyMessage) clusterMsg;
115+
} else if (clusterMsg.getType() == MessageType.UNSUBSCRIBE_NOTIFY) {
116+
UnsubscribeNotifyMessage unm = (UnsubscribeNotifyMessage) clusterMsg;
123117
sessionManager.removeRemoteSubscriptions(unm.getClientId(), unm.getTopics());
124-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.CLIENT_CONNECT) {
125-
org.dromara.mica.mqtt.broker.cluster.message.ClientConnectMessage ccm =
126-
(org.dromara.mica.mqtt.broker.cluster.message.ClientConnectMessage) clusterMsg;
118+
} else if (clusterMsg.getType() == MessageType.CLIENT_CONNECT) {
119+
ClientConnectMessage ccm = (ClientConnectMessage) clusterMsg;
127120
sessionManager.registerRemoteClient(ccm.getClientId(), ccm.getSourceNode());
128-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.CLIENT_DISCONNECT) {
129-
org.dromara.mica.mqtt.broker.cluster.message.ClientDisconnectMessage cdm =
130-
(org.dromara.mica.mqtt.broker.cluster.message.ClientDisconnectMessage) clusterMsg;
121+
} else if (clusterMsg.getType() == MessageType.CLIENT_DISCONNECT) {
122+
ClientDisconnectMessage cdm = (ClientDisconnectMessage) clusterMsg;
131123
sessionManager.removeRemoteClient(cdm.getClientId());
132-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.STATE_SYNC_REQUEST) {
124+
} else if (clusterMsg.getType() == MessageType.STATE_SYNC_REQUEST) {
133125
handleStateSyncRequest(clusterMsg.getSourceNode());
134-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.STATE_SYNC_RESPONSE) {
126+
} else if (clusterMsg.getType() == MessageType.STATE_SYNC_RESPONSE) {
135127
StateSyncResponseMessage ssm = (StateSyncResponseMessage) clusterMsg;
136128
sessionManager.syncFullState(ssm.getClientNodeMap(), ssm.getSubscriptionMap());
137129
logger.info("State sync completed, received {} client mappings", ssm.getClientNodeMap().size());
138-
} else if (clusterMsg.getType() == org.dromara.mica.mqtt.broker.cluster.message.MessageType.NODE_LEAVE) {
130+
} else if (clusterMsg.getType() == MessageType.NODE_LEAVE) {
139131
// 节点离开消息,需要清理该节点的所有订阅
140132
String leavingNodeId = clusterMsg.getSourceNode();
141133
sessionManager.clearNodeClientsAndSubscriptions(leavingNodeId);
@@ -150,7 +142,7 @@ private void handleStateSyncRequest(String requestNodeId) {
150142
ClusterMqttSessionManager sessionManager = (ClusterMqttSessionManager) mqttServer.getServerCreator().getSessionManager();
151143

152144
StateSyncResponseMessage response = new StateSyncResponseMessage();
153-
response.setType(org.dromara.mica.mqtt.broker.cluster.message.MessageType.STATE_SYNC_RESPONSE);
145+
response.setType(MessageType.STATE_SYNC_RESPONSE);
154146
response.setSourceNode(localNodeId);
155147

156148
// 获取远程客户端映射
@@ -214,7 +206,7 @@ public void stop() {
214206
if (cluster != null) {
215207
// 广播节点离开消息
216208
GenericClusterMessage leaveMsg = new GenericClusterMessage();
217-
leaveMsg.setType(org.dromara.mica.mqtt.broker.cluster.message.MessageType.NODE_LEAVE);
209+
leaveMsg.setType(MessageType.NODE_LEAVE);
218210
leaveMsg.setSourceNode(localNodeId);
219211
broadcast(leaveMsg);
220212

@@ -229,9 +221,55 @@ public void stop() {
229221
}
230222
}
231223

224+
/**
225+
* 集群级别的下发消息:发布消息到集群中的所有匹配订阅者
226+
*
227+
* @param topic 主题
228+
* @param payload 消息体
229+
* @param qos QoS
230+
* @param retain 是否保留消息
231+
*/
232+
public void publish(String topic, byte[] payload, int qos, boolean retain) {
233+
if (mqttServer == null) {
234+
return;
235+
}
236+
237+
// 1. 先在本地节点下发
238+
mqttServer.publishAll(topic, payload, MqttQoS.valueOf(qos), retain);
239+
240+
// 2. 查找是否有其他节点存在该 topic 的订阅者,按需转发以节省网络开销
241+
if (config.isEnabled() && cluster != null) {
242+
ClusterMqttSessionManager sessionManager = (ClusterMqttSessionManager) mqttServer.getServerCreator().getSessionManager();
243+
List<Subscribe> allSubs = sessionManager.searchAllSubscribe(topic);
244+
245+
if (allSubs != null && !allSubs.isEmpty()) {
246+
Set<String> targetNodes = new HashSet<>();
247+
for (Subscribe sub : allSubs) {
248+
String node = sessionManager.getClientNode(sub.getClientId());
249+
if (node != null && !node.equals(localNodeId)) {
250+
targetNodes.add(node);
251+
}
252+
}
253+
254+
if (!targetNodes.isEmpty()) {
255+
PublishForwardMessage clusterMsg = new PublishForwardMessage();
256+
clusterMsg.setType(MessageType.PUBLISH_FORWARD);
257+
clusterMsg.setTopic(topic);
258+
clusterMsg.setPayload(payload);
259+
clusterMsg.setQos(qos);
260+
clusterMsg.setRetain(retain);
261+
262+
for (String node : targetNodes) {
263+
sendToNode(node, clusterMsg);
264+
}
265+
}
266+
}
267+
}
268+
}
269+
232270
public Collection<Node> getRemoteMembers() {
233271
if (cluster == null) {
234-
return java.util.Collections.emptyList();
272+
return Collections.emptyList();
235273
}
236274
return cluster.getRemoteMembers();
237275
}

mica-mqtt-broker/src/test/java/org/dromara/mica/mqtt/broker/cluster/ClusterTestNode1.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.dromara.mica.mqtt.broker.cluster;
1818

19+
import org.dromara.mica.mqtt.broker.MqttBroker;
1920
import org.dromara.mica.mqtt.core.server.MqttServer;
2021
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
2122

@@ -36,11 +37,11 @@ public static void main(String[] args) throws Exception {
3637
System.out.println("========================================");
3738

3839
// 1. 集群配置
39-
MqttClusterConfig clusterConfig = new MqttClusterConfig();
40-
clusterConfig.setEnabled(true);
41-
clusterConfig.setClusterHost("127.0.0.1");
42-
clusterConfig.setClusterPort(9001);
43-
clusterConfig.setSeedMembers(Arrays.asList("127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"));
40+
MqttClusterConfig clusterConfig = new MqttClusterConfig()
41+
.enabled(true)
42+
.clusterHost("127.0.0.1")
43+
.clusterPort(9001)
44+
.seedMembers(Arrays.asList("127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"));
4445

4546
// 2. 创建 MQTT Server
4647
MqttServerCreator creator = MqttServer.create()
@@ -49,14 +50,35 @@ public static void main(String[] args) throws Exception {
4950
.enableMqtt(1883);
5051

5152
// 3. 使用集群创建器构建并启动
52-
MqttServer mqttServer = new MqttClusterBrokerCreator(creator)
53-
.clusterConfig(clusterConfig)
54-
.build();
53+
MqttClusterBrokerCreator brokerCreator = MqttBroker.create(creator)
54+
.clusterConfig(clusterConfig);
55+
MqttServer mqttServer = brokerCreator.start();
56+
MqttClusterManager clusterManager = brokerCreator.getClusterManager();
5557

5658
System.out.println("Node 1 started successfully!");
5759
System.out.println("MQTT Server listening on port 1883");
5860

59-
// 4. 等待关闭
61+
// 4. 定时下发测试消息(每隔5秒下发一次集群广播消息)
62+
new Thread(() -> {
63+
int count = 1;
64+
while (true) {
65+
try {
66+
Thread.sleep(5000);
67+
68+
// 集群广播下发测试(所有订阅了 /test/cluster/topic 的设备都会收到)
69+
String broadcastMsg = "Broadcast message from Node 1, count: " + count;
70+
clusterManager.publish("/test/cluster/topic", broadcastMsg.getBytes(), 0, false);
71+
System.out.println("[Node 1] Published broadcast: " + broadcastMsg);
72+
73+
count++;
74+
} catch (InterruptedException e) {
75+
Thread.currentThread().interrupt();
76+
break;
77+
}
78+
}
79+
}).start();
80+
81+
// 5. 等待关闭
6082
Thread.sleep(Long.MAX_VALUE);
6183
}
6284
}

mica-mqtt-broker/src/test/java/org/dromara/mica/mqtt/broker/cluster/ClusterTestNode2.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ public static void main(String[] args) throws Exception {
3636
System.out.println("========================================");
3737

3838
// 1. 集群配置
39-
MqttClusterConfig clusterConfig = new MqttClusterConfig();
40-
clusterConfig.setEnabled(true);
41-
clusterConfig.setClusterHost("127.0.0.1");
42-
clusterConfig.setClusterPort(9002);
43-
clusterConfig.setSeedMembers(Arrays.asList("127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"));
39+
MqttClusterConfig clusterConfig = new MqttClusterConfig()
40+
.enabled(true)
41+
.clusterHost("127.0.0.1")
42+
.clusterPort(9002)
43+
.seedMembers(Arrays.asList("127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"));
4444

4545
// 2. 创建 MQTT Server
4646
MqttServerCreator creator = MqttServer.create()
@@ -49,9 +49,9 @@ public static void main(String[] args) throws Exception {
4949
.enableMqtt(1884);
5050

5151
// 3. 使用集群创建器构建并启动
52-
MqttServer mqttServer = new MqttClusterBrokerCreator(creator)
52+
MqttServer mqttServer = org.dromara.mica.mqtt.broker.MqttBroker.create(creator)
5353
.clusterConfig(clusterConfig)
54-
.build();
54+
.start();
5555

5656
System.out.println("Node 2 started successfully!");
5757
System.out.println("MQTT Server listening on port 1884");

0 commit comments

Comments
 (0)