Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 97ea467

Browse files
authored
Merge pull request #1134 from starwarfan/mst-st-layer
Add OWT stream layer control
2 parents cc2acef + fe616d0 commit 97ea467

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+5838
-78
lines changed

doc/design/adding-an-agent.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
| Adding an agent in OWT |
2+
3+
4+
# Introduction
5+
6+
OWT agent is an independent module which provides RPC interfaces for external callers. It has following characteristics:
7+
1. Use same RPC mechanism as other OWT modules.
8+
2. Register on OWT cluster manager as cluster worker.
9+
3. Spawn child process to handle actual tasks.
10+
11+
12+
# How to add an agent
13+
14+
1. Create a directory under source, take `source/agent/sample` as an example.
15+
2. Create `package.json` in the new agent directory. This file plays a same role as other `package.json` in Node.js. Besides, we need define a `start` command in `scripts` which will be used when we start the new agent using default daemon script(e.g. bin/start-all.sh). The `start` command should have format as `node . -U [name]`. The [name] is also the entry file name of the new agent.
16+
3. Create `dist.json` in the new agent directory. This file defines packing rules for the new agent. See `source/agent/sample/dist.json` for reference.
17+
4. Create js files with new agent implementation following RPC-agent interfaces. The entry file should have same name as defined in start script of `package.json`. See `source/agent/sample/sample.js` for reference.

scripts/release/daemon-mcu.sh

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ mkdir -p "$LogDir"
7878
stdout=${LogDir}/${command}.stdout
7979
pid=${LogDir}/${command}.pid
8080

81+
CommandDir=${command//-/_}
82+
StartCmd=""
83+
8184
# Set default scheduling priority
8285
if [ "$OWT_NICENESS" = "" ]; then
8386
export OWT_NICENESS=0
@@ -95,6 +98,8 @@ case $startStop in
9598
check_node_version || exit 1
9699
rotate_log $stdout
97100
echo "starting $command, stdout -> $stdout"
101+
102+
98103
case ${command} in
99104
management-api )
100105
cd ${OWT_HOME}/management_api
@@ -212,8 +217,16 @@ case $startStop in
212217
echo $! > ${pid}
213218
;;
214219
* )
215-
echo $usage
216-
exit 1
220+
if [ -d ${OWT_HOME}/${CommandDir} ]; then
221+
cd ${OWT_HOME}/${CommandDir}
222+
StartCmd=$(node -e "process.stdout.write(require('./package.json').scripts.start)")
223+
nohup nice -n ${OWT_NICENESS} ${StartCmd} \
224+
> "${stdout}" 2>&1 </dev/null &
225+
echo $! > ${pid}
226+
else
227+
echo $usage
228+
exit 1
229+
fi
217230
;;
218231
esac
219232

source/agent/agent.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[agent]
2+
#Max processes that agent can run
3+
maxProcesses = -1 #default: -1, unlimited
4+
5+
#Number of precesses that agent runs when it starts. 1 <= prerunProcesses <= maxProcesses.
6+
prerunProcesses = 2 #default: 2
7+
8+
#Roles defined in services have been moved to database
9+
10+
[cluster]
11+
name = "owt-cluster"
12+
13+
#The number of times to retry joining if the first try fails.
14+
join_retry = 60 #default: 60
15+
16+
#The interval of reporting the work load
17+
report_load_interval = 1000 #default: 1000, unit: millisecond
18+
19+
#The max CPU load under which this worker can take new tasks.
20+
max_load = 0.85 #default: 0.85
21+
22+
[rabbit]
23+
host = "localhost" #default: "localhost"
24+
port = 5672 #default: 5672

source/agent/analytics/index.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
6666
}
6767

6868
const notifyStatus = (controller, sessionId, direction, status) => {
69+
log.warn('notifyStatus:', controller, sessionId, direction, status);
6970
rpcClient.remoteCast(controller, 'onSessionProgress', [sessionId, direction, status]);
7071
// Emit GRPC notifications
7172
const notification = {
@@ -257,7 +258,12 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
257258
return;
258259
}
259260
}
260-
generateStream(options.controller, newStreamId, streamInfo);
261+
262+
// For Stream Engine, onSessionProgress(id, name, data)
263+
streamInfo.id = newStreamId;
264+
notifyStatus(controller, connectionId, 'onNewStream', streamInfo);
265+
266+
// generateStream(options.controller, newStreamId, streamInfo);
261267
} catch (e) {
262268
log.error("Parse stream added data with error:", e);
263269
}
@@ -313,7 +319,9 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
313319

314320
that.cutoff = function (connectionId, callback) {
315321
log.debug('cutoff, connectionId:', connectionId);
316-
router.cutoff(connectionId).then(onSuccess(callback), onError(callback));
322+
router.cutoff(connectionId).then(
323+
() => callback('callback', 'ok'),
324+
() => callback('callback', 'error', 'Failed to cutoff'));
317325
};
318326

319327
that.onFaultDetected = function (message) {

source/agent/audio/index.js

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
6161

6262
// For GRPC notifications
6363
var streamingEmitter = new EventEmitter();
64+
// ConnectionId => InputId
65+
const fromMap = new Map();
66+
// InputId => options
67+
const unlinkedInputs = new Map();
6468

6569
var addInput = function (stream_id, owner, codec, options, on_ok, on_error) {
6670
if (engine) {
@@ -70,6 +74,13 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
7074
port: options.port
7175
});
7276

77+
if (!conn) {
78+
log.debug('addInput waiting to link up:', stream_id);
79+
unlinkedInputs.set(stream_id, {options});
80+
on_ok(stream_id);
81+
return;
82+
}
83+
7384
if (engine.addInput(owner, stream_id, codec, conn)) {
7485
inputs[stream_id] = {owner: owner,
7586
connection: conn};
@@ -261,7 +272,12 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
261272

262273
that.unpublish = function (stream_id) {
263274
log.debug('unpublish, stream_id:', stream_id);
264-
removeInput(stream_id);
275+
if (fromMap.has(stream_id)) {
276+
removeInput(fromMap.get(stream_id));
277+
fromMap.delete(stream_id);
278+
} else {
279+
removeInput(stream_id);
280+
}
265281
};
266282

267283
that.setInputActive = function (stream_id, active, callback) {
@@ -283,13 +299,41 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
283299
log.debug('unsubscribe, connectionId:', connectionId);
284300
};
285301

286-
that.linkup = function (connectionId, audio_stream_id, callback) {
287-
log.debug('linkup, connectionId:', connectionId, 'audio_stream_id:', audio_stream_id);
302+
// streamInfo = {id: 'string', ip: 'string', port: 'number'}
303+
// from = {audio: streamInfo, video: streamInfo, data: streamInfo}
304+
that.linkup = function (connectionId, from, callback) {
305+
log.debug('linkup, connectionId:', connectionId, 'from:', from);
306+
if (unlinkedInputs.has(connectionId)) {
307+
const {options} = unlinkedInputs.get(connectionId);
308+
unlinkedInputs.delete(connectionId);
309+
const fromId = from.video.id;
310+
options.ip = from.video?.ip;
311+
options.port = from.video?.port;
312+
addInput(fromId, options.publisher, options.audio.codec, options, function () {
313+
if (unlinkedInputs.has(fromId)) {
314+
// Inputs link failed
315+
unlinkedInputs.delete(fromId);
316+
callback('callback', 'error', 'Invalid link address');
317+
} else {
318+
fromMap.set(connectionId, fromId);
319+
callback('callback', 'ok');
320+
}
321+
}, function (reason) {
322+
log.error(reason);
323+
callback('callback', 'error', reason);
324+
});
325+
return;
326+
}
288327
callback('callback', 'ok');
289328
};
290329

291330
that.cutoff = function (connectionId) {
292331
log.debug('cutoff, connectionId:', connectionId);
332+
if (fromMap.has(connectionId)) {
333+
const fromId = fromMap.get(connectionId);
334+
fromMap.delete(connectionId);
335+
connectionId = fromId;
336+
}
293337
if (connections[connectionId] && connections[connectionId].audioFrom) {
294338
if (outputs[connections[connectionId].audioFrom]) {
295339
outputs[connections[connectionId].audioFrom].dispatcher.removeDestination('audio', connections[connectionId].connection.receiver());

0 commit comments

Comments
 (0)