Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit ae54a7d

Browse files
authored
Merge pull request #1275 from starwarfan/mst-p-grpc
Enable gRPC option for internal service
2 parents aade51e + 934af87 commit ae54a7d

File tree

98 files changed

+5903
-188
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+5903
-188
lines changed

doc/servermd/EnableGRPC.md

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
##### Table of Contents
2+
* 1.[Introduction](#introduction)
3+
* 2.[Enable gRPC for internal RPC](#dependencies)
4+
* 2.1[Configuration for gRPC](#dependencies1)
5+
* 2.2[Enable TLS for gRPC](#dependencies2)
6+
7+
# 1 Introduction
8+
9+
By default, OWT[Open WebRTC Toolkit](https://github.com/open-webrtc-toolkit/owt-server) builds internal RPC utility based on message queue middleware(RabbitMQ). In this document we introduce the gRPC alternative for RabbitMQ, and a step by step guide to setup gRPC as OWT server's RPC framework.
10+
11+
# 2 Enable gRPC for internal RPC
12+
13+
In OWT server package, you can turn on gRPC option by updating configuration files. This section introduces the configuration settings for internal gRPC.
14+
15+
# 2.1 Configuration for gRPC
16+
17+
To enable basic gRPC functionality for OWT server, edit following configuration files in server package:
18+
19+
cluster_manager/cluster_manager.toml
20+
management_api/management_api
21+
portal/portal.toml
22+
webrtc_agent/agent.toml
23+
audio_agent/agent.toml
24+
video_agent/agent.toml
25+
conference_agent/agent.toml
26+
analytics_agent/agent.toml
27+
recording_agent/agent.toml
28+
streaming_agent/agent.toml
29+
quic_agent/agent.toml
30+
sip_agent/agent.toml
31+
sip_portal/sip_portal.toml
32+
33+
Set the item `enable_grpc` to `true` in these configuration files.
34+
35+
enable_grpc = true
36+
37+
In `cluster_manager/cluster_manager.toml`, set `grpc_host` of section `[manager]` with `IP|hostname:port` of your own(when using TLS, only hostname is allowed).
38+
39+
[manager]
40+
grpc_host = "localhost:10080"
41+
42+
In other modules' toml files, edit `host` of section `[cluster]` to the value of `cluster_manager`.
43+
44+
[cluster]
45+
host = "localhost:10080"
46+
47+
And to specify hostname of other modules' gRPC service, add following section in toml files.
48+
49+
[cluster.worker]
50+
ip="localhost" # You could also set hostname here. E.g, ip="myhost.com"
51+
52+
If you want to enable HTTP proxy for gRPC, set environment variable `GRPC_ARG_HTTP_PROXY` to `1`, then system proxy will be used.
53+
54+
Restart OWT service after above configuration files being updated.
55+
Note that RabbitMQ based RPC server will be disabled if gRPC is used.
56+
57+
# 2.2 Enable TLS for gRPC
58+
59+
To enable TLS for gRPC of OWT server, you need prepare SSL certificates for internal gRPC services.
60+
Both server certificate and client certificate are needed.
61+
62+
You could use following openssl commands to generate self-signed server and client certificates.
63+
64+
# Generate CA key
65+
openssl genrsa -des3 -out ca.key 4096
66+
# Generate CA certificate
67+
openssl req -new -x509 -days 365 -key ca.key -out ca.crt -subj "/C=XX/ST=XX/L=XX/O=XX/OU=OWT/CN=ca"
68+
# Generate server key
69+
openssl genrsa -des3 -out server.key 4096
70+
# Generate server CSR
71+
openssl req -new -key server.key -out server.csr -subj "/C=XX/ST=XX/L=XX/O=XX/OU=OWT/CN=localhost"
72+
# Generate server certificate
73+
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt
74+
# Generate client key
75+
openssl genrsa -des3 -out client.key 4096
76+
# Generate client CSR
77+
openssl req -new -key client.key -out client.csr -subj "/C=XX/ST=XX/L=XX/O=XX/OU=OWT/CN=localhost"
78+
# Generate client certificate
79+
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt
80+
81+
During creating private keys, you need to enter passphrase for them. To save these passphrases in OWT's credential store, run `(cluster_manager|management_api|portal|webrtc_agent|{MODULE_NAME})/initauth.js --grpc` and save gRPC client/server key passphrase.
82+
83+
For each OWT server component, place root, server and client certificates in the same machine/instance. Make sure OWT processes have pemission to access those certificates.
84+
Set following environment variables for SSL certificates before starting OWT server:
85+
86+
OWT_GRPC_ROOT_CERT: The root certificate for server and client certificates.
87+
OWT_GRPC_SERVER_CERT: The server certificate.
88+
OWT_GRPC_SERVER_KEY: The encrypted server key.
89+
OWT_GRPC_CLIENT_CERT: The client certificate.
90+
OWT_GRPC_CLIENT_KEY: The encrypted client key.
91+
92+
Once you have updated these settings correctly, TLS for gRPC will be enabled after restart OWT service.

scripts/release/initauth.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,30 @@ const updateInternal = () => new Promise((resolve, reject) => {
122122
});
123123
});
124124

125+
const updateGKeyPass = () => new Promise((resolve, reject) => {
126+
question('Update passphrase for gRPC TLS key?[yes/no]')
127+
.then((answer) => {
128+
answer = answer.toLowerCase();
129+
if (answer !== 'y' && answer !== 'yes') {
130+
resolve();
131+
return;
132+
}
133+
question(`(${authBase}) Enter passphrase of server key: `)
134+
.then((serverPass) => {
135+
mutableStdout.muted = false;
136+
question(`(${authBase}) Enter passphrase of client key: `)
137+
.then((clientPass) => {
138+
mutableStdout.muted = false;
139+
saveAuth({ grpc: { serverPass, clientPass } }, authStore)
140+
.then(resolve)
141+
.catch(reject);
142+
});
143+
mutableStdout.muted = true;
144+
});
145+
mutableStdout.muted = true;
146+
});
147+
});
148+
125149
const options = {};
126150
const parseArgs = () => {
127151
if (process.argv.includes('--rabbitmq')) {
@@ -133,6 +157,9 @@ const parseArgs = () => {
133157
if (process.argv.includes('--internal')) {
134158
options.internal = true;
135159
}
160+
if (process.argv.includes('--grpc')) {
161+
options.grpc = true;
162+
}
136163
if (Object.keys(options).length === 0) {
137164
options.rabbit = true;
138165
options.mongo = true;
@@ -156,4 +183,9 @@ parseArgs()
156183
return updateInternal();
157184
}
158185
})
186+
.then(() => {
187+
if (options.grpc) {
188+
return updateGKeyPass();
189+
}
190+
})
159191
.finally(() => readline.close());

source/agent/analytics/agent.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ maxProcesses = 13 #default: 13
55
#Number of precesses that agent runs when it starts. 1 <= prerunProcesses <= maxProcesses.
66
prerunProcesses = 2 #default: 2
77

8+
# Setup as GRPC server
9+
#enable_grpc = true
810

911
[cluster]
1012
name = "owt-cluster"

source/agent/analytics/dist.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,20 @@
2323
"../../common/makeRPC.js",
2424
"../../common/rpcChannel.js",
2525
"../../common/mediaUtil.js",
26+
"../../common/grpcTools.js",
27+
"../../protos/protoConfig.json",
28+
"../../protos/*.proto",
2629
"../../../scripts/release/initauth.js",
2730
"../../../scripts/release/initcert.js",
2831
"../../../scripts/detectOS.sh"
2932
],
3033
"folders": {
3134
"analytics": [
3235
"index.js",
36+
"grpcAdapter.js",
37+
"../../common/grpcTools.js",
38+
"../../protos/protoConfig.json",
39+
"../../protos/*.proto",
3340
"../connections.js",
3441
"../internalConnectionRouter.js"
3542
],

source/agent/analytics/grpcAdapter.js

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright (C) <2022> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
'use strict';
6+
7+
const unpackOption = require('./grpcTools').unpackOption;
8+
const packNotification = require('./grpcTools').packNotification;
9+
10+
// Create GRPC interface for analytics agent
11+
function createGrpcInterface(controller, streamingEmitter) {
12+
const that = {};
13+
14+
// GRPC export
15+
that.grpcInterface = {
16+
publish: function (call, callback) {
17+
const req = call.request;
18+
const option = unpackOption(req.type, req.option);
19+
controller.publish(req.id, req.type, option, (n, code, data) => {
20+
if (code === 'error') {
21+
callback(new Error(data), null);
22+
} else {
23+
callback(null, {id: req.id});
24+
}
25+
});
26+
},
27+
unpublish: function (call, callback) {
28+
controller.unpublish(call.request.id, (n, code, data) => {
29+
if (code === 'error') {
30+
callback(new Error(data), null);
31+
} else {
32+
callback(null, {});
33+
}
34+
});
35+
},
36+
subscribe: function (call, callback) {
37+
const req = call.request;
38+
const option = unpackOption(req.type, req.option);
39+
controller.subscribe(req.id, req.type, option, (n, code, data) => {
40+
if (code === 'error') {
41+
callback(new Error(data), null);
42+
} else {
43+
callback(null, {id: req.id});
44+
}
45+
});
46+
},
47+
unsubscribe: function (call, callback) {
48+
controller.unsubscribe(call.request.id, (n, code, data) => {
49+
if (code === 'error') {
50+
callback(new Error(data), null);
51+
} else {
52+
callback(null, {});
53+
}
54+
});
55+
},
56+
linkup: function (call, callback) {
57+
const req = call.request;
58+
controller.linkup(
59+
req.id, req.from,
60+
(n, code, data) => {
61+
if (code === 'error') {
62+
callback(new Error(data), null);
63+
} else {
64+
callback(null, {message: data});
65+
}
66+
});
67+
},
68+
cutoff: function (call, callback) {
69+
controller.cutoff(call.request.id, (n, code, data) => {
70+
if (code === 'error') {
71+
callback(new Error(data), null);
72+
} else {
73+
callback(null, {});
74+
}
75+
});
76+
},
77+
listenToNotifications: function (call, callback) {
78+
const writeNotification = (notification) => {
79+
const progress = packNotification({
80+
type: 'analytics',
81+
name: notification.name,
82+
data: notification.data,
83+
});
84+
call.write(progress);
85+
};
86+
const endCall = () => {
87+
call.end();
88+
};
89+
streamingEmitter.on('notification', writeNotification);
90+
streamingEmitter.on('close', endCall);
91+
call.on('cancelled', () => {
92+
call.end();
93+
});
94+
call.on('close', () => {
95+
streamingEmitter.off('notification', writeNotification);
96+
streamingEmitter.off('close', endCall);
97+
});
98+
},
99+
getInternalAddress: function (call, callback) {
100+
controller.getInternalAddress((n, code, data) => {
101+
if (code === 'error') {
102+
callback(new Error(data), null);
103+
} else {
104+
callback(null, code);
105+
}
106+
});
107+
},
108+
};
109+
110+
return that;
111+
}
112+
113+
exports.createGrpcInterface = createGrpcInterface;

source/agent/analytics/index.js

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ const VideoAnalyzer = require('../videoGstPipeline/build/Release/videoAnalyzer-p
1111

1212
var {InternalConnectionRouter} = require('./internalConnectionRouter');
1313

14+
// Setup GRPC server
15+
var createGrpcInterface = require('./grpcAdapter').createGrpcInterface;
16+
var enableGRPC = global.config.agent.enable_grpc || false;
17+
18+
var EventEmitter = require('events').EventEmitter;
19+
1420
function doPublish(rpcClient, conference, user, streamId, streamInfo) {
1521
return new Promise(function(resolve, reject) {
1622
makeRPC(
@@ -50,6 +56,8 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
5056
var outputs = {};
5157
var engine = new VideoAnalyzer();
5258
var controller;
59+
// For GRPC notifications
60+
var streamingEmitter = new EventEmitter();
5361

5462
try {
5563
algorithms = toml.parse(fs.readFileSync('./plugin.cfg'));
@@ -59,6 +67,16 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
5967

6068
const notifyStatus = (controller, sessionId, direction, status) => {
6169
rpcClient.remoteCast(controller, 'onSessionProgress', [sessionId, direction, status]);
70+
// Emit GRPC notifications
71+
const notification = {
72+
name: 'onSessionProgress',
73+
data: {
74+
id: sessionId,
75+
status,
76+
direction
77+
}
78+
};
79+
streamingEmitter.emit('notification', notification);
6280
};
6381

6482
// for algorithms that generate new stream
@@ -71,6 +89,16 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
7189
.catch((err) => {
7290
log.debug('Generated stream:', streamId, 'publish failed', err);
7391
});
92+
// Emit GRPC notifications
93+
const notification = {
94+
name: 'onStreamAdded',
95+
data: {
96+
id: streamId,
97+
owner: 'admin',
98+
info: streamInfo
99+
}
100+
};
101+
streamingEmitter.emit('notification', notification);
74102
};
75103
// destroy generated stream
76104
const destroyStream = (controller, streamId) => {
@@ -82,6 +110,15 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
82110
.catch((err) => {
83111
log.debug('Destroyed stream:', streamId, 'unpublish failed', err);
84112
});
113+
// Emit GRPC notifications
114+
const notification = {
115+
name: 'onStreamRemoved',
116+
data: {
117+
id: streamId,
118+
owner: 'admin'
119+
}
120+
};
121+
streamingEmitter.emit('notification', notification);
85122
};
86123

87124
// RPC callback
@@ -141,7 +178,7 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
141178
return callback('callback', {type: 'failed', reason: 'invalid connctionType'+connectionType});
142179
}
143180

144-
// check options
181+
// check options
145182
if (!algorithms[options.connection.algorithm]) {
146183
return callback('callback', {type: 'failed', reason: 'Not valid algorithm:'+options.connection.algorithm});
147184
}
@@ -289,5 +326,10 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
289326
}
290327
};
291328

329+
if (enableGRPC) {
330+
// Export GRPC interface.
331+
return createGrpcInterface(that, streamingEmitter);
332+
}
333+
292334
return that;
293335
};

source/agent/analytics/package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
{
2-
"name": "OWT-Server-Agent",
2+
"name": "OWT-Server-Analytics-Agent",
33
"version":"5.0.0",
44
"dependencies": {
55
"amqplib": "^0.7.0",
66
"log4js": "^6.4.0",
77
"node-getopt": "*",
8-
"toml": "*"
8+
"toml": "*",
9+
"@grpc/proto-loader": "^0.5.0",
10+
"@grpc/grpc-js": "^1.1.0"
911
},
1012
"devDependencies": {
1113
"node-gyp": "*"

0 commit comments

Comments
 (0)