Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 2a2a488

Browse files
committed
Fix remove participant bug and media bridge crash when getting room destroy message
1 parent 7486ce3 commit 2a2a488

File tree

8 files changed

+87
-31
lines changed

8 files changed

+87
-31
lines changed

source/agent/addons/quicCascading/QuicTransportClient.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ QuicTransportClient::QuicTransportClient(const char* dest_ip, int dest_port)
3333

3434
QuicTransportClient::~QuicTransportClient() {
3535
ELOG_INFO("QuicTransportClient::~QuicTransportClient");
36-
m_quicClient->Stop();
37-
m_quicClient.reset();
36+
if (m_quicClient) {
37+
m_quicClient->Stop();
38+
m_quicClient->SetVisitor(nullptr);
39+
m_quicClient.reset();
40+
}
3841

3942
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_asyncOnConnected))) {
4043
uv_close(reinterpret_cast<uv_handle_t*>(&m_asyncOnConnected), NULL);
@@ -127,10 +130,12 @@ NAN_METHOD(QuicTransportClient::close) {
127130
ELOG_DEBUG("QuicTransportClient::close");
128131
QuicTransportClient* obj = Nan::ObjectWrap::Unwrap<QuicTransportClient>(info.Holder());
129132
obj->m_quicClient->Stop();
133+
obj->m_quicClient->SetVisitor(nullptr);
130134

131135
delete obj->stream_callback_;
132136
delete obj->connected_callback_;
133137
delete obj->streamClosed_callback_;
138+
obj->m_quicClient.reset();
134139
ELOG_DEBUG("End of QuicTransportClient::connect");
135140
}
136141

source/agent/addons/quicCascading/QuicTransportServer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ NAN_METHOD(QuicTransportServer::stop)
101101
ELOG_DEBUG("QuicTransportServer::stop");
102102
QuicTransportServer* obj = Nan::ObjectWrap::Unwrap<QuicTransportServer>(info.Holder());
103103
obj->m_quicServer->Stop();
104+
obj->m_quicServer->SetVisitor(nullptr);
104105
ELOG_DEBUG("End of QuicTransportServer::stop");
105106
}
106107

source/agent/addons/quicCascading/QuicTransportSession.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ QuicTransportSession::~QuicTransportSession() {
3838
m_session->SetVisitor(nullptr);
3939
delete asyncResourceNewStream_;
4040
delete asyncResourceClosedStream_;
41+
delete m_session;
42+
m_session = NULL;
4143
}
4244

4345
NAN_MODULE_INIT(QuicTransportSession::init)
@@ -112,14 +114,17 @@ NAN_METHOD(QuicTransportSession::onClosedStream) {
112114
}
113115

114116
NAN_METHOD(QuicTransportSession::close) {
115-
ELOG_DEBUG("QuicTransportSession::close");
117+
ELOG_DEBUG("QuicTransportSession::close");
116118
QuicTransportSession* obj = Nan::ObjectWrap::Unwrap<QuicTransportSession>(info.Holder());
117119
obj->m_session->Stop();
120+
obj->m_session->SetVisitor(nullptr);
118121
/*delete obj->asyncResource_;
119122
delete obj->asyncResourceStreamClosed_;*/
120123

124+
obj->has_stream_callback_ = false;
121125
delete obj->stream_callback_;
122126
delete obj->streamClosed_callback_;
127+
ELOG_DEBUG("QuicTransportSession::close end");
123128
}
124129

125130
NAUV_WORK_CB(QuicTransportSession::onNewStreamCallback){
@@ -141,7 +146,9 @@ NAUV_WORK_CB(QuicTransportSession::onNewStreamCallback){
141146
ELOG_DEBUG("QuicTransportSession::onNewStreamCallback call js stack");
142147
Local<Value> args[] = { streamObject };
143148

144-
obj->asyncResourceNewStream_->runInAsyncScope(Nan::GetCurrentContext()->Global(), obj->stream_callback_->GetFunction(), 1, args);
149+
if (obj->stream_callback_) {
150+
obj->asyncResourceNewStream_->runInAsyncScope(Nan::GetCurrentContext()->Global(), obj->stream_callback_->GetFunction(), 1, args);
151+
}
145152
obj->stream_messages.pop();
146153
}
147154
}
@@ -158,15 +165,17 @@ NAUV_WORK_CB(QuicTransportSession::onClosedStreamCallback){
158165

159166
if (obj->has_streamClosed_callback_) {
160167
ELOG_INFO("object has stream callback");
161-
//boost::mutex::scoped_lock lock(obj->mutex);
168+
boost::mutex::scoped_lock lock(obj->mutex);
162169
while (!obj->streamclosed_messages.empty()) {
163170
ELOG_INFO("streamclosed_messages is not empty");
164171
//auto streamid = obj->streamclosed_messages.front();
165172
//v8::Local<v8::Object> streamObject = QuicTransportStream::newInstance(quicStream);
166173
//Local<Value> args[] = { streamObject };
167174
Local<Value> args[] = { Nan::New(obj->streamclosed_messages.front()) };
168175

169-
obj->asyncResourceClosedStream_->runInAsyncScope(Nan::GetCurrentContext()->Global(), obj->streamClosed_callback_->GetFunction(), 1, args);
176+
if (obj->streamClosed_callback_) {
177+
obj->asyncResourceClosedStream_->runInAsyncScope(Nan::GetCurrentContext()->Global(), obj->streamClosed_callback_->GetFunction(), 1, args);
178+
}
170179
obj->streamclosed_messages.pop();
171180
}
172181
}
@@ -203,7 +212,7 @@ void QuicTransportSession::OnIncomingStream(owt::quic::QuicTransportStreamInterf
203212

204213
void QuicTransportSession::OnStreamClosed(uint32_t id) {
205214
ELOG_DEBUG("QuicTransportSession stream:%d is closed\n", id);
206-
//boost::mutex::scoped_lock lock(mutex);
215+
boost::mutex::scoped_lock lock(mutex);
207216
this->streamclosed_messages.push(id);
208217
m_asyncOnClosedStream.data = this;
209218
uv_async_send(&m_asyncOnClosedStream);

source/agent/addons/quicCascading/QuicTransportStream.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ QuicTransportStream::~QuicTransportStream() {
6060
}
6161
m_stream->SetVisitor(nullptr);
6262
delete asyncResource_;
63+
delete m_stream;
64+
m_stream = nullptr;
6365
/*delete[] m_receiveData.buffer;
6466
data_callback_.Reset();*/
6567
}
@@ -189,8 +191,7 @@ NAN_METHOD(QuicTransportStream::close)
189191
{
190192
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
191193
obj->m_stream->Close();
192-
delete obj->m_stream;
193-
obj->m_stream = nullptr;
194+
obj->m_stream->SetVisitor(nullptr);
194195
}
195196

196197
NAUV_WORK_CB(QuicTransportStream::onStreamDataCallback){

source/agent/conference/conference.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ var Conference = function (rpcClient, selfRpcId) {
626626
var left_user = participant.getInfo();
627627
if (room_config.notifying.participantActivities) {
628628
sendMsg('room', 'all', 'participant', {action: 'leave', data: left_user.id});
629-
!participants[participantId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addParticipant", data: participantId});
629+
!participants[participantId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "removeParticipant", data: participantId});
630630
}
631631
delete participants[participantId];
632632
}

source/agent/media_bridge/index.js

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
4848
//Map OWT internal connections with quic streams
4949
let connections = {};
5050
let server;
51+
let connectionids = {};
5152
var cf = 'leaf_cert.pem';
5253
var kf = 'leaf_cert.pkcs8';
5354
const clusterName = global && global.config && global.config.cluster ?
@@ -293,17 +294,15 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
293294
};
294295

295296
that.unpublish = function (connectionId, callback) {
296-
log.debug('unpublish, connectionId:', connectionId);
297-
var conn = router.getConnection(connectionId);
298-
router.removeConnection(connectionId).then(function(ok) {
297+
log.debug('unpublish, connectionId:', connectionId, " connectionids:", connectionids);
298+
var conn = router.getConnection(connectionids[connectionId]);
299+
router.removeConnection(connectionids[connectionId]).then(function(ok) {
299300
if (conn) {
300301
conn.close();
301302
}
302303
callback('callback', 'ok');
303304
}, onError(callback));
304-
if (publicationOptions.has(connectionId)) {
305-
publicationOptions.delete(connectionId);
306-
}
305+
delete connectionids[connectionId];
307306
};
308307

309308
that.subscribe = function (connectionId, connectionType, options, callback) {
@@ -373,13 +372,22 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
373372
server.stop();
374373
Object.keys(clusters).forEach(key => {
375374
log.info("close cluster:", key, " ");
376-
clusters[key].quicsession.stop();
375+
clusters[key].quicsession.close();
377376
});;
378377

379378
};
380379

381380
that.onFaultDetected = function (message) {
382381
router.onFaultDetected(message);
382+
if (message.purpose === 'conference') {
383+
for (var conn_id in clusters) {
384+
if ((message.type === 'node' && message.id === clusters[conn_id].controller) || (message.type === 'worker' && clusters[conn_id].controller.startsWith(message.id))) {
385+
log.error('Fault detected on controller (type:', message.type, 'id:', message.id, ') of connection:', conn_id , 'and remove it');
386+
clusters[conn_id].quicsession.close();
387+
delete clusters[conn_id];
388+
}
389+
}
390+
}
383391
};
384392

385393
//Work as quic client to proactively establish quic connection with another OWT cluster
@@ -463,6 +471,9 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
463471
log.info("Client stream id:", streamId, " get subscribe msg with info:", info);
464472
info.options.locality = {agent: parentRpcId, node: selfRpcId};
465473
var connectionId = info.options.connectionId;
474+
var str = connectionId.split('-');
475+
connectionids[str[2]] = connectionId;
476+
log.info("get connection id:", connectionId, " split str", str[2], " connectionids:", connectionids);
466477
if (clusters[dest].controller) {
467478
rpcReq.subscribe(clusters[dest].controller, 'admin', connectionId, info.options);
468479
} else {
@@ -590,6 +601,9 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
590601
log.info("Server stream id:", streamId, " get subscribe msg with subscribe info:", info, " and session:", sessionId);
591602
info.options.locality = {agent: parentRpcId, node: selfRpcId};
592603
var connectionId = info.options.connectionId;
604+
var str = connectionId.split('-');
605+
connectionids[str[2]] = connectionId;
606+
log.info("get connection id:", connectionId, " split str", str[2], " connectionids:", connectionids);
593607
if (clusters[dest].controller) {
594608
rpcReq.subscribe(clusters[dest].controller, 'admin', connectionId, info.options);
595609
} else {

source/event-bridge/eventCascading.js

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
var logger = require('./logger').logger;
88
var log = logger.getLogger('EventCascading');
99
var quicCas = require('./quicCascading/build/Release/quicCascading.node');
10-
const inspector = require('event-loop-inspector')();
1110

1211
var cf = 'leaf_cert.pem';
1312
var kf = 'leaf_cert.pkcs8';
@@ -95,8 +94,14 @@ var EventCascading = function(spec, rpcReq) {
9594
if (!sessions[clientID].streams[streamID]) {
9695
sessions[clientID].streams[streamID] = {};
9796
}
97+
98+
if (!sessions[clientID].cascadedRooms) {
99+
sessions[clientID].cascadedRooms = {};
100+
}
98101
sessions[clientID].streams[streamID].quicstream = quicStream;
99102
sessions[clientID].streams[streamID].controller = controller;
103+
sessions[clientID].streams[streamID].roomid = data.room;
104+
sessions[clientID].cascadedRooms[data.room] = true;
100105

101106
if (!controllers[controller]) {
102107
controllers[controller] = {};
@@ -188,15 +193,15 @@ var EventCascading = function(spec, rpcReq) {
188193
});
189194
} else {
190195
//A new different conference request between cascaded clusters
191-
if (!cascadedRooms[data.room]) {
196+
if (sessions[clientID].cascadedRooms && sessions[clientID].cascadedRooms[data.room]) {
197+
log.debug('Cluster already cascaded');
198+
return Promise.resolve('ok');
199+
} else {
192200
return rpcReq.getController(cluster_name, data.room)
193201
.then(function(controller) {
194202
//Create a new quic stream for the new conference to cascading room events
195203
createQuicStream(controller, clientID, data);
196204
});
197-
} else {
198-
log.debug('Cluster already cascaded');
199-
return Promise.resolve('ok');
200205
}
201206
}
202207
}
@@ -221,6 +226,10 @@ var EventCascading = function(spec, rpcReq) {
221226
sessions[sessionId].streams = {};
222227
}
223228

229+
if (!sessions[sessionId].cascadedRooms) {
230+
sessions[sessionId].cascadedRooms = {};
231+
}
232+
224233
if (!sessions[sessionId].streams[streamId]) {
225234
sessions[sessionId].streams[streamId] = {};
226235
}
@@ -236,6 +245,8 @@ var EventCascading = function(spec, rpcReq) {
236245
.then(function(controller) {
237246

238247
sessions[sessionId].streams[streamId].controller = controller;
248+
sessions[sessionId].streams[streamId].roomid = event.room;
249+
sessions[sessionId].cascadedRooms[event.room] = true;
239250

240251
if (!controllers[controller]) {
241252
controllers[controller] = {};
@@ -262,12 +273,14 @@ var EventCascading = function(spec, rpcReq) {
262273

263274
session.onClosedStream((closedStreamId) => {
264275
log.info("server stream:", closedStreamId, " is closed");
265-
delete controllers[sessions[session.id].streams[closedStreamId].controller][session.dest];
266276
if (sessions[sessionId] && sessions[session.id].streams[closedStreamId]) {
277+
delete controllers[sessions[session.id].streams[closedStreamId].controller][session.dest];
267278
var event = {
268279
type: 'onCascadingDisconnected'
269280
}
270281
rpcReq.handleCascadingEvents(sessions[session.id].streams[closedStreamId].controller, self_rpc_id, sessions[session.id].target, event);
282+
var roomid = sessions[session.id].streams[closedStreamId].roomid;
283+
sessions[session.id].cascadedRooms[roomid] = false;
271284
delete sessions[session.id].streams[closedStreamId];
272285
}
273286
})
@@ -307,6 +320,25 @@ var EventCascading = function(spec, rpcReq) {
307320
})
308321
}
309322

323+
that.onFaultDetected = function (message) {
324+
log.info("onFaultDetected:", message);
325+
326+
return new Promise((resolve, reject) => {
327+
for (var item in sessions) {
328+
for (var id in sessions[item].streams) {
329+
if ((message.type === 'node' && message.id === sessions[item].streams[id].controller) || (message.type === 'worker' && sessions[item].streams[id].controller.startsWith(message.id))) {
330+
log.error('Fault detected on controller (type:', message.type, 'id:', message.id, ')' , 'and remove it');
331+
sessions[item].streams[id].quicstream.close();
332+
var roomid = sessions[item].streams[id].roomid;
333+
sessions[item].cascadedRooms[roomid] = false;
334+
delete sessions[item].streams[id];
335+
}
336+
}
337+
}
338+
return resolve('ok');
339+
})
340+
}
341+
310342

311343
return that;
312344
};

source/event-bridge/index.js

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,9 @@ amqper.connect(config.rabbit, function () {
203203
log.info('bridge initializing as rpc server ok');
204204
amqper.asMonitor(function (data) {
205205
if (data.reason === 'abnormal' || data.reason === 'error' || data.reason === 'quit') {
206-
if (bridge !== undefined) {
206+
if (event_cascading) {
207207
if (data.message.purpose === 'conference') {
208-
return bridge.getParticipantsByController(data.message.type, data.message.id)
209-
.then(function (impactedParticipants) {
210-
impactedParticipants.forEach(function(participantId) {
211-
log.error('Fault on conference controller(type:', data.message.type, 'id:', data.message.id, ') of participant', participantId, 'was detected, drop it.');
212-
//event_cascading && socketio_server.drop(participantId);
213-
});
214-
});
208+
return event_cascading.onFaultDetected(data.message);
215209
}
216210
}
217211
}

0 commit comments

Comments
 (0)