Skip to content

Commit 8aaf84b

Browse files
committed
add subscribe/unsubscribe method
1 parent 0cfa5f1 commit 8aaf84b

File tree

6 files changed

+133
-8
lines changed

6 files changed

+133
-8
lines changed

zorro_websocket_proxy/src/websocket.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ namespace websocket {
1515
uint32_t id_ = 0;
1616
DWORD initiator_ = 0;
1717
std::unordered_set<DWORD> clients_;
18+
std::unordered_map<std::string, uint32_t> subscriptions_;
1819

1920
enum Status : uint8_t {
2021
CONNECTING,
@@ -53,16 +54,19 @@ namespace websocket {
5354
return false;
5455
}
5556

56-
auto retry = 150;
57-
while (retry--) {
57+
lwsl_user("Wait for connected.\n");
58+
59+
auto start = get_timestamp();
60+
do {
5861
auto status = status_.load(std::memory_order_relaxed);
5962
if (status != Status::CONNECTING) {
6063
break;
6164
}
6265
if (!proxy_->sendHeartbeat()) {
63-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
66+
std::this_thread::yield();
67+
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
6468
}
65-
}
69+
} while ((get_timestamp() - start) < 15000);
6670
return status_ == Status::CONNECTED;
6771
}
6872

zorro_websocket_proxy/src/zorro_websocket_proxy.cpp

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,13 @@ void ZorroWebsocketProxy::handleClientMessage(Message& msg) {
205205
case Message::Type::WsRequest:
206206
sendWsRequest(msg);
207207
break;
208+
case Message::Type::Subscribe:
209+
handleSubscribe(msg);
210+
break;
211+
212+
case Message::Type::Unsubscribe:
213+
handleUnsubscribe(msg);
214+
break;
208215

209216
case Message::Type::WsData:
210217
case Message::Type::WsError:
@@ -284,7 +291,7 @@ void ZorroWebsocketProxy::openWs(Message& msg) {
284291
req->id = id;
285292
req->new_connection = (state == Websocket::Status::CONNECTING);
286293
onWsOpened(id, msg.pid);
287-
lwsl_user("Websocket %s already opened. id=%d, new=%d\n", req->url, id, req->new_connection);
294+
lwsl_user("Websocket %s already opened. id=%d, new=%d, client=%d\n", req->url, id, req->new_connection, msg.pid);
288295
msg.status.store(Message::Status::SUCCESS, std::memory_order_release);
289296
return;
290297
}
@@ -299,7 +306,7 @@ void ZorroWebsocketProxy::openWs(Message& msg) {
299306
}
300307

301308
void ZorroWebsocketProxy::openNewWs(Message& msg, WsOpen* req) {
302-
lwsl_user("Opening ws %s\n", req->url);
309+
lwsl_user("Opening ws %s, clinet=%d\n", req->url, msg.pid);
303310
req->new_connection = true;
304311
auto websocket = std::make_shared<Websocket>(this, pid_ * 10000 + (++websocket_id_), req->url);
305312
auto b = websocket->open(msg.pid);
@@ -343,6 +350,70 @@ void ZorroWebsocketProxy::closeWs(uint32_t id, DWORD pid) {
343350
lwsl_user("Close ws. socket not found id=%d\n", id);
344351
}
345352
}
353+
void ZorroWebsocketProxy::handleSubscribe(Message& msg) {
354+
auto req = reinterpret_cast<WsSubscription*>(msg.data);
355+
auto client = getClient(msg.pid);
356+
if (client) {
357+
lwsl_user("Subscribe %s client=%d ws_id=%d\n", req->symbol, msg.pid, req->id);
358+
auto it = websocketsById_.find(req->id);
359+
if (it != websocketsById_.end()) {
360+
auto& subscriptions = it->second->subscriptions_;
361+
auto sub_it = subscriptions.find(req->symbol);
362+
if (sub_it == subscriptions.end()) {
363+
subscriptions.emplace(req->symbol, 1);
364+
if (it->second->send(req->request, req->request_len)) {
365+
msg.status.store(Message::Status::SUCCESS, std::memory_order_release);
366+
return;
367+
}
368+
}
369+
else {
370+
++sub_it->second;
371+
req->existing = true;
372+
msg.status.store(Message::Status::SUCCESS, std::memory_order_release);
373+
return;
374+
}
375+
}
376+
else {
377+
lwsl_user("Websocket not found. id=\n", req->id);
378+
}
379+
}
380+
else {
381+
lwsl_user("Client not found. pid=\n", msg.pid);
382+
}
383+
msg.status.store(Message::Status::FAILED, std::memory_order_release);
384+
}
385+
386+
void ZorroWebsocketProxy::handleUnsubscribe(Message& msg) {
387+
auto req = reinterpret_cast<WsSubscription*>(msg.data);
388+
auto client = getClient(msg.pid);
389+
if (client) {
390+
lwsl_user("Unsubscribe %s client=%d ws_id=%d\n", req->symbol, msg.pid, req->id);
391+
auto it = websocketsById_.find(req->id);
392+
if (it != websocketsById_.end()) {
393+
auto& subscriptions = it->second->subscriptions_;
394+
auto sub_it = subscriptions.find(req->symbol);
395+
if (sub_it != subscriptions.end()) {
396+
if (--sub_it->second == 0) {
397+
subscriptions.erase(sub_it);
398+
if (!it->second->send(req->request, req->request_len)) {
399+
msg.status.store(Message::Status::FAILED, std::memory_order_release);
400+
return;
401+
}
402+
}
403+
}
404+
else {
405+
lwsl_user("Subscription not find. symbol=%s ws_id=%d\n", req->symbol, req->id);
406+
}
407+
}
408+
else {
409+
lwsl_user("Websocket not found. id=\n", req->id);
410+
}
411+
}
412+
else {
413+
lwsl_user("Client not found. pid=\n", msg.pid);
414+
}
415+
msg.status.store(Message::Status::SUCCESS, std::memory_order_release);
416+
}
346417

347418
void ZorroWebsocketProxy::sendWsRequest(Message& msg) {
348419
auto req = reinterpret_cast<WsRequest*>(msg.data);
@@ -352,6 +423,9 @@ void ZorroWebsocketProxy::sendWsRequest(Message& msg) {
352423
auto it = websocketsById_.find(req->id);
353424
if (it != websocketsById_.end()) {
354425
if (it->second->send(req->data, req->len)) {
426+
#ifdef _DEBUG
427+
lwsl_user("--> %.*s\n", req->len, req->data);
428+
#endif
355429
msg.status.store(Message::Status::SUCCESS, std::memory_order_release);
356430
return;
357431
}
@@ -413,6 +487,7 @@ bool ZorroWebsocketProxy::sendHeartbeat(uint64_t now) {
413487
if ((now - last_heartbeat_time_) > HEARTBEAT_INTERVAL) {
414488
auto [msg, index, size] = reserveMessage();
415489
msg->type = Message::Type::Heartbeat;
490+
lwsl_user(".\n");
416491
sendMessage(index, size, now);
417492
return true;
418493
}
@@ -481,5 +556,7 @@ void ZorroWebsocketProxy::onWsData(uint32_t id, const char* data, size_t len, si
481556
memcpy(d->data, data, len);
482557
}
483558
sendMessage(index, size);
484-
//lwsl_user("<-- %.*s\n", len, data);
559+
#ifdef _DEBUG
560+
lwsl_user("<-- %.*s\n", len, data);
561+
#endif
485562
}

zorro_websocket_proxy/src/zorro_websocket_proxy.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ namespace websocket {
5858
void closeWs(Message& msg);
5959
void closeWs(uint32_t id, DWORD pid);
6060
void sendWsRequest(Message& msg);
61+
void handleSubscribe(Message& msg);
62+
void handleUnsubscribe(Message& msg);
6163
ClientInfo* getClient(DWORD pid);
6264
bool checkHeartbeats();
6365
bool sendHeartbeat();

zorro_websocket_proxy_client/include/types.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace websocket {
1010
#define CLIENT_TO_SERVER_QUEUE "Global\\ZorroWebsocketProxy_client_server"
1111
#define SERVER_TO_CLIENT_QUEUE "Global\\ZorroWebsocketProxy_server_client"
1212
#define HEARTBEAT_INTERVAL 500 // 500ms
13-
#define HEARTBEAT_TIMEOUT 2000 // 2s
13+
#define HEARTBEAT_TIMEOUT 15000 // 15s
1414

1515
#pragma pack(1)
1616
struct Message {
@@ -23,6 +23,8 @@ namespace websocket {
2323
WsRequest,
2424
WsData,
2525
WsError,
26+
Subscribe,
27+
Unsubscribe,
2628
};
2729

2830
enum Status : uint8_t {
@@ -57,6 +59,14 @@ namespace websocket {
5759
uint32_t id;
5860
};
5961

62+
struct WsSubscription {
63+
char symbol[256];
64+
uint32_t id;
65+
size_t request_len;
66+
bool existing;
67+
char request[0];
68+
};
69+
6070
struct WsRequest {
6171
uint32_t id;
6272
size_t len;

zorro_websocket_proxy_client/include/zorro_websocket_proxy_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ namespace websocket {
5151

5252
std::pair<uint32_t, bool> openWebSocket(const std::string& url);
5353
bool closeWebSocket(uint32_t id = 0);
54+
bool subscribe(uint32_t id, const std::string& symbol, const char* subscription_request, uint32_t request_len, bool& existing);
55+
bool unsubscribe(uint32_t id, const std::string& symbol, const char* unsubscription_request, uint32_t request_len);
5456
void send(uint32_t id, const char* msg, size_t len);
5557

5658
private:

zorro_websocket_proxy_client/src/zorro_websocket_proxy_client.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,35 @@ bool ZorroWebsocketProxyClient::closeWebSocket(uint32_t id) {
208208
return true;
209209
}
210210

211+
bool ZorroWebsocketProxyClient::subscribe(uint32_t id, const std::string& symbol, const char* subscription_request, uint32_t request_len, bool& existing) {
212+
auto [msg, index, size] = reserveMessage<WsSubscription>(request_len);
213+
msg->type = Message::Type::Subscribe;
214+
auto req = reinterpret_cast<WsSubscription*>(msg->data);
215+
req->request_len = request_len;
216+
req->id = id;
217+
memcpy(&req->symbol[0], symbol.c_str(), symbol.size());
218+
memcpy(req->request, subscription_request, request_len);
219+
sendMessage(msg, index, size);
220+
if (!waitForResponse(msg)) {
221+
log_(L_DEBUG, "Subscribe " + symbol + " timedout");
222+
return false;
223+
}
224+
return true;
225+
}
226+
227+
bool ZorroWebsocketProxyClient::unsubscribe(uint32_t id, const std::string& symbol, const char* unsubscription_request, uint32_t request_len) {
228+
auto [msg, index, size] = reserveMessage<WsSubscription>(request_len);
229+
msg->type = Message::Type::Unsubscribe;
230+
auto req = reinterpret_cast<WsSubscription*>(msg->data);
231+
req->request_len = request_len;
232+
req->id = id;
233+
req->existing = false;
234+
memcpy(&req->symbol[0], symbol.c_str(), symbol.size());
235+
memcpy(req->request, unsubscription_request, request_len);
236+
sendMessage(msg, index, size);
237+
return true;
238+
}
239+
211240
void ZorroWebsocketProxyClient::send(uint32_t id, const char* data, size_t len) {
212241
auto [msg, index, size] = reserveMessage<WsRequest>(len);
213242
msg->type = Message::Type::WsRequest;
@@ -230,6 +259,7 @@ void ZorroWebsocketProxyClient::doWork() {
230259
auto now = get_timestamp();
231260
auto result = server_queue_->read(server_queue_index_);
232261
if (result.first) {
262+
log_(L_DEBUG, ".");
233263
auto msg = reinterpret_cast<Message*>(result.first);
234264
//log_(L_DEBUG, std::to_string(msg->type));
235265
last_server_heartbeat_time_ = now;

0 commit comments

Comments
 (0)