Skip to content

Commit 57d9025

Browse files
committed
Merge branch 'dev_tcp_client_chengxin' into 'feature_tcp_client'
【ADD】提供给openapi使用的基于长连接的客户端类封装 See merge request server/openapi/openapi-cpp-sdk!14
2 parents ddd8d61 + e8822b0 commit 57d9025

File tree

2 files changed

+221
-0
lines changed

2 files changed

+221
-0
lines changed

include/tigerapi/push_client.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#ifndef PUSH_CLIENT_H
2+
#define PUSH_CLIENT_H
3+
#include <memory>
4+
#include <functional>
5+
#include <thread>
6+
#include "boost/asio/io_service.hpp"
7+
#include "tigerapi/win32.h"
8+
#include "openapi_pb/pb_source/Request.pb.h"
9+
#include "openapi_pb/pb_source/Response.pb.h"
10+
#include "openapi_pb/pb_source/AssetData.pb.h"
11+
12+
namespace TIGER_API
13+
{
14+
class PushSocket;
15+
class ClientConfig;
16+
17+
class OPENAPI_EXPORT PushClient : public std::enable_shared_from_this<PushClient>
18+
{
19+
public:
20+
static std::shared_ptr<PushClient> create_push_client(const TIGER_API::ClientConfig& client_config);
21+
~PushClient();
22+
PushClient(const PushClient&) = delete;
23+
PushClient& operator=(const PushClient&) = delete;
24+
private:
25+
PushClient();
26+
public:
27+
void connect(const TIGER_API::ClientConfig& client_config);
28+
void disconnect();
29+
30+
void set_connected_callback(const std::function<void()>& cb);
31+
void set_disconnected_callback(const std::function<void()>& cb);
32+
void set_inner_error_callback(const std::function<void(std::string)>& cb);
33+
34+
void set_asset_changed_callback(const std::function<void(const tigeropen::push::pb::AssetData&)>& cb);
35+
bool subscribe_asset(const std::string& account);
36+
bool unsubscribe_asset(const std::string& account);
37+
38+
//TODO:other
39+
40+
private:
41+
bool send_frame(const tigeropen::push::pb::Request& request);
42+
void do_write(const std::string& frame);
43+
void do_disconnect();
44+
void on_message(const std::shared_ptr<tigeropen::push::pb::Response>& response_pb_object);
45+
private:
46+
std::function<void(const tigeropen::push::pb::AssetData&)> asset_changed_;
47+
private:
48+
boost::asio::io_service io_service_;
49+
std::shared_ptr<TIGER_API::PushSocket> socket_;
50+
std::shared_ptr<std::thread> worker_thread_;
51+
};
52+
}
53+
54+
#endif // PUSH_CLIENT_H

src/push_client.cpp

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#include "tigerapi/push_client.h"
2+
#include "tigerapi/client_config.h"
3+
#include "tigerapi/push_socket/push_socket.h"
4+
#include "openapi_pb\pb_source\PushData.pb.h"
5+
#include <iostream>
6+
#include "google/protobuf/util/json_util.h"
7+
8+
std::shared_ptr<TIGER_API::PushClient> TIGER_API::PushClient::create_push_client(const TIGER_API::ClientConfig& client_config)
9+
{
10+
return std::shared_ptr<TIGER_API::PushClient>(new TIGER_API::PushClient());
11+
}
12+
13+
TIGER_API::PushClient::~PushClient()
14+
{
15+
io_service_.stop();
16+
17+
if (worker_thread_)
18+
{
19+
worker_thread_->join();
20+
}
21+
}
22+
23+
TIGER_API::PushClient::PushClient()
24+
{
25+
26+
}
27+
28+
void TIGER_API::PushClient::connect(const TIGER_API::ClientConfig& client_config)
29+
{
30+
LOG(INFO) << "create a worker thread to perform asynchronous network connections";
31+
//启动工作线程
32+
worker_thread_ = std::shared_ptr<std::thread>(new std::thread([this, client_config]
33+
{
34+
socket_ = PushSocket::create_push_socket(&io_service_, client_config);
35+
socket_->connect();
36+
37+
LOG(INFO) << "io_service run on work thread";
38+
io_service_.run();
39+
}));
40+
}
41+
42+
void TIGER_API::PushClient::disconnect()
43+
{
44+
//跨线程调用,需要异步投递任务
45+
io_service_.post(boost::bind(&PushClient::do_disconnect, this));
46+
}
47+
48+
void TIGER_API::PushClient::set_connected_callback(const std::function<void()>& cb)
49+
{
50+
if (socket_)
51+
{
52+
socket_->set_connected_callback(cb);
53+
}
54+
}
55+
56+
void TIGER_API::PushClient::set_disconnected_callback(const std::function<void()>& cb)
57+
{
58+
if (socket_)
59+
{
60+
socket_->set_disconnected_callback(cb);
61+
}
62+
}
63+
64+
void TIGER_API::PushClient::set_inner_error_callback(const std::function<void(std::string)>& cb)
65+
{
66+
if (socket_)
67+
{
68+
socket_->set_inner_error_callback(cb);
69+
}
70+
}
71+
72+
void TIGER_API::PushClient::set_asset_changed_callback(const std::function<void(const tigeropen::push::pb::AssetData&)>& cb)
73+
{
74+
asset_changed_ = cb;
75+
}
76+
77+
bool TIGER_API::PushClient::subscribe_asset(const std::string& account)
78+
{
79+
if (!socket_)
80+
{
81+
return false;
82+
}
83+
84+
tigeropen::push::pb::Request request;
85+
request.set_command(tigeropen::push::pb::SocketCommon_Command_SUBSCRIBE);
86+
request.set_id(socket_->get_next_id());
87+
88+
tigeropen::push::pb::Request_Subscribe* subscribe = request.mutable_subscribe();
89+
subscribe->set_datatype(tigeropen::push::pb::SocketCommon_DataType_Asset);
90+
if (!account.empty())
91+
{
92+
subscribe->set_account(account.c_str());
93+
}
94+
95+
send_frame(request);
96+
97+
return true;
98+
}
99+
100+
bool TIGER_API::PushClient::unsubscribe_asset(const std::string& account)
101+
{
102+
if (!socket_)
103+
{
104+
return false;
105+
}
106+
107+
tigeropen::push::pb::Request request;
108+
request.set_command(tigeropen::push::pb::SocketCommon_Command_UNSUBSCRIBE);
109+
request.set_id(socket_->get_next_id());
110+
111+
tigeropen::push::pb::Request_Subscribe* subscribe = request.mutable_subscribe();
112+
subscribe->set_datatype(tigeropen::push::pb::SocketCommon_DataType_Asset);
113+
if (!account.empty())
114+
{
115+
subscribe->set_account(account.c_str());
116+
}
117+
118+
send_frame(request);
119+
120+
return true;
121+
}
122+
123+
bool TIGER_API::PushClient::send_frame(const tigeropen::push::pb::Request& request)
124+
{
125+
//序列化pb对象到字符串
126+
std::string packed_frame = request.SerializeAsString();
127+
if (packed_frame.empty())
128+
{
129+
return false;
130+
}
131+
132+
std::string packed_frame_json;
133+
google::protobuf::util::JsonPrintOptions options;
134+
MessageToJsonString(request, &packed_frame_json, options).ok();
135+
136+
LOG(DEBUG) << "send frame:" << packed_frame_json;
137+
138+
//跨线程,异步投递任务
139+
io_service_.post(boost::bind(&PushClient::do_write, this, packed_frame));
140+
141+
return true;
142+
}
143+
144+
void TIGER_API::PushClient::do_write(const std::string& frame)
145+
{
146+
if (socket_)
147+
{
148+
socket_->send_message(frame);
149+
}
150+
}
151+
152+
void TIGER_API::PushClient::do_disconnect()
153+
{
154+
if (socket_)
155+
{
156+
socket_->disconnect();
157+
}
158+
}
159+
160+
void TIGER_API::PushClient::on_message(const std::shared_ptr<tigeropen::push::pb::Response>& response_pb_object)
161+
{
162+
if (response_pb_object->body().datatype() == tigeropen::push::pb::SocketCommon_DataType_Asset && asset_changed_)
163+
{
164+
asset_changed_(response_pb_object->body().assetdata());
165+
}
166+
//TODO:other message type
167+
}

0 commit comments

Comments
 (0)