Skip to content

Commit 27a97d7

Browse files
committed
Added mqtt server tests
1 parent 1769811 commit 27a97d7

File tree

3 files changed

+320
-0
lines changed

3 files changed

+320
-0
lines changed

core-tests/js/mqtt.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
const mqtt = require('mqtt');
2+
const port = process.argv[2];
3+
const pino = require('pino');
4+
const logger = pino(pino.destination('/tmp/swoole.log'));
5+
6+
const client = mqtt.connect(`mqtt://localhost:${port}`);
7+
8+
client.on('connect', () => {
9+
logger.info('the client is connected');
10+
11+
client.subscribe('test/topic', (err) => {
12+
if (err) {
13+
console.error('subscribe fail:', err);
14+
return;
15+
}
16+
logger.info('subscribed: test/topic');
17+
18+
client.publish('test/topic', 'Hello MQTT from Node.js!');
19+
});
20+
});
21+
22+
client.on('disconnect', () => {
23+
logger.info('the client is disconnected');
24+
client.end()
25+
})
26+
27+
client.on('message', (topic, message) => {
28+
logger.info(`received message, topic: ${topic}, content: ${message.toString()}`);
29+
});
30+
31+
client.on('error', (err) => {
32+
console.error('error:', err);
33+
});

core-tests/js/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"dependencies": {
3+
"mqtt": "^4.3.8",
34
"pino": "^6.14.0",
45
"ws": "^8.18.2"
56
}

core-tests/src/server/mqtt.cpp

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| Swoole |
4+
+----------------------------------------------------------------------+
5+
| This source file is subject to version 2.0 of the Apache license, |
6+
| that is bundled with this package in the file LICENSE, and is |
7+
| available through the world-wide-web at the following url: |
8+
| http://www.apache.org/licenses/LICENSE-2.0.html |
9+
| If you did not receive a copy of the Apache2.0 license and are unable|
10+
| to obtain it through the world-wide-web, please send a note to |
11+
| [email protected] so we can mail you a copy immediately. |
12+
+----------------------------------------------------------------------+
13+
| @link https://www.swoole.com/ |
14+
| @contact [email protected] |
15+
| @license https://github.com/swoole/swoole-src/blob/master/LICENSE |
16+
| @Author Tianfeng Han <[email protected]> |
17+
+----------------------------------------------------------------------+
18+
*/
19+
20+
#include "test_core.h"
21+
22+
#include "swoole_server.h"
23+
#include "swoole_memory.h"
24+
#include "swoole_signal.h"
25+
#include "swoole_lock.h"
26+
#include "swoole_util.h"
27+
28+
using namespace std;
29+
using namespace swoole;
30+
31+
enum MqttPacketType {
32+
CONNECT = 1,
33+
CONNACK = 2,
34+
PUBLISH = 3,
35+
PUBACK = 4,
36+
SUBSCRIBE = 8,
37+
SUBACK = 9,
38+
DISCONNECT = 14,
39+
};
40+
41+
std::string current_timestamp() {
42+
using namespace std::chrono;
43+
auto now = system_clock::now();
44+
time_t t = system_clock::to_time_t(now);
45+
char buf[64];
46+
struct tm tm_now;
47+
localtime_r(&t, &tm_now);
48+
strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm_now);
49+
return std::string(buf);
50+
}
51+
52+
struct MqttSession {
53+
SessionId fd;
54+
bool subscribed = false;
55+
uint16_t count = 0;
56+
uint16_t packet_id_subscribe = 0;
57+
std::string subscribed_topic;
58+
Server *server;
59+
60+
MqttSession(Server *_server, SessionId fd_) : fd(fd_), server(_server) {}
61+
62+
// 发送 CONNACK 报文,简单实现:Session Present=0, Connect Return code=0 (Success)
63+
bool send_connack() {
64+
uint8_t connack[] = {
65+
0x20,
66+
0x02, // 固定报头:CONNACK, 剩余长度2
67+
0x00, // Session Present = 0
68+
0x00 // Connect return code = 0 (成功)
69+
};
70+
return server->send(fd, connack, sizeof(connack));
71+
}
72+
73+
// 发送 SUBACK 报文,确认订阅成功
74+
bool send_suback(uint16_t packet_id) {
75+
uint8_t suback[] = {
76+
0x90,
77+
0x03, // 固定报头:SUBACK, 剩余长度3
78+
uint8_t(packet_id >> 8),
79+
uint8_t(packet_id & 0xFF), // 报文标识符
80+
0x00 // 返回码:0x00 QoS 0
81+
};
82+
return server->send(fd, suback, sizeof(suback));
83+
}
84+
85+
// 发送 PUBLISH 报文,QoS 0 简化,无标识符
86+
bool send_publish(const std::string &topic, const std::string &message) {
87+
// PUBLISH fixed header: 0x30 (QoS0), 剩余长度计算
88+
// variable header: topic (2字节长度 + 字符串)
89+
uint16_t topic_len = topic.size();
90+
size_t var_header_len = 2 + topic_len;
91+
size_t payload_len = message.size();
92+
size_t remaining_length = var_header_len + payload_len;
93+
94+
std::vector<uint8_t> packet;
95+
packet.push_back(0x30); // PUBLISH, QoS0
96+
97+
// MQTT剩余长度使用可变长度编码,这里实现简单编码(长度<128假定)
98+
if (remaining_length < 128) {
99+
packet.push_back(uint8_t(remaining_length));
100+
} else {
101+
// 简单处理大于127的长度,实际可以完善
102+
do {
103+
uint8_t byte = remaining_length % 128;
104+
remaining_length /= 128;
105+
if (remaining_length > 0) byte |= 0x80;
106+
packet.push_back(byte);
107+
} while (remaining_length > 0);
108+
}
109+
110+
// variable header topic
111+
packet.push_back(uint8_t(topic_len >> 8));
112+
packet.push_back(uint8_t(topic_len & 0xFF));
113+
packet.insert(packet.end(), topic.begin(), topic.end());
114+
115+
// payload
116+
packet.insert(packet.end(), message.begin(), message.end());
117+
118+
return server->send(fd, packet.data(), packet.size()) == (ssize_t) packet.size();
119+
}
120+
121+
bool send_puback(uint16_t packet_id) {
122+
uint8_t puback[] = {0x40, 0x02, uint8_t(packet_id >> 8), uint8_t(packet_id & 0xFF)};
123+
return server->send(fd, puback, sizeof(puback));
124+
}
125+
126+
bool send_disconnect() {
127+
uint8_t disconnect[] = {0xE0, 0x00};
128+
return server->send(fd, disconnect, sizeof(disconnect));
129+
}
130+
131+
bool process_packet(const uint8_t *data, size_t len) {
132+
uint8_t packet_type = (data[0] >> 4);
133+
switch (packet_type) {
134+
case CONNECT: {
135+
std::cout << "收到 CONNECT 报文\n";
136+
// 简化:收到CONNECT直接回复CONNACK成功
137+
return send_connack();
138+
}
139+
case SUBSCRIBE: {
140+
std::cout << "收到 SUBSCRIBE 报文\n";
141+
// SUBSCRIBE 报文结构:固定头 + 剩余长度 + 报文标识符 (2bytes) + Payload
142+
// 简化解析报文标识符和第一个订阅主题
143+
if (len < 5) return false;
144+
uint16_t packet_id = (data[2] << 8) | data[3];
145+
packet_id_subscribe = packet_id;
146+
147+
size_t pos = 4;
148+
if (pos + 2 > len) return false;
149+
uint16_t topic_len = (data[pos] << 8) | data[pos + 1];
150+
pos += 2;
151+
if (pos + topic_len > len) return false;
152+
subscribed_topic.assign((const char *) (data + pos), topic_len);
153+
std::cout << "订阅主题: " << subscribed_topic << std::endl;
154+
155+
subscribed = true;
156+
return send_suback(packet_id);
157+
}
158+
case PUBLISH: {
159+
std::cout << "收到 PUBLISH 报文\n";
160+
161+
uint8_t flags = data[0] & 0x0F;
162+
uint8_t qos = (flags & 0x06) >> 1;
163+
164+
// TODO 需可变长度解析
165+
size_t remaining_length = data[1];
166+
EXPECT_GT(remaining_length, 2);
167+
168+
size_t pos = 2;
169+
if (pos + 2 > len) return false;
170+
171+
uint16_t topic_len = (data[pos] << 8) | data[pos + 1];
172+
pos += 2;
173+
if (pos + topic_len > len) return false;
174+
175+
std::string topic((const char *) (data + pos), topic_len);
176+
pos += topic_len;
177+
178+
uint16_t packet_id = 0;
179+
if (qos > 0) {
180+
if (pos + 2 > len) return false;
181+
packet_id = (data[pos] << 8) | data[pos + 1];
182+
pos += 2;
183+
}
184+
185+
if (pos > len) return false;
186+
187+
std::string payload((const char *) (data + pos), len - pos);
188+
189+
std::cout << "主题: " << topic << ", 消息体: " << payload << ", QoS: " << (int) qos << std::endl;
190+
191+
// 根据需要处理 payload 内容
192+
// 例如转发给其他客户端、存储等
193+
194+
// QoS1需要发送PUBACK确认
195+
if (qos == 1) {
196+
return send_puback(packet_id);
197+
}
198+
199+
// QoS0直接返回成功
200+
return true;
201+
}
202+
// 你可以增加 PINGREQ、DISCONNECT 等消息处理
203+
default: {
204+
std::cout << "收到未处理的包类型: " << (int) packet_type << std::endl;
205+
return true;
206+
}
207+
}
208+
}
209+
};
210+
211+
static void test_mqtt_server(function<void(Server *)> fn) {
212+
thread child_thread;
213+
Server serv(Server::MODE_BASE);
214+
serv.worker_num = 1;
215+
serv.enable_reuse_port = true;
216+
serv.private_data_2 = (void *) &fn;
217+
218+
sw_logger()->set_level(SW_LOG_WARNING);
219+
220+
std::unordered_map<SessionId, MqttSession *> sessions;
221+
222+
ListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, 9501);
223+
if (!port) {
224+
swoole_warning("listen failed, [error=%d]", swoole_get_last_error());
225+
exit(2);
226+
}
227+
port->open_mqtt_protocol = 1;
228+
229+
serv.create();
230+
231+
serv.onWorkerStart = [&child_thread](Server *serv, Worker *worker) {
232+
function<void(Server *)> fn = *(function<void(Server *)> *) serv->private_data_2;
233+
child_thread = thread(fn, serv);
234+
};
235+
236+
serv.onClose = [&sessions](Server *serv, DataHead *info) -> void {
237+
delete sessions[info->fd];
238+
sessions.erase(info->fd);
239+
};
240+
241+
serv.onConnect = [&sessions](Server *serv, DataHead *info) -> void {
242+
auto session = new MqttSession(serv, info->fd);
243+
sessions[info->fd] = session;
244+
swoole_timer_tick(100, [session, serv](auto r1, TimerNode *tnode) {
245+
if (session->subscribed) {
246+
std::string ts = current_timestamp();
247+
session->send_publish(session->subscribed_topic,
248+
"Index: " + std::to_string(session->count) + ", Time: " + ts);
249+
session->count++;
250+
if (session->count > 10) {
251+
session->send_disconnect();
252+
serv->close(session->fd, false);
253+
swoole_timer_del(tnode);
254+
}
255+
}
256+
});
257+
};
258+
259+
serv.onReceive = [&sessions](Server *serv, RecvData *req) -> int {
260+
auto session = sessions[req->info.fd];
261+
if (!session->process_packet((uint8_t *) req->data, req->info.len)) {
262+
std::cerr << "处理数据包失败,关闭连接\n";
263+
}
264+
return SW_OK;
265+
};
266+
267+
serv.start();
268+
child_thread.join();
269+
}
270+
271+
TEST(mqtt, echo) {
272+
test_mqtt_server([](Server *serv) {
273+
swoole_signal_block_all();
274+
EXPECT_EQ(test::exec_js_script("mqtt.js", std::to_string(serv->get_primary_port()->get_port())), 0);
275+
kill(serv->get_master_pid(), SIGTERM);
276+
});
277+
278+
File fp(TEST_LOG_FILE, O_RDONLY);
279+
EXPECT_TRUE(fp.ready());
280+
auto str = fp.read_content();
281+
SW_LOOP_N(10) {
282+
ASSERT_TRUE(
283+
str->contains("received message, topic: test/topic, content: Index: " + std::to_string(i) + ", Time: "));
284+
}
285+
unlink(TEST_LOG_FILE);
286+
}

0 commit comments

Comments
 (0)