Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 30d1d88

Browse files
committed
Add server pfx certificate and quic client token validation
1 parent cc2acef commit 30d1d88

File tree

13 files changed

+296
-216
lines changed

13 files changed

+296
-216
lines changed

source/agent/addons/quicCascading/QuicTransportServer.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ DEFINE_LOGGER(QuicTransportServer, "QuicTransportServer");
2626
Nan::Persistent<v8::Function> QuicTransportServer::s_constructor;
2727

2828
// QUIC Incomming
29-
QuicTransportServer::QuicTransportServer(unsigned int port, const std::string& cert_file, const std::string& key_file)
30-
: m_quicServer(QuicFactory::getQuicTransportFactory()->CreateQuicTransportServer(port, cert_file.c_str(), key_file.c_str())) {
29+
QuicTransportServer::QuicTransportServer(unsigned int port, const std::string& pfxPath, const std::string& password)
30+
: m_quicServer(QuicFactory::getQuicTransportFactory()->CreateQuicTransportServer(port, pfxPath.c_str(), password.c_str())) {
3131
m_quicServer->SetVisitor(this);
3232
}
3333

source/agent/addons/quicCascading/QuicTransportServer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class QuicTransportServer : public owt::quic::QuicTransportServerInterface::Visi
4141
protected:
4242
QuicTransportServer() = delete;
4343
virtual ~QuicTransportServer();
44-
explicit QuicTransportServer(unsigned int port, const std::string& cert_file, const std::string& key_file);
44+
explicit QuicTransportServer(unsigned int port, const std::string& pfxPath, const std::string& password);
4545

4646
// Implements QuicTransportClientInterface.
4747
void OnSession(owt::quic::QuicTransportSessionInterface*) override;

source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ gboolean VideoGstAnalyzer::StreamEventCallBack(GstBus *bus, GstMessage *message,
194194
str.append(std::to_string(width));
195195
str.append(",\"height\":");
196196
str.append(std::to_string(height));
197-
str.append("}\0");
197+
str.append("}");
198198
pStreamObj->notifyAsyncEvent("streamadded", str);
199199

200200
gst_caps_unref (caps);

source/agent/conference/conference.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ var Conference = function (rpcClient, selfRpcId) {
108108
var rtcController;
109109
let quicController;
110110

111+
var roomToken = Math.round(Math.random() * 1000000000000000000) + '';
112+
111113
/*
112114
* {
113115
* _id: string(RoomID),
@@ -503,7 +505,7 @@ var Conference = function (rpcClient, selfRpcId) {
503505
});
504506
});
505507
}).then(() => {
506-
rpcReq.getClusterID(cluster)
508+
rpcReq.getClusterID(cluster, room_id, roomToken)
507509
.then((id) => {
508510
log.info('Get cluster id:', id);
509511
clusterID = id;
@@ -3306,6 +3308,10 @@ var Conference = function (rpcClient, selfRpcId) {
33063308
callback('callback', result);
33073309
};
33083310

3311+
that.getRoomToken = function(callback) {
3312+
callback('callback', roomToken);
3313+
}
3314+
33093315
// Listener callback for GRPC
33103316
that.processNotification = (notification) => {
33113317
const name = notification.name;
@@ -3476,6 +3482,7 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
34763482
//RPC for cluster cascading
34773483
handleCascadingEvents: conference.handleCascadingEvents,
34783484
onCascadingConnected: conference.onCascadingConnected,
3485+
getRoomToken: conference.getRoomToken,
34793486
// Callback for GRPC
34803487
processNotification: (notification) => {
34813488
const name = notification.name;

source/agent/conference/rpcRequest.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,8 @@ const RpcRequest = function(rpcChannel, listener) {
359359
return rpcChannel.makeRPC(sipNode, 'endCall', [sipCallId]);
360360
};
361361

362-
that.getClusterID = function(clusterManager) {
363-
return rpcChannel.makeRPC(clusterManager, 'getClusterID', [])
362+
that.getClusterID = function(clusterManager, room_id, roomToken) {
363+
return rpcChannel.makeRPC(clusterManager, 'getClusterID', [room_id, roomToken])
364364
}
365365

366366
that.leaveConference = function(clusterManager, roomId) {

source/agent/media_bridge/index.js

Lines changed: 117 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
'use strict';
1818
var logger = require('../logger').logger;
1919
var log = logger.getLogger('MediaBridge');
20+
const cipher = require('../cipher');
2021
var addon = require('../quicCascading/build/Release/quicCascading.node');
2122
const QuicTransportStreamPipeline =
2223
require('./quicTransportStreamPipeline');
@@ -432,6 +433,7 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
432433
var info = {
433434
type: 'cluster',
434435
room: data.room,
436+
token: data.token,
435437
cluster: data.selfCluster
436438
}
437439
quicStream.send(JSON.stringify(info));
@@ -543,113 +545,133 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
543545

544546
//work as quic server to wait for another OWT cluster to establish quic connection
545547
const start = function () {
546-
server = new addon.QuicTransportServer(port, cf, kf);
547-
548-
server.start();
549-
server.onNewSession((session) => {
550-
session.connected = true;
551-
var dest;
552-
var sessionId = session.getId();
553-
if (!clusters[sessionId]) {
554-
clusters[sessionId] = {};
555-
}
556-
clusters[sessionId].quicsession = session;
557-
clusters[sessionId].id = sessionId;
558-
559-
log.info("Server get new session:", sessionId);
560-
session.onNewStream((quicStream) => {
561-
var streamId = quicStream.getId();
562-
log.info("Server get new stream id:", streamId);
563-
if (clusters[dest]) {
564-
if (!clusters[dest].streams) {
565-
clusters[dest].streams = {};
566-
}
548+
const keystore = path.resolve(path.dirname(global.config.bridge.keystorePath), cipher.kstore);
549+
cipher.unlock(cipher.k, keystore, (error, password) => {
550+
if (error) {
551+
log.error('Failed to read certificate and key.');
552+
return;
553+
}
554+
log.info('path is '+path.resolve(global.config.bridge.keystorePath));
567555

568-
if (!clusters[dest].streams[streamId]) {
569-
clusters[dest].streams[streamId] = {};
570-
}
571-
clusters[dest].streams[streamId].quicstream = quicStream;
572-
}
573556

574-
quicStream.onStreamData((msg) => {
575-
var info = JSON.parse(msg);
576-
log.info("Server get stream data:", info, " in stream:", streamId, " and session:", sessionId);
577-
if (info.type === 'cluster') {
578-
dest = info.cluster + '-' + info.room;
579-
session.dest = dest;
580-
if (!clusters[dest]) {
581-
clusters[dest] = {};
582-
}
583-
clusters[dest].quicsession = session;
584-
clusters[dest].signalStream = quicStream;
585-
rpcReq.getController(clusterName, info.room)
586-
.then(function(controller) {
587-
clusters[dest].controller = controller;
588-
});
589-
} else if (info.type === 'track') {
590-
log.info("Server stream id:", streamId, " get track msg and track info:", info, " in stream:", streamId, " and session:", sessionId);
591-
var conn = createStreamPipeline(info.id, 'out', info.options);
592-
conn.quicStream(quicStream);
593-
if (!conn) {
594-
return;
557+
server = new addon.QuicTransportServer(port, path.resolve(global.config.bridge.keystorePath),password);
558+
559+
server.start();
560+
server.onNewSession((session) => {
561+
session.connected = true;
562+
var dest;
563+
var sessionId = session.getId();
564+
if (!clusters[sessionId]) {
565+
clusters[sessionId] = {};
566+
}
567+
clusters[sessionId].quicsession = session;
568+
clusters[sessionId].id = sessionId;
569+
570+
log.info("Server get new session:", sessionId);
571+
session.onNewStream((quicStream) => {
572+
var streamId = quicStream.getId();
573+
log.info("Server get new stream id:", streamId);
574+
if (clusters[dest]) {
575+
if (!clusters[dest].streams) {
576+
clusters[dest].streams = {};
577+
}
578+
579+
if (!clusters[dest].streams[streamId]) {
580+
clusters[dest].streams[streamId] = {};
581+
}
582+
clusters[dest].streams[streamId].quicstream = quicStream;
595583
}
596-
quicStream.trackKind = info.kind;
597-
var connid = 'quic-' + info.id;
598-
router.addLocalDestination(info.id, 'mediabridge', conn);
599-
600-
} else if (info.type === 'subscribe') {
601-
log.info("Server stream id:", streamId, " get subscribe msg with subscribe info:", info, " and session:", sessionId);
602-
info.options.locality = {agent: parentRpcId, node: selfRpcId};
603-
var connectionId = info.options.connectionId;
604-
var str = connectionId.split('-');
605-
connectionids[str[2]] = connectionId;
606-
log.info("get connection id:", connectionId, " split str", str[2], " connectionids:", connectionids);
607-
if (clusters[dest].controller) {
608-
rpcReq.subscribe(clusters[dest].controller, 'admin', connectionId, info.options);
609-
} else {
610-
rpcReq.getController(clusterName, info.options.room)
611-
.then(function(controller) {
612-
clusters[dest].controller = controller;
613-
log.info("Subscribe to controller:", controller, "connection id:", connectionId, " with info:", info.options.media.tracks);
614584

615-
return rpcReq.subscribe(controller, 'admin', connectionId, info.options);
616-
})
617-
.then(function(result) {
618-
log.info("subscribe result is:", result);
585+
quicStream.onStreamData((msg) => {
586+
var info = JSON.parse(msg);
587+
log.info("Server get stream data:", info, " in stream:", streamId, " and session:", sessionId);
588+
if (info.type === 'cluster') {
619589

590+
rpcReq.getController(clusterName, info.room)
591+
.then(function(controller) {
592+
dest = info.cluster + '-' + info.room;
593+
session.dest = dest;
594+
if (!clusters[dest]) {
595+
clusters[dest] = {};
596+
}
597+
clusters[dest].controller = controller;
598+
return rpcReq.getToken(controller);
620599
})
621-
.catch((e) => {
622-
log.info("subscribe failed with error:", e);
600+
.then(function(token) {
601+
if (info.token !== token) {
602+
//Quic client token validation failed
603+
delete clusters[dest];
604+
session.close();
605+
} else {
606+
clusters[dest].quicsession = session;
607+
clusters[dest].signalStream = quicStream;
608+
}
623609
});
610+
} else if (info.type === 'track') {
611+
log.info("Server stream id:", streamId, " get track msg and track info:", info, " in stream:", streamId, " and session:", sessionId);
612+
var conn = createStreamPipeline(info.id, 'out', info.options);
613+
conn.quicStream(quicStream);
614+
if (!conn) {
615+
return;
616+
}
617+
quicStream.trackKind = info.kind;
618+
var connid = 'quic-' + info.id;
619+
router.addLocalDestination(info.id, 'mediabridge', conn);
620+
621+
} else if (info.type === 'subscribe') {
622+
log.info("Server stream id:", streamId, " get subscribe msg with subscribe info:", info, " and session:", sessionId);
623+
info.options.locality = {agent: parentRpcId, node: selfRpcId};
624+
var connectionId = info.options.connectionId;
625+
var str = connectionId.split('-');
626+
connectionids[str[2]] = connectionId;
627+
log.info("get connection id:", connectionId, " split str", str[2], " connectionids:", connectionids);
628+
if (clusters[dest].controller) {
629+
rpcReq.subscribe(clusters[dest].controller, 'admin', connectionId, info.options);
630+
} else {
631+
rpcReq.getController(clusterName, info.options.room)
632+
.then(function(controller) {
633+
clusters[dest].controller = controller;
634+
log.info("Subscribe to controller:", controller, "connection id:", connectionId, " with info:", info.options.media.tracks);
635+
636+
return rpcReq.subscribe(controller, 'admin', connectionId, info.options);
637+
})
638+
.then(function(result) {
639+
log.info("subscribe result is:", result);
640+
641+
})
642+
.catch((e) => {
643+
log.info("subscribe failed with error:", e);
644+
});
645+
}
646+
} else if (info.type === 'unsubscribe') {
647+
//handle unsusbcribe request
648+
}
649+
});
650+
var data = {
651+
type: 'ready'
624652
}
625-
} else if (info.type === 'unsubscribe') {
626-
//handle unsusbcribe request
627-
}
653+
quicStream.send(JSON.stringify(data));
654+
})
655+
656+
session.onClosedStream((closedStreamId) => {
657+
log.info("server stream:", closedStreamId, " is closed");
658+
if (clusters[session.dest] && clusters[session.dest].streams[closedStreamId] && clusters[session.dest].streams[closedStreamId].connid) {
659+
rpcReq.unsubscribe(clusters[session.dest].controller, 'admin', clusters[session.dest].streams[closedStreamId].connid);
660+
delete clusters[session.dest].streams[closedStreamId]
661+
}
662+
})
628663
});
629-
var data = {
630-
type: 'ready'
631-
}
632-
quicStream.send(JSON.stringify(data));
633-
})
634-
635-
session.onClosedStream((closedStreamId) => {
636-
log.info("server stream:", closedStreamId, " is closed");
637-
if (clusters[session.dest] && clusters[session.dest].streams[closedStreamId] && clusters[session.dest].streams[closedStreamId].connid) {
638-
rpcReq.unsubscribe(clusters[session.dest].controller, 'admin', clusters[session.dest].streams[closedStreamId].connid);
639-
delete clusters[session.dest].streams[closedStreamId]
640-
}
641-
})
642-
});
643664

644-
server.onClosedSession((sessionId) => {
645-
log.info("Session:", sessionId, " in server is closed");
665+
server.onClosedSession((sessionId) => {
666+
log.info("Session:", sessionId, " in server is closed");
646667

647-
for (var item in clusters[sessionId].streams) {
648-
rpcReq.unsubscribe(clusters[sessionId].controller, 'admin', clusters[sessionId].streams[item].connid);
649-
}
650-
delete clusters[sessionId];
668+
for (var item in clusters[sessionId].streams) {
669+
rpcReq.unsubscribe(clusters[sessionId].controller, 'admin', clusters[sessionId].streams[item].connid);
670+
}
671+
delete clusters[sessionId];
651672

652-
})
673+
})
674+
});
653675
}
654676

655677
start();

source/agent/media_bridge/rpcRequest.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ var RpcRequest = function(rpcChannel) {
6262
return rpcChannel.makeRPC(controller, 'onSessionSignaling', [sessionId, signaling]);
6363
};
6464

65+
that.getToken = function(controller) {
66+
return rpcChannel.makeRPC(controller, 'getRoomToken', []);
67+
}
68+
6569
return that;
6670
};
6771

source/cluster_manager/clusterManager.js

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -259,22 +259,37 @@ var ClusterManager = function (clusterName, selfId, spec) {
259259
}
260260
};
261261

262-
var getClusterID = function (on_ok) {
263-
return on_ok(spec.clusterID);
262+
var getClusterID = function (room_id, roomToken, on_ok) {
263+
on_ok(spec.clusterID);
264+
265+
if (sendRequest) {
266+
var data = {
267+
clusterID: spec.clusterID,
268+
region: spec.region,
269+
info: {
270+
room: room_id,
271+
token: roomToken
272+
}
273+
}
274+
275+
log.info("Send room info to cloud with data:", data);
276+
send('POST', 'updateConference', data);
277+
}
264278
};
265279

266280
var registerInfo = function (info, on_ok) {
267281
on_ok('ok');
268-
var data = {
269-
clusterID: spec.clusterID,
270-
region: spec.region,
271-
info: {
272-
resturl: info.resturl,
273-
servicekey: info.servicekey,
274-
serviceid: info.serviceid
275-
}
276-
}
277282
if (sendRequest) {
283+
var data = {
284+
clusterID: spec.clusterID,
285+
region: spec.region,
286+
info: {
287+
resturl: info.resturl,
288+
servicekey: info.servicekey,
289+
serviceid: info.serviceid
290+
}
291+
}
292+
278293
log.info("Send registerCluster event to cloud with data:", data);
279294
send('POST', 'registerCluster', data);
280295
}
@@ -442,8 +457,8 @@ var ClusterManager = function (clusterName, selfId, spec) {
442457
callback('callback', 'error', error_reason);
443458
});
444459
},
445-
getClusterID: function (callback) {
446-
getClusterID(function (cluster) {
460+
getClusterID: function (room_id, roomToken, callback) {
461+
getClusterID(room_id, roomToken, function (cluster) {
447462
callback('callback', cluster);
448463
});
449464
},

0 commit comments

Comments
 (0)