Skip to content

Commit 3386775

Browse files
RTC-15334: Fix crash caused by a race condition (#397)
1 parent 4f11ef2 commit 3386775

File tree

3 files changed

+79
-29
lines changed

3 files changed

+79
-29
lines changed

bridge/Bridge.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ Bridge::~Bridge()
9090

9191
_timers->stop();
9292

93+
if (_probeServer)
94+
{
95+
_probeServer->stop();
96+
_probeServer = nullptr;
97+
}
98+
9399
if (_backgroundJobQueue)
94100
{
95101
_backgroundJobQueue->stop();
@@ -103,11 +109,6 @@ Bridge::~Bridge()
103109

104110
_timers.reset();
105111

106-
if (_probeServer)
107-
{
108-
_probeServer->stop();
109-
}
110-
111112
uint32_t n = 0;
112113
for (auto& workerThread : _workerThreads)
113114
{
@@ -209,7 +210,7 @@ void Bridge::initialize(std::shared_ptr<transport::EndpointFactory> endpointFact
209210
return;
210211
}
211212

212-
_probeServer = std::make_unique<transport::ProbeServer>(_iceConfig, _config);
213+
_probeServer = std::make_unique<transport::ProbeServer>(_iceConfig, _config, *_rtJobManager);
213214

214215
const auto credentials = _probeServer->getCredentials();
215216

transport/ProbeServer.cpp

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
#include "transport/ice/IceSession.h"
66
#include "transport/ice/Stun.h"
77
#include "utils/ContainerAlgorithms.h"
8+
#include "utils/Function.h"
89
#include "utils/Time.h"
910
#include <unistd.h>
1011

1112
namespace transport
1213
{
13-
ProbeServer::ProbeServer(const ice::IceConfig& iceConfig, const config::Config& config)
14+
ProbeServer::ProbeServer(const ice::IceConfig& iceConfig,
15+
const config::Config& config,
16+
jobmanager::JobManager& jobmanager)
1417
: _iceConfig(iceConfig),
1518
_config(config),
19+
_jobQueue(jobmanager, 1024),
1620
_queue(1024),
1721
_maintenanceThreadIsRunning(true),
1822
_maintenanceThread(new std::thread([this] { this->run(); }))
@@ -40,7 +44,7 @@ void ProbeServer::onDtlsReceived(Endpoint& endpoint,
4044
const SocketAddress& source,
4145
const SocketAddress& target,
4246
memory::UniquePacket packet,
43-
const uint64_t timestamp){};
47+
const uint64_t timestamp) {};
4448

4549
void ProbeServer::onRtcpReceived(Endpoint& endpoint,
4650
const SocketAddress& source,
@@ -56,7 +60,12 @@ void ProbeServer::onIceReceived(Endpoint& endpoint,
5660
memory::UniquePacket packet,
5761
const uint64_t timestamp)
5862
{
59-
replyStunOk(endpoint, source, std::move(packet));
63+
_jobQueue.post(utils::bind(&ProbeServer::onIceReceivedInternal,
64+
this,
65+
std::ref(endpoint),
66+
source,
67+
utils::moveParam(packet),
68+
timestamp));
6069
}
6170

6271
void ProbeServer::onRegistered(Endpoint& endpoint)
@@ -242,26 +251,24 @@ void ProbeServer::onIceTcpConnect(std::shared_ptr<Endpoint> endpoint,
242251
memory::UniquePacket packet,
243252
const uint64_t timestamp)
244253
{
245-
if (endpoint->getTransportType() == ice::TransportType::TCP)
246-
{
247-
replyStunOk(*endpoint, source, std::move(packet));
248254

249-
ProbeTcpConnection connection;
250-
connection.endpoint = endpoint;
251-
connection.timestamp = utils::Time::getAbsoluteTime();
252-
_queue.push(connection);
253-
}
255+
_jobQueue.post(utils::bind(&ProbeServer::onIceTcpConnectInternal,
256+
this,
257+
endpoint,
258+
source,
259+
utils::moveParam(packet),
260+
timestamp));
254261
}
255262

256263
// Endpoint::IStopEvents
257264
void ProbeServer::onEndpointStopped(Endpoint* endpoint) {}
258265

259-
void ProbeServer::replyStunOk(Endpoint& endpoint, const SocketAddress& destination, memory::UniquePacket packet)
266+
bool ProbeServer::replyStunOk(Endpoint& endpoint,
267+
const SocketAddress& destination,
268+
memory::UniquePacket packet,
269+
const uint64_t timestamp)
260270
{
261-
uint64_t timestamp = utils::Time::getAbsoluteTime();
262-
const void* data = packet->get();
263-
264-
auto* stunMessage = ice::StunMessage::fromPtr(data);
271+
auto* stunMessage = ice::StunMessage::fromPtr(packet->get());
265272

266273
if (stunMessage && stunMessage->isValid() && stunMessage->header.isRequest() &&
267274
stunMessage->isAuthentic(_hmacComputer))
@@ -274,11 +281,40 @@ void ProbeServer::replyStunOk(Endpoint& endpoint, const SocketAddress& destinati
274281
response.addFingerprint();
275282

276283
endpoint.sendStunTo(destination, response.header.transactionId.get(), &response, response.size(), timestamp);
284+
return true;
277285
}
278-
else
286+
287+
if (endpoint.getTransportType() != ice::TransportType::UDP)
279288
{
280289
endpoint.stop(this);
281290
}
291+
292+
return false;
293+
}
294+
295+
void ProbeServer::onIceReceivedInternal(Endpoint& endpoint,
296+
const SocketAddress& source,
297+
memory::UniquePacket packet,
298+
uint64_t timestamp)
299+
{
300+
replyStunOk(endpoint, source, std::move(packet), timestamp);
301+
}
302+
303+
void ProbeServer::onIceTcpConnectInternal(std::shared_ptr<Endpoint> endpoint,
304+
const SocketAddress& source,
305+
memory::UniquePacket packet,
306+
const uint64_t timestamp)
307+
{
308+
if (endpoint->getTransportType() == ice::TransportType::TCP)
309+
{
310+
if (replyStunOk(*endpoint, source, std::move(packet), timestamp))
311+
{
312+
ProbeTcpConnection connection;
313+
connection.endpoint = endpoint;
314+
connection.timestamp = utils::Time::getAbsoluteTime();
315+
_queue.push(connection);
316+
}
317+
}
282318
}
283319

284320
void ProbeServer::addCandidate(const ice::IceCandidate& candidate)

transport/ProbeServer.h

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "crypto/SslHelper.h"
44
#include "ice/IceCandidate.h"
55
#include "ice/Stun.h"
6+
#include "jobmanager/JobQueue.h"
67
#include "transport/Endpoint.h"
78
#include <config/Config.h>
89
#include <mutex>
@@ -15,8 +16,8 @@ class ProbeServer : public Endpoint::IEvents, public ServerEndpoint::IEvents, pu
1516
{
1617

1718
public:
18-
ProbeServer(const ice::IceConfig& iceConfig, const config::Config& config);
19-
virtual ~ProbeServer(){};
19+
ProbeServer(const ice::IceConfig& iceConfig, const config::Config& config, jobmanager::JobManager& jobmanager);
20+
virtual ~ProbeServer() {};
2021

2122
// Endpoint::IEvents
2223
void onRtpReceived(Endpoint&,
@@ -69,6 +70,21 @@ class ProbeServer : public Endpoint::IEvents, public ServerEndpoint::IEvents, pu
6970
void run();
7071
void stop();
7172

73+
private:
74+
void onIceReceivedInternal(Endpoint& endpoint,
75+
const SocketAddress& source,
76+
memory::UniquePacket packet,
77+
uint64_t timestamp);
78+
79+
void onIceTcpConnectInternal(std::shared_ptr<Endpoint> endpoint,
80+
const SocketAddress& source,
81+
memory::UniquePacket packet,
82+
const uint64_t timestamp);
83+
84+
bool replyStunOk(Endpoint&, const SocketAddress&, memory::UniquePacket, const uint64_t timestamp);
85+
void addCandidate(const ice::IceCandidate& candidate);
86+
int getInterfaceIndex(transport::SocketAddress address);
87+
7288
private:
7389
std::pair<std::string, std::string> _credentials;
7490
const ice::IceConfig& _iceConfig;
@@ -86,13 +102,10 @@ class ProbeServer : public Endpoint::IEvents, public ServerEndpoint::IEvents, pu
86102
};
87103

88104
crypto::HMAC _hmacComputer;
105+
jobmanager::JobQueue _jobQueue;
89106
std::vector<ProbeTcpConnection> _tcpConnections;
90107
concurrency::MpmcQueue<ProbeTcpConnection> _queue;
91108
std::atomic_bool _maintenanceThreadIsRunning;
92109
std::unique_ptr<std::thread> _maintenanceThread;
93-
94-
void replyStunOk(Endpoint&, const SocketAddress&, memory::UniquePacket);
95-
void addCandidate(const ice::IceCandidate& candidate);
96-
int getInterfaceIndex(transport::SocketAddress address);
97110
};
98111
} // namespace transport

0 commit comments

Comments
 (0)