Skip to content

Commit b5129b0

Browse files
committed
Fixed GH-5741
1 parent 0c65d83 commit b5129b0

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

core-tests/src/server/server.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,3 +1444,47 @@ TEST(server, forward_message) {
14441444
serv.start();
14451445
t1.join();
14461446
}
1447+
1448+
TEST(server, abnormal_pipeline_data) {
1449+
Server *server = new Server(Server::MODE_PROCESS);
1450+
server->worker_num = 2;
1451+
server->add_port(SW_SOCK_TCP, TEST_HOST, 0);
1452+
1453+
uint64_t msg_id = swoole_rand(1, INT_MAX);
1454+
string filename = "/tmp/swoole.log";
1455+
swoole_set_log_file(filename.c_str());
1456+
1457+
server->create();
1458+
1459+
server->onReceive = [](Server *server, RecvData *req) -> int { return SW_OK; };
1460+
1461+
server->onWorkerStart = [&](Server *server, Worker *worker) {
1462+
if (worker->id == 1) {
1463+
auto send_fn = [server](int flags, uint64_t msg_id) {
1464+
auto sock = server->get_worker_pipe_master(0);
1465+
size_t len = swoole_rand(1000, 8000);
1466+
EventData ev;
1467+
ev.info.msg_id = msg_id;
1468+
ev.info.flags = flags;
1469+
ev.info.len = len;
1470+
swoole_random_bytes(ev.data, len);
1471+
1472+
sock->send_sync(&ev, sizeof(ev.info) + len);
1473+
};
1474+
1475+
send_fn(SW_EVENT_DATA_CHUNK | SW_EVENT_DATA_BEGIN, msg_id);
1476+
send_fn(SW_EVENT_DATA_CHUNK, msg_id + 9999);
1477+
1478+
usleep(100000);
1479+
server->shutdown();
1480+
}
1481+
};
1482+
1483+
server->start();
1484+
1485+
File fp(filename, File::READ);
1486+
auto cont = fp.read_content();
1487+
ASSERT_TRUE(cont->contains(std::string("abnormal pipeline data, msg_id=") + std::to_string(msg_id + 9999)));
1488+
1489+
unlink(filename.c_str());
1490+
}

src/protocol/message_bus.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ ssize_t MessageBus::read(Socket *sock) {
108108
info->msg_id,
109109
sock->get_fd(),
110110
info->reactor_id);
111+
// Read data from the socket buffer and discard it.
112+
recv(sock->get_fd(), info, sizeof(buffer_->info), 0);
111113
return SW_OK;
112114
}
113115

0 commit comments

Comments
 (0)