File tree Expand file tree Collapse file tree 8 files changed +36
-11
lines changed
Expand file tree Collapse file tree 8 files changed +36
-11
lines changed Original file line number Diff line number Diff line change @@ -36,7 +36,7 @@ WebSocketClient::WebSocketClient() {}
3636
3737WebSocketClient::~WebSocketClient () { DisconnectInternal (); }
3838
39- void WebSocketClient::Init () {}
39+ void WebSocketClient::Init () { work_thread_. init (); }
4040
4141bool WebSocketClient::Connect (const std::string &url) {
4242 LOGI (" WebSocketClient::Connect" );
@@ -51,6 +51,8 @@ bool WebSocketClient::Connect(const std::string &url) {
5151void WebSocketClient::ConnectInternal (const std::string &url) {
5252 LOGI (" WebSocketClient::ConnectInternal: use " << url << " to connect." );
5353 current_task_ = std::make_unique<WebSocketTask>(shared_from_this (), url);
54+ current_task_->init ();
55+ current_task_->Start ();
5456}
5557
5658void WebSocketClient::Disconnect () {
Original file line number Diff line number Diff line change @@ -44,9 +44,7 @@ WebSocketTask::WebSocketTask(
4444 const std::string &url)
4545 : transceiver_(transceiver),
4646 url_ (url),
47- socket_guard_(std::make_unique<base::SocketGuard>(kInvalidSocket )) {
48- submit ([this ]() { start (); });
49- }
47+ socket_guard_(std::make_unique<base::SocketGuard>(kInvalidSocket )) {}
5048
5149WebSocketTask::~WebSocketTask () { shutdown (); }
5250
@@ -98,7 +96,11 @@ void WebSocketTask::SendInternal(const std::string &data) {
9896 LOGI (" send: prefix_len and buf success." );
9997}
10098
101- void WebSocketTask::start () {
99+ void WebSocketTask::Start () {
100+ submit ([this ]() { StartInternal (); });
101+ }
102+
103+ void WebSocketTask::StartInternal () {
102104 if (!do_connect ()) {
103105 onFailure ();
104106 return ;
Original file line number Diff line number Diff line change @@ -19,10 +19,11 @@ class WebSocketTask : public base::WorkThreadExecutor {
1919 virtual ~WebSocketTask () override ;
2020
2121 void Stop ();
22+ void Start ();
2223 void SendInternal (const std::string &data);
2324
2425 private:
25- void start ();
26+ void StartInternal ();
2627
2728 bool do_connect ();
2829
Original file line number Diff line number Diff line change @@ -94,10 +94,11 @@ void SocketServerPosix::Start() {
9494 NotifyInit (GetErrorMessage (), " accept socket error" );
9595 return ;
9696 }
97- auto current_usb_client_ = std::make_shared<UsbClient>(accept_socket_fd);
97+ auto temp_usb_client = std::make_shared<UsbClient>(accept_socket_fd);
9898 std::shared_ptr<ClientListener> listener =
9999 std::make_shared<ClientListener>(shared_from_this ());
100- current_usb_client_->StartUp (listener);
100+ temp_usb_client->Init ();
101+ temp_usb_client->StartUp (listener);
101102}
102103
103104void SocketServerPosix::CloseSocket (int socket_fd) {
Original file line number Diff line number Diff line change @@ -55,6 +55,8 @@ void UsbClient::SetConnectStatus(USBConnectStatus status) {
5555 });
5656}
5757
58+ void UsbClient::Init () { work_thread_.init (); }
59+
5860void UsbClient::StartUp (const std::shared_ptr<UsbClientListener> &listener) {
5961 LOGI (" UsbClient: StartUp." );
6062 work_thread_.submit ([client_ptr = shared_from_this (), listener]() {
Original file line number Diff line number Diff line change @@ -23,6 +23,7 @@ static const char *kMessageQuit = "quit";
2323// Client of socket_server
2424class UsbClient : public std ::enable_shared_from_this<UsbClient> {
2525 public:
26+ void Init ();
2627 // below three functions work only on one work thread
2728 void StartUp (const std::shared_ptr<UsbClientListener> &listener);
2829 // true means the message are added to message queue
Original file line number Diff line number Diff line change 99namespace debugrouter {
1010namespace base {
1111
12- WorkThreadExecutor::WorkThreadExecutor () : is_shut_down(false ) {
13- worker = std::make_unique<std::thread>([this ]() { run (); });
12+ WorkThreadExecutor::WorkThreadExecutor ()
13+ : is_shut_down(false ), alive_flag(std::make_shared<bool >(true )) {}
14+
15+ void WorkThreadExecutor::init () {
16+ std::lock_guard<std::mutex> lock (task_mtx);
17+ if (!worker) {
18+ worker = std::make_unique<std::thread>([this ]() { run (); });
19+ }
1420}
1521
1622WorkThreadExecutor::~WorkThreadExecutor () { shutdown (); }
@@ -62,7 +68,15 @@ void WorkThreadExecutor::shutdown() {
6268}
6369
6470void WorkThreadExecutor::run () {
65- while (!is_shut_down) {
71+ std::weak_ptr<bool > weak_flag = alive_flag;
72+ while (true ) {
73+ auto flag = weak_flag.lock ();
74+ if (!flag) {
75+ break ;
76+ }
77+ if (is_shut_down) {
78+ break ;
79+ }
6680 std::unique_lock<std::mutex> lock (task_mtx);
6781 cond.wait (lock, [this ] { return !tasks.empty () || is_shut_down; });
6882 if (is_shut_down) {
Original file line number Diff line number Diff line change @@ -19,6 +19,7 @@ class WorkThreadExecutor {
1919 WorkThreadExecutor ();
2020 virtual ~WorkThreadExecutor ();
2121
22+ void init ();
2223 void submit (std::function<void ()> task);
2324 void shutdown ();
2425
@@ -30,6 +31,7 @@ class WorkThreadExecutor {
3031 std::queue<std::function<void ()>> tasks;
3132 std::mutex task_mtx;
3233 std::condition_variable cond;
34+ std::shared_ptr<bool > alive_flag;
3335};
3436
3537} // namespace base
You can’t perform that action at this time.
0 commit comments