Skip to content

Commit e71d4b8

Browse files
popoaichuiniuRovicnowYoyi
authored andcommitted
[Optimize] Optimize WebsocketClient
Use WebSocketTask to manage thread and socket. issue: #23
1 parent ba34408 commit e71d4b8

File tree

5 files changed

+346
-279
lines changed

5 files changed

+346
-279
lines changed

debug_router/native/core/debug_router_core.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ void DebugRouterCore::AddStateListener(
458458
}
459459

460460
void DebugRouterCore::TryToReconnect() {
461-
if (retry_times_ < 30) {
461+
if (retry_times_ < 3) {
462462
retry_times_++;
463463
LOGI("try to reconnect: " << retry_times_);
464464

debug_router/native/net/websocket_client.cc

Lines changed: 6 additions & 270 deletions
Original file line numberDiff line numberDiff line change
@@ -33,90 +33,21 @@
3333
namespace debugrouter {
3434
namespace net {
3535

36-
static int readline(SOCKET sock, char *buf, size_t size) {
37-
char *out = buf;
38-
while (out - buf < size) {
39-
int res = static_cast<int>(recv(sock, out, 1, 0));
40-
if (res == 1) {
41-
if (*out++ == '\n') {
42-
break;
43-
}
44-
} else if (res == -1) {
45-
break;
46-
}
47-
}
48-
*out = '\0';
49-
return static_cast<int>(out - buf);
50-
}
51-
52-
std::string decodeURIComponent(std::string url) {
53-
int flag = 0;
54-
int code = 0;
55-
std::stringstream result_url_;
56-
for (char c : url) {
57-
if ((flag == 0) && (c == '%')) {
58-
flag = 1;
59-
continue;
60-
} else if (flag == 1) {
61-
if (isxdigit(c)) {
62-
if (isdigit(c)) {
63-
code = c - '0';
64-
} else if (c >= 'A' && c <= 'F') {
65-
code = (0x0a + (c - 'A'));
66-
} else if (c >= 'a' && c <= 'f') {
67-
code = (0x0a + (c - 'a'));
68-
} else {
69-
return std::string();
70-
}
71-
flag = 2;
72-
continue;
73-
} else {
74-
return std::string();
75-
}
76-
} else if (flag == 2) {
77-
if (isxdigit(c)) {
78-
code <<= 4;
79-
if (isdigit(c)) {
80-
code |= (c - '0');
81-
} else if (c >= 'A' && c <= 'F') {
82-
code |= (0x0a + (c - 'A'));
83-
} else if (c >= 'a' && c <= 'f') {
84-
code |= (0x0a + (c - 'a'));
85-
} else {
86-
return std::string();
87-
}
88-
result_url_ << (char)(code & 0xff);
89-
code = 0;
90-
flag = 0;
91-
continue;
92-
} else {
93-
return std::string();
94-
}
95-
} else {
96-
result_url_ << c;
97-
}
98-
}
99-
return result_url_.str();
100-
}
101-
102-
WebSocketClient::WebSocketClient() : socket_guard_(nullptr) {}
36+
WebSocketClient::WebSocketClient() {}
10337

10438
WebSocketClient::~WebSocketClient() { DisconnectInternal(); }
10539

10640
void WebSocketClient::Init() {}
10741

10842
bool WebSocketClient::Connect(const std::string &url) {
10943
auto self = std::static_pointer_cast<WebSocketClient>(shared_from_this());
110-
work_thread_.submit([client_ptr = self, url]() {
111-
client_ptr->DisconnectInternal();
112-
client_ptr->ConnectInternal(url);
113-
});
44+
work_thread_.submit(
45+
[client_ptr = self, url]() { client_ptr->ConnectInternal(url); });
11446
return true;
11547
}
11648

11749
void WebSocketClient::ConnectInternal(const std::string &url) {
118-
url_ = url;
119-
thread_ = std::make_unique<std::thread>([this]() { run(); });
50+
current_task_ = std::make_unique<WebSocketTask>(shared_from_this(), url);
12051
}
12152

12253
void WebSocketClient::Disconnect() {
@@ -125,209 +56,14 @@ void WebSocketClient::Disconnect() {
12556
[client_ptr = self]() { client_ptr->DisconnectInternal(); });
12657
}
12758

128-
void WebSocketClient::DisconnectInternal() {
129-
if (thread_) {
130-
if (thread_->joinable()) {
131-
thread_->join();
132-
LOGI("WebSocketClient thread exit successfully.");
133-
}
134-
thread_.reset();
135-
}
136-
}
59+
void WebSocketClient::DisconnectInternal() { current_task_.reset(nullptr); }
13760

13861
core::ConnectionType WebSocketClient::GetType() {
13962
return core::ConnectionType::kWebSocket;
14063
}
14164

14265
void WebSocketClient::Send(const std::string &data) {
143-
auto self = std::static_pointer_cast<WebSocketClient>(shared_from_this());
144-
work_thread_.submit(
145-
[client_ptr = self, data]() { client_ptr->SendInternal(data); });
146-
}
147-
148-
void WebSocketClient::SendInternal(const std::string &data) {
149-
const char *buf = data.data();
150-
size_t payloadLen = data.size();
151-
uint8_t prefix[14];
152-
size_t prefix_len = 2;
153-
154-
prefix[0] = 1 /*OP_TEXT*/ | 0x80 /*FIN*/;
155-
156-
if (payloadLen > 65535) {
157-
prefix[1] = 127;
158-
*reinterpret_cast<uint32_t *>(prefix + 2) = 0;
159-
prefix[6] = payloadLen >> 24;
160-
prefix[7] = payloadLen >> 16;
161-
prefix[8] = payloadLen >> 8;
162-
prefix[9] = payloadLen;
163-
prefix_len += 8;
164-
} else if (payloadLen > 125) {
165-
prefix[1] = 126;
166-
prefix[2] = payloadLen >> 8;
167-
prefix[3] = payloadLen;
168-
prefix_len += 2;
169-
} else {
170-
prefix[1] = payloadLen;
171-
}
172-
173-
// All frames sent from client to server have this bit set to 1.
174-
prefix[1] |= 0x80 /*MASK*/;
175-
*reinterpret_cast<uint32_t *>(prefix + prefix_len) = 0;
176-
prefix_len += 4;
177-
178-
send(socket_guard_->Get(), (char *)prefix, prefix_len, 0);
179-
send(socket_guard_->Get(), buf, payloadLen, 0);
180-
}
181-
182-
void WebSocketClient::run() {
183-
auto self = std::static_pointer_cast<WebSocketClient>(shared_from_this());
184-
if (!do_connect()) {
185-
if (delegate()) {
186-
delegate()->OnFailure(self);
187-
}
188-
return;
189-
}
190-
191-
if (delegate()) {
192-
delegate()->OnOpen(self);
193-
}
194-
195-
std::string msg;
196-
while (do_read(msg)) {
197-
if (delegate()) {
198-
delegate()->OnMessage(msg, self);
199-
}
200-
}
201-
}
202-
203-
bool WebSocketClient::do_connect() {
204-
url_ = decodeURIComponent(url_);
205-
const char *purl = url_.c_str();
206-
if (memcmp(purl, "wss://", 6) == 0) {
207-
purl += 6;
208-
} else if (memcmp(purl, "ws://", 5) == 0) {
209-
purl += 5;
210-
} else {
211-
LOGE("Parse url error, url: " << purl);
212-
return false;
213-
}
214-
215-
char host[128] = {0};
216-
char path[256] = {0};
217-
int port = 80;
218-
if (sscanf(purl, "%[^:/]:%d/%s", host, &port, path) == 3) {
219-
} else if (sscanf(purl, "%[^:/]/%s", host, path) == 2) {
220-
} else if (sscanf(purl, "%[^:/]:%d", host, &port) == 2) {
221-
} else if (sscanf(purl, "%[^:/]", host) == 1) {
222-
} else {
223-
LOGE("Parse url error, url: " << purl);
224-
return false;
225-
}
226-
227-
struct addrinfo ai, *servinfo;
228-
memset(&ai, 0, sizeof ai);
229-
ai.ai_family = AF_INET; // IPV4
230-
ai.ai_socktype = SOCK_STREAM;
231-
char str_port[16];
232-
snprintf(str_port, sizeof(str_port), "%d", port);
233-
int ret = getaddrinfo(host, str_port, &ai, &servinfo);
234-
if (ret != 0) {
235-
LOGE("getaddrinfo Error");
236-
return false;
237-
}
238-
239-
for (auto p = servinfo; p != NULL; p = p->ai_next) {
240-
auto sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
241-
if (sockfd == -1) {
242-
continue;
243-
}
244-
if (connect(sockfd, p->ai_addr, p->ai_addrlen) != -1) {
245-
socket_guard_ = std::make_unique<base::SocketGuard>(sockfd);
246-
break;
247-
}
248-
CLOSESOCKET(sockfd);
249-
}
250-
freeaddrinfo(servinfo);
251-
252-
char buf[512];
253-
snprintf(buf, sizeof(buf),
254-
"GET /%s HTTP/1.1\r\n"
255-
"Host: %s:%d\r\n"
256-
"Upgrade: websocket\r\n"
257-
"Connection: Upgrade\r\n"
258-
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
259-
"Sec-WebSocket-Version: 13\r\n\r\n",
260-
path, host, port);
261-
send(socket_guard_->Get(), buf, strlen(buf), 0);
262-
263-
int status;
264-
if (readline(socket_guard_->Get(), buf, sizeof(buf)) < 10 ||
265-
sscanf(buf, "HTTP/1.1 %d Switching Protocols\r\n", &status) != 1 ||
266-
status != 101) {
267-
LOGE("Connect Error: " << url_.c_str());
268-
return false;
269-
}
270-
271-
while (readline(socket_guard_->Get(), buf, sizeof(buf)) > 0 &&
272-
buf[0] != '\r') {
273-
size_t len = strlen(buf);
274-
buf[len - 2] = '\0';
275-
LOGI(buf);
276-
}
277-
return true;
278-
}
279-
280-
bool WebSocketClient::do_read(std::string &msg) {
281-
struct {
282-
uint8_t flag_opcode;
283-
uint8_t mask_payload_len;
284-
} head;
285-
auto self = std::static_pointer_cast<WebSocketClient>(shared_from_this());
286-
287-
if (recv(socket_guard_->Get(), (char *)&head, sizeof(head), 0) !=
288-
sizeof(head)) {
289-
LOGE("failed to read websocket message");
290-
delegate()->OnFailure(self);
291-
return false;
292-
}
293-
if ((head.flag_opcode & 0x80) == 0) { // FIN
294-
LOGE("read_message not final fragment");
295-
delegate()->OnFailure(self);
296-
return false;
297-
}
298-
const uint8_t flags = head.flag_opcode >> 4;
299-
if ((head.mask_payload_len & 0x80) != 0) { // masked payload
300-
LOGE("read_message masked");
301-
delegate()->OnFailure(self);
302-
return false;
303-
}
304-
size_t payloadLen = head.mask_payload_len & 0x7f;
305-
bool deflated = (flags & 4 /*FLAG_RSV1*/) != 0;
306-
if (deflated) {
307-
LOGE("deflated message unimplemented");
308-
delegate()->OnFailure(self);
309-
return false;
310-
}
311-
312-
if (payloadLen == 126) {
313-
uint8_t len[2];
314-
recv(socket_guard_->Get(), (char *)&len, sizeof(len), 0);
315-
payloadLen = (len[0] << 8) | len[1];
316-
} else if (payloadLen == 127) {
317-
uint8_t len[8];
318-
recv(socket_guard_->Get(), (char *)&len, sizeof(len), 0);
319-
payloadLen = (len[4] << 24) | (len[5] << 16) | (len[6] << 8) | len[7];
320-
}
321-
322-
msg.resize(payloadLen);
323-
324-
if (recv(socket_guard_->Get(), const_cast<char *>(msg.data()), payloadLen,
325-
0) != payloadLen) {
326-
LOGE("failed to read websocket message");
327-
delegate()->OnFailure(self);
328-
return false;
329-
}
330-
return true;
66+
work_thread_.submit([this, data]() { current_task_->SendInternal(data); });
33167
}
33268

33369
} // namespace net

debug_router/native/net/websocket_client.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "debug_router/native/base/socket_guard.h"
1414
#include "debug_router/native/core/message_transceiver.h"
15+
#include "debug_router/native/net/websocket_task.h"
1516
#include "debug_router/native/socket/work_thread_executor.h"
1617

1718
#if defined(_WIN32)
@@ -46,16 +47,9 @@ class WebSocketClient : public core::MessageTransceiver {
4647
private:
4748
void DisconnectInternal();
4849
void ConnectInternal(const std::string &url);
49-
void SendInternal(const std::string &data);
5050

51-
void run();
52-
bool do_connect();
53-
bool do_read(std::string &msg);
54-
55-
std::string url_;
56-
std::unique_ptr<base::SocketGuard> socket_guard_;
5751
base::WorkThreadExecutor work_thread_;
58-
std::unique_ptr<std::thread> thread_;
52+
std::unique_ptr<WebSocketTask> current_task_;
5953
};
6054
} // namespace net
6155
} // namespace debugrouter

0 commit comments

Comments
 (0)