Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 4b58cb6

Browse files
authored
Use fixed thread number for raw transport (#690)
1 parent 7156c23 commit 4b58cb6

File tree

4 files changed

+28
-26
lines changed

4 files changed

+28
-26
lines changed

source/agent/addons/internalIO/binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
'../../../core/owt_base/MediaFramePipeline.cpp',
1313
'../../../core/owt_base/RawTransport.cpp',
1414
'../../../core/owt_base/SctpTransport.cpp',
15+
'../../../core/common/IOService.cpp',
1516
],
1617
'include_dirs': [
1718
'$(CORE_HOME)/common',

source/core/common/IOService.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
namespace owt_base {
99

10+
static constexpr uint32_t kServiceNum = 4;
11+
static boost::mutex g_serviceMutex;
12+
static std::vector<std::shared_ptr<IOService>> g_services;
13+
1014
IOService::IOService()
1115
: m_count(0)
1216
, m_service()
@@ -33,7 +37,14 @@ void IOService::post(std::function<void()> task)
3337

3438
std::shared_ptr<IOService> getIOService()
3539
{
36-
return std::make_shared<IOService>();
40+
boost::mutex::scoped_lock lock(g_serviceMutex);
41+
if (g_services.empty()) {
42+
for (size_t i = 0; i < kServiceNum; i++) {
43+
g_services.push_back(std::make_shared<IOService>());
44+
}
45+
}
46+
int i = std::rand()/((RAND_MAX + 1u)/kServiceNum);
47+
return g_services[i];
3748
}
3849

3950
}

source/core/owt_base/RawTransport.cpp

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ RawTransport<prot>::RawTransport(RawTransportListener* listener, size_t initialB
1818
: m_isClosing(false)
1919
, m_tag(tag)
2020
, m_bufferSize(initialBufferSize)
21+
, m_service(getIOService())
2122
, m_listener(listener)
2223
, m_receivedBytes(0)
2324
{
@@ -36,11 +37,9 @@ void RawTransport<prot>::close()
3637
if (m_isClosing)
3738
return;
3839

39-
// We need to wait for the work thread to finish its job.
4040
m_isClosing = true;
41-
m_ioService.stop();
42-
m_workThread.join();
43-
41+
// Release the service
42+
m_service.reset();
4443
boost::system::error_code ec;
4544
switch (prot) {
4645
case TCP:
@@ -71,8 +70,8 @@ void RawTransport<prot>::createConnection(const std::string& ip, uint32_t port)
7170
if (m_socket.tcp.socket) {
7271
ELOG_WARN("TCP transport existed, ignoring the connection request for ip %s port %d\n", ip.c_str(), port);
7372
} else {
74-
m_socket.tcp.socket.reset(new tcp::socket(m_ioService));
75-
tcp::resolver resolver(m_ioService);
73+
m_socket.tcp.socket.reset(new tcp::socket(m_service->service()));
74+
tcp::resolver resolver(m_service->service());
7675
tcp::resolver::query query(ip.c_str(), boost::to_string(port).c_str());
7776
tcp::resolver::iterator iterator = resolver.resolve(query);
7877
// TODO: Accept IPv6.
@@ -87,8 +86,8 @@ void RawTransport<prot>::createConnection(const std::string& ip, uint32_t port)
8786
if (m_socket.udp.socket) {
8887
ELOG_WARN("UDP transport existed, ignoring the connection request for ip %s port %d\n", ip.c_str(), port);
8988
} else {
90-
m_socket.udp.socket.reset(new udp::socket(m_ioService));
91-
udp::resolver resolver(m_ioService);
89+
m_socket.udp.socket.reset(new udp::socket(m_service->service()));
90+
udp::resolver resolver(m_service->service());
9291
udp::resolver::query query(udp::v4(), ip.c_str(), boost::to_string(port).c_str());
9392
udp::resolver::iterator iterator = resolver.resolve(query);
9493

@@ -103,9 +102,6 @@ void RawTransport<prot>::createConnection(const std::string& ip, uint32_t port)
103102
default:
104103
break;
105104
}
106-
107-
if (m_workThread.get_id() == boost::thread::id()) // Not-A-Thread
108-
m_workThread = boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
109105
}
110106

111107
template<Protocol prot>
@@ -179,8 +175,8 @@ void RawTransport<prot>::listenTo(uint32_t port)
179175
if (m_socket.tcp.socket) {
180176
ELOG_WARN("TCP transport existed, ignoring the listening request for port %d\n", port);
181177
} else {
182-
m_socket.tcp.socket.reset(new tcp::socket(m_ioService));
183-
m_socket.tcp.acceptor.reset(new tcp::acceptor(m_ioService, tcp::endpoint(tcp::v4(), port)));
178+
m_socket.tcp.socket.reset(new tcp::socket(m_service->service()));
179+
m_socket.tcp.acceptor.reset(new tcp::acceptor(m_service->service(), tcp::endpoint(tcp::v4(), port)));
184180
m_socket.tcp.acceptor->async_accept(*(m_socket.tcp.socket.get()),
185181
boost::bind(&RawTransport::acceptHandler, this,
186182
boost::asio::placeholders::error));
@@ -192,17 +188,14 @@ void RawTransport<prot>::listenTo(uint32_t port)
192188
if (m_socket.udp.socket) {
193189
ELOG_WARN("UDP transport existed, ignoring the listening request for port %d\n", port);
194190
} else {
195-
m_socket.udp.socket.reset(new udp::socket(m_ioService, udp::endpoint(udp::v4(), port)));
191+
m_socket.udp.socket.reset(new udp::socket(m_service->service(), udp::endpoint(udp::v4(), port)));
196192
receiveData();
197193
}
198194
break;
199195
}
200196
default:
201197
break;
202198
}
203-
204-
if (m_workThread.get_id() == boost::thread::id()) // Not-A-Thread
205-
m_workThread = boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
206199
}
207200

208201
template<Protocol prot>
@@ -213,15 +206,15 @@ void RawTransport<prot>::listenTo(uint32_t minPort, uint32_t maxPort)
213206
if (m_socket.tcp.socket) {
214207
ELOG_WARN("TCP transport existed, ignoring the listening request for minPort %d, maxPort %d\n", minPort, maxPort);
215208
} else {
216-
m_socket.tcp.socket.reset(new tcp::socket(m_ioService));
209+
m_socket.tcp.socket.reset(new tcp::socket(m_service->service()));
217210

218211
// find port in range
219212
uint32_t portRange = maxPort - minPort + 1;
220213
uint32_t port = rand() % portRange + minPort;
221214
boost::system::error_code ec;
222215

223216
for (uint32_t i = 0; i < portRange; i++) {
224-
m_socket.tcp.acceptor.reset(new tcp::acceptor(m_ioService));
217+
m_socket.tcp.acceptor.reset(new tcp::acceptor(m_service->service()));
225218
m_socket.tcp.acceptor->open(tcp::v4());
226219
m_socket.tcp.acceptor->bind(tcp::endpoint(tcp::v4(), port), ec);
227220

@@ -260,17 +253,14 @@ void RawTransport<prot>::listenTo(uint32_t minPort, uint32_t maxPort)
260253
ELOG_WARN("UDP transport existed, ignoring the listening request for minPort %d, maxPort %d\n", minPort, maxPort);
261254
} else {
262255
ELOG_WARN("UDP transport does not support listening in specific range.");
263-
m_socket.udp.socket.reset(new udp::socket(m_ioService, udp::endpoint(udp::v4(), 0)));
256+
m_socket.udp.socket.reset(new udp::socket(m_service->service(), udp::endpoint(udp::v4(), 0)));
264257
receiveData();
265258
}
266259
break;
267260
}
268261
default:
269262
break;
270263
}
271-
272-
if (m_workThread.get_id() == boost::thread::id()) // Not-A-Thread
273-
m_workThread = boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
274264
}
275265

276266
template<Protocol prot>

source/core/owt_base/RawTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <boost/thread/mutex.hpp>
1313
#include <logger.h>
1414
#include <queue>
15+
#include "IOService.h"
1516

1617
namespace owt_base {
1718

@@ -93,8 +94,7 @@ class RawTransport : public RawTransportInterface {
9394
// the order of the member declarations here.
9495
// Alternatively, we may make the io_service object reference counted but it
9596
// introduces unnecessary complexity.
96-
boost::asio::io_service m_ioService;
97-
boost::thread m_workThread;
97+
std::shared_ptr<IOService> m_service;
9898
struct Socket {
9999
Socket() { }
100100
~Socket() { }

0 commit comments

Comments
 (0)