|
| 1 | +From d36d0cece07bf2160d3f945f5977854644429ce7 Mon Sep 17 00:00:00 2001 |
| 2 | +From: Chen Li1 < [email protected]> |
| 3 | +Date: Fri, 24 Apr 2020 10:42:27 +0800 |
| 4 | +Subject: [PATCH] Change IOThreadPool as thread pool for g_main_loop in |
| 5 | + LibNiceConnection |
| 6 | + |
| 7 | +--- |
| 8 | + erizo/src/erizo/DtlsTransport.cpp | 8 ++--- |
| 9 | + erizo/src/erizo/LibNiceConnection.cpp | 30 +++++-------------- |
| 10 | + erizo/src/erizo/LibNiceConnection.h | 6 ++-- |
| 11 | + erizo/src/erizo/thread/IOWorker.cpp | 56 +++++++++++------------------------ |
| 12 | + erizo/src/erizo/thread/IOWorker.h | 11 +++---- |
| 13 | + 5 files changed, 38 insertions(+), 73 deletions(-) |
| 14 | + |
| 15 | +diff --git a/erizo/src/erizo/DtlsTransport.cpp b/erizo/src/erizo/DtlsTransport.cpp |
| 16 | +index a11533a..8a6539e 100644 |
| 17 | +--- a/erizo/src/erizo/DtlsTransport.cpp |
| 18 | ++++ b/erizo/src/erizo/DtlsTransport.cpp |
| 19 | +@@ -107,12 +107,8 @@ DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, c |
| 20 | + iceConfig_.ice_components = comps; |
| 21 | + iceConfig_.username = username; |
| 22 | + iceConfig_.password = password; |
| 23 | +- if (iceConfig_.use_nicer) { |
| 24 | +- // ice_ = NicerConnection::create(io_worker_, this, iceConfig_); |
| 25 | +- ice_.reset(LibNiceConnection::create(iceConfig_)); |
| 26 | +- } else { |
| 27 | +- ice_.reset(LibNiceConnection::create(iceConfig_)); |
| 28 | +- } |
| 29 | ++ // We only use libnice connection |
| 30 | ++ ice_.reset(LibNiceConnection::create(iceConfig_, io_worker_)); |
| 31 | + rtp_timeout_checker_.reset(new TimeoutChecker(this, dtlsRtp.get())); |
| 32 | + if (!rtcp_mux) { |
| 33 | + rtcp_timeout_checker_.reset(new TimeoutChecker(this, dtlsRtcp.get())); |
| 34 | +diff --git a/erizo/src/erizo/LibNiceConnection.cpp b/erizo/src/erizo/LibNiceConnection.cpp |
| 35 | +index 2f1d3cc..3c4d777 100644 |
| 36 | +--- a/erizo/src/erizo/LibNiceConnection.cpp |
| 37 | ++++ b/erizo/src/erizo/LibNiceConnection.cpp |
| 38 | +@@ -60,12 +60,15 @@ void cb_new_selected_pair(NiceAgent *agent, guint stream_id, guint component_id, |
| 39 | + conn->updateComponentState(component_id, IceState::READY); |
| 40 | + } |
| 41 | + |
| 42 | +-LibNiceConnection::LibNiceConnection(boost::shared_ptr<LibNiceInterface> libnice, const IceConfig& ice_config) |
| 43 | ++LibNiceConnection::LibNiceConnection(boost::shared_ptr<LibNiceInterface> libnice, const IceConfig& ice_config, |
| 44 | ++ std::shared_ptr<IOWorker> worker) |
| 45 | + : IceConnection{ice_config}, |
| 46 | +- lib_nice_{libnice}, agent_{NULL}, loop_{NULL}, candsDelivered_{0}, receivedLastCandidate_{false} { |
| 47 | ++ lib_nice_{libnice}, agent_{NULL}, context_{NULL}, loop_{NULL}, candsDelivered_{0}, receivedLastCandidate_{false} { |
| 48 | + #if !GLIB_CHECK_VERSION(2, 35, 0) |
| 49 | + g_type_init(); |
| 50 | + #endif |
| 51 | ++ context_ = worker->getMainContext(); |
| 52 | ++ loop_ = worker->getMainLoop(); |
| 53 | + } |
| 54 | + |
| 55 | + LibNiceConnection::~LibNiceConnection() { |
| 56 | +@@ -79,10 +82,6 @@ void LibNiceConnection::close() { |
| 57 | + } |
| 58 | + ELOG_DEBUG("%s message:closing", toLog()); |
| 59 | + this->updateIceState(IceState::FINISHED); |
| 60 | +- if (loop_ != NULL) { |
| 61 | +- ELOG_DEBUG("%s message:main loop quit", toLog()); |
| 62 | +- g_main_loop_quit(loop_); |
| 63 | +- } |
| 64 | + cond_.notify_one(); |
| 65 | + listener_.reset(); |
| 66 | + boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(5); |
| 67 | +@@ -91,21 +90,11 @@ void LibNiceConnection::close() { |
| 68 | + ELOG_DEBUG("%s message: interrupt thread to close, this: %p", toLog(), this); |
| 69 | + m_Thread_.interrupt(); |
| 70 | + } |
| 71 | +- if (loop_ != NULL) { |
| 72 | +- ELOG_DEBUG("%s message:Unrefing loop", toLog()); |
| 73 | +- g_main_loop_unref(loop_); |
| 74 | +- loop_ = NULL; |
| 75 | +- } |
| 76 | + if (agent_ != NULL) { |
| 77 | + ELOG_DEBUG("%s message: unrefing agent", toLog()); |
| 78 | + g_object_unref(agent_); |
| 79 | + agent_ = NULL; |
| 80 | + } |
| 81 | +- if (context_ != NULL) { |
| 82 | +- ELOG_DEBUG("%s message: Unrefing context", toLog()); |
| 83 | +- g_main_context_unref(context_); |
| 84 | +- context_ = NULL; |
| 85 | +- } |
| 86 | + ELOG_DEBUG("%s message: closed, this: %p", toLog(), this); |
| 87 | + } |
| 88 | + |
| 89 | +@@ -143,13 +132,10 @@ void LibNiceConnection::start() { |
| 90 | + if (this->checkIceState() != INITIAL) { |
| 91 | + return; |
| 92 | + } |
| 93 | +- context_ = g_main_context_new(); |
| 94 | + ELOG_DEBUG("%s message: creating Nice Agent", toLog()); |
| 95 | + nice_debug_enable(FALSE); |
| 96 | + // Create a nice agent |
| 97 | + agent_ = lib_nice_->NiceAgentNew(context_); |
| 98 | +- loop_ = g_main_loop_new(context_, FALSE); |
| 99 | +- m_Thread_ = boost::thread(&LibNiceConnection::mainLoop, this); |
| 100 | + GValue controllingMode = { 0 }; |
| 101 | + g_value_init(&controllingMode, G_TYPE_BOOLEAN); |
| 102 | + g_value_set_boolean(&controllingMode, false); |
| 103 | +@@ -251,7 +237,7 @@ void LibNiceConnection::mainLoop() { |
| 104 | + if (agent_ == NULL || loop_ == NULL) { |
| 105 | + return; |
| 106 | + } |
| 107 | +- g_main_loop_run(loop_); |
| 108 | ++ // g_main_loop_run(loop_); |
| 109 | + ELOG_DEBUG("%s message: finished g_main_loop, this: %p", toLog(), this); |
| 110 | + } |
| 111 | + |
| 112 | +@@ -472,8 +458,8 @@ void LibNiceConnection::setReceivedLastCandidate(bool hasReceived) { |
| 113 | + this->receivedLastCandidate_ = hasReceived; |
| 114 | + } |
| 115 | + |
| 116 | +-LibNiceConnection* LibNiceConnection::create(const IceConfig& ice_config) { |
| 117 | +- return new LibNiceConnection(boost::shared_ptr<LibNiceInterface>(new LibNiceInterfaceImpl()), ice_config); |
| 118 | ++LibNiceConnection* LibNiceConnection::create(const IceConfig& ice_config, std::shared_ptr<IOWorker> worker) { |
| 119 | ++ return new LibNiceConnection(boost::shared_ptr<LibNiceInterface>(new LibNiceInterfaceImpl()), ice_config, worker); |
| 120 | + } |
| 121 | + |
| 122 | + bool LibNiceConnection::removeRemoteCandidates() { |
| 123 | +diff --git a/erizo/src/erizo/LibNiceConnection.h b/erizo/src/erizo/LibNiceConnection.h |
| 124 | +index 19f62fa..2b78168 100644 |
| 125 | +--- a/erizo/src/erizo/LibNiceConnection.h |
| 126 | ++++ b/erizo/src/erizo/LibNiceConnection.h |
| 127 | +@@ -17,6 +17,7 @@ |
| 128 | + #include "./SdpInfo.h" |
| 129 | + #include "./logger.h" |
| 130 | + #include "lib/LibNiceInterface.h" |
| 131 | ++#include "thread/IOWorker.h" |
| 132 | + |
| 133 | + typedef struct _NiceAgent NiceAgent; |
| 134 | + typedef struct _GMainContext GMainContext; |
| 135 | +@@ -41,7 +42,8 @@ class LibNiceConnection : public IceConnection { |
| 136 | + DECLARE_LOGGER(); |
| 137 | + |
| 138 | + public: |
| 139 | +- LibNiceConnection(boost::shared_ptr<LibNiceInterface> libnice, const IceConfig& ice_config); |
| 140 | ++ LibNiceConnection(boost::shared_ptr<LibNiceInterface> libnice, |
| 141 | ++ const IceConfig& ice_config, std::shared_ptr<IOWorker> worker); |
| 142 | + |
| 143 | + virtual ~LibNiceConnection(); |
| 144 | + /** |
| 145 | +@@ -62,7 +64,7 @@ class LibNiceConnection : public IceConnection { |
| 146 | + |
| 147 | + bool removeRemoteCandidates() override; |
| 148 | + |
| 149 | +- static LibNiceConnection* create(const IceConfig& ice_config); |
| 150 | ++ static LibNiceConnection* create(const IceConfig& ice_config, std::shared_ptr<IOWorker> worker); |
| 151 | + |
| 152 | + private: |
| 153 | + void mainLoop(); |
| 154 | +diff --git a/erizo/src/erizo/thread/IOWorker.cpp b/erizo/src/erizo/thread/IOWorker.cpp |
| 155 | +index 05dceef..faaa491 100644 |
| 156 | +--- a/erizo/src/erizo/thread/IOWorker.cpp |
| 157 | ++++ b/erizo/src/erizo/thread/IOWorker.cpp |
| 158 | +@@ -1,18 +1,10 @@ |
| 159 | + #include "thread/IOWorker.h" |
| 160 | + |
| 161 | +-/* |
| 162 | +-extern "C" { |
| 163 | +-#include <r_errors.h> |
| 164 | +-#include <async_wait.h> |
| 165 | +-#include <async_timer.h> |
| 166 | +-} |
| 167 | +-*/ |
| 168 | +- |
| 169 | + #include <chrono> // NOLINT |
| 170 | + |
| 171 | + using erizo::IOWorker; |
| 172 | + |
| 173 | +-IOWorker::IOWorker() : started_{false}, closed_{false} { |
| 174 | ++IOWorker::IOWorker() : started_{false}, closed_{false}, context_{NULL}, loop_{NULL} { |
| 175 | + } |
| 176 | + |
| 177 | + IOWorker::~IOWorker() { |
| 178 | +@@ -25,46 +17,34 @@ void IOWorker::start() { |
| 179 | + } |
| 180 | + |
| 181 | + void IOWorker::start(std::shared_ptr<std::promise<void>> start_promise) { |
| 182 | +- /* |
| 183 | + if (started_.exchange(true)) { |
| 184 | + return; |
| 185 | + } |
| 186 | + |
| 187 | +- thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] { |
| 188 | +- start_promise->set_value(); |
| 189 | +- while (!closed_) { |
| 190 | +- int events; |
| 191 | +- struct timeval towait = {0, 100000}; |
| 192 | +- struct timeval tv; |
| 193 | +- int r = NR_async_event_wait2(&events, &towait); |
| 194 | +- if (r == R_EOD) { |
| 195 | +- std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 196 | +- } |
| 197 | +- gettimeofday(&tv, 0); |
| 198 | +- NR_async_timer_update_time(&tv); |
| 199 | +- std::vector<Task> tasks; |
| 200 | +- { |
| 201 | +- std::unique_lock<std::mutex> lock(task_mutex_); |
| 202 | +- tasks.swap(tasks_); |
| 203 | ++ if (!context_ && !loop_) { |
| 204 | ++ context_ = g_main_context_new(); |
| 205 | ++ loop_ = g_main_loop_new(context_, FALSE); |
| 206 | ++ thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] { |
| 207 | ++ start_promise->set_value(); |
| 208 | ++ if (!this->closed_ && this->loop_) { |
| 209 | ++ g_main_loop_run(this->loop_); |
| 210 | + } |
| 211 | +- for (Task &task : tasks) { |
| 212 | +- task(); |
| 213 | +- } |
| 214 | +- } |
| 215 | +- })); |
| 216 | +- */ |
| 217 | +-} |
| 218 | +- |
| 219 | +-void IOWorker::task(Task f) { |
| 220 | +- std::unique_lock<std::mutex> lock(task_mutex_); |
| 221 | +- tasks_.push_back(f); |
| 222 | ++ })); |
| 223 | ++ } |
| 224 | + } |
| 225 | + |
| 226 | + void IOWorker::close() { |
| 227 | + if (!closed_.exchange(true)) { |
| 228 | ++ if (context_ && loop_) { |
| 229 | ++ g_main_loop_quit(loop_); |
| 230 | ++ g_main_loop_unref(loop_); |
| 231 | ++ g_main_context_unref(context_); |
| 232 | ++ loop_ = NULL; |
| 233 | ++ context_ = NULL; |
| 234 | ++ } |
| 235 | + if (thread_ != nullptr) { |
| 236 | + thread_->join(); |
| 237 | ++ thread_ = nullptr; |
| 238 | + } |
| 239 | +- tasks_.clear(); |
| 240 | + } |
| 241 | + } |
| 242 | +diff --git a/erizo/src/erizo/thread/IOWorker.h b/erizo/src/erizo/thread/IOWorker.h |
| 243 | +index 93a7b4f..814c6b7 100644 |
| 244 | +--- a/erizo/src/erizo/thread/IOWorker.h |
| 245 | ++++ b/erizo/src/erizo/thread/IOWorker.h |
| 246 | +@@ -3,16 +3,15 @@ |
| 247 | + |
| 248 | + #include <atomic> |
| 249 | + #include <memory> |
| 250 | +-#include <mutex> // NOLINT |
| 251 | + #include <future> // NOLINT |
| 252 | + #include <thread> // NOLINT |
| 253 | + #include <vector> |
| 254 | ++#include <glib.h> |
| 255 | + |
| 256 | + namespace erizo { |
| 257 | + |
| 258 | + class IOWorker : public std::enable_shared_from_this<IOWorker> { |
| 259 | + public: |
| 260 | +- typedef std::function<void()> Task; |
| 261 | + IOWorker(); |
| 262 | + ~IOWorker(); |
| 263 | + |
| 264 | +@@ -20,14 +19,16 @@ class IOWorker : public std::enable_shared_from_this<IOWorker> { |
| 265 | + virtual void start(std::shared_ptr<std::promise<void>> start_promise); |
| 266 | + virtual void close(); |
| 267 | + |
| 268 | +- virtual void task(Task f); |
| 269 | ++ GMainContext* getMainContext() { return context_; } |
| 270 | ++ GMainLoop* getMainLoop() { return loop_; } |
| 271 | + |
| 272 | + private: |
| 273 | + std::atomic<bool> started_; |
| 274 | + std::atomic<bool> closed_; |
| 275 | + std::unique_ptr<std::thread> thread_; |
| 276 | +- std::vector<Task> tasks_; |
| 277 | +- mutable std::mutex task_mutex_; |
| 278 | ++ |
| 279 | ++ GMainContext* context_; |
| 280 | ++ GMainLoop* loop_; |
| 281 | + }; |
| 282 | + } // namespace erizo |
| 283 | + |
| 284 | +-- |
| 285 | +2.7.4 |
| 286 | + |
0 commit comments