Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 6793e73

Browse files
committed
Refine controllers for stream service
1 parent fad06a2 commit 6793e73

13 files changed

+639
-369
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
{
2+
"webrtc": {
3+
"className": "RtcController",
4+
"requirePath": "./controllers/rtcController",
5+
"test": {}
6+
},
7+
"audio": {
8+
"className": "AudioController",
9+
"requirePath": "./controllers/audioController",
10+
"test": {}
11+
},
12+
"video": {
13+
"className": "VideoController",
14+
"requirePath": "./controllers/videoController",
15+
"test": {}
16+
},
17+
"streaming": {
18+
"className": "StreamingController",
19+
"requirePath": "./controllers/streamingController",
20+
"test": {}
21+
},
22+
"analytics": {
23+
"className": "AnalyticsController",
24+
"requirePath": "./controllers/analyticsController",
25+
"test": {}
26+
},
27+
"quic": {
28+
"className": "QuicController",
29+
"requirePath": "./controllers/quicController",
30+
"test": {}
31+
},
32+
"virtual": {
33+
"className": "VirtualController",
34+
"requirePath": "./controllers/virtualController",
35+
"test": {}
36+
}
37+
}

source/stream_service/analyticsController.js renamed to source/stream_service/controllers/analyticsController.js

Lines changed: 16 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,9 @@
44

55
'use strict';
66

7-
const { EventEmitter } = require('events');
8-
const log = require('./logger').logger.getLogger('AnalyticsController');
9-
const {
10-
Publication,
11-
Subscription,
12-
Processor,
13-
} = require('./session');
7+
const log = require('../logger').logger.getLogger('AnalyticsController');
8+
const {TypeController} = require('./typeController');
9+
const {Publication, Subscription, Processor} = require('../stateTypes')
1410

1511
function AnalyticsSession(config, direction) {
1612
const session = Object.assign({}, config);
@@ -24,14 +20,9 @@ function AnalyticsSession(config, direction) {
2420
* 'session-updated': (id, Publication|Subscription)
2521
* 'session-aborted': (id, reason)
2622
*/
27-
class AnalyticsController extends EventEmitter {
28-
29-
constructor(rpcChannel, selfRpcId, clusterRpcId) {
30-
log.debug(`constructor ${selfRpcId}, ${clusterRpcId}`);
31-
super();
32-
this.rpcChannel = rpcChannel;
33-
this.selfRpcId = selfRpcId;
34-
this.clusterRpcId = clusterRpcId;
23+
class AnalyticsController extends TypeController {
24+
constructor(rpc) {
25+
super(rpc);
3526
// Map {processorId => Processor}
3627
this.processors = new Map();
3728
this.sessions = new Map();
@@ -48,12 +39,10 @@ class AnalyticsController extends EventEmitter {
4839
*/
4940
async addProcessor(procConfig) {
5041
log.debug('addProcessor:', procConfig);
51-
const rpcChannel = this.rpcChannel;
52-
const taskConfig = {room: procConfig.domain, task: procConfig.id};
5342
// Unused media preference for analytics
5443
const mediaPreference = {audio: {encode:[], decode:[]}};
55-
const locality = await this._getWorkerNode(
56-
this.clusterRpcId, 'analytics', taskConfig, mediaPreference);
44+
const locality = await this.getWorkerNode(
45+
'audio', procConfig.domain, procConfig.id, mediaPreference);
5746
log.debug('locality:', locality);
5847
const analyzer = new Processor(procConfig.id, 'analyzer', procConfig);
5948
analyzer.locality = locality;
@@ -64,11 +53,9 @@ class AnalyticsController extends EventEmitter {
6453
}
6554

6655
async removeProcessor(id) {
67-
const rpcChannel = this.rpcChannel;
6856
const processor = this.processors.get(id);
6957
if (processor) {
7058
const procConfig = this.processors.get(id).info;
71-
const taskId = procConfig.analytics.id;
7259
const locality = processor.locality;
7360
// Add clean up method for analytics agent
7461

@@ -80,9 +67,8 @@ class AnalyticsController extends EventEmitter {
8067
this.emit('session-aborted', videoTrack.id, 'Processor removed');
8168
});
8269
this.processors.delete(id);
83-
const taskConfig = {room: procConfig.domain, task: procConfig.id};
8470
// Recycle node
85-
this._recycleWorkerNode(locality.agent, locality.node, taskConfig)
71+
this.recycleWorkerNode(locality, procConfig.domain, procConfig.id)
8672
.catch((err) => log.debug('Failed to recycleNode:', err));
8773
} else {
8874
return Promise.reject(new Error('Invalid processor ID'));
@@ -127,7 +113,6 @@ class AnalyticsController extends EventEmitter {
127113
return Promise.reject(new Error('Invalid processor ID'));
128114
}
129115

130-
const rpcChannel = this.rpcChannel;
131116
const session = AnalyticsSession(sessionConfig, direction);
132117
if (direction === 'in') {
133118
// Generate video stream for analyzer
@@ -152,9 +137,9 @@ class AnalyticsController extends EventEmitter {
152137
const options = {
153138
media: sessionConfig.media,
154139
connection: sessionConfig.connection,
155-
controller: this.selfRpcId,
140+
controller: this.selfId,
156141
};
157-
await rpcChannel.makeRPC(processor.locality.node, 'subscribe',
142+
await this.makeRPC(processor.locality.node, 'subscribe',
158143
[inputId, 'analytics', options]);
159144
this.sessions.set(inputId, session);
160145
// Create subscription
@@ -177,31 +162,31 @@ class AnalyticsController extends EventEmitter {
177162

178163
async removeSession(id, direction, reason) {
179164
if (this.sessions.has(id)) {
180-
const rpcChannel = this.rpcChannel;
181165
const session = this.sessions.get(id);
182166
const processor = this.processors.get(session.processor);
183167
if (!processor) {
184168
throw new Error(`Processor for ${id} not found`);
185169
}
186170

187-
if (session.direction === 'in') {
171+
reason = reason || 'Participant terminate';
172+
if (direction === 'in') {
188173
// Degenerate
189174
const idx = processor.outputs.video.findIndex((track) => track.id === id);
190175
if (idx >= 0) {
191176
processor.outputs.audio.splice(idx, 1);
192-
this.emit('session-aborted', id, 'Participant terminate');
177+
this.emit('session-aborted', id, reason);
193178
}
194179
} else {
195180
// Remove input
196181
log.debug('session:', session);
197182
// Let cutoff do remove-input
198183
const inputId = session.media?.audio?.from;
199-
rpcChannel.makeRPC(processor.locality.node, 'unsubscribe', [inputId])
184+
this.makeRPC(processor.locality.node, 'unsubscribe', [inputId])
200185
.catch((e) => log.debug('ignore unpublish callback'));
201186
const idx = processor.inputs.audio.findIndex((track) => track.id === id);
202187
if (idx >= 0) {
203188
processor.inputs.video.splice(idx, 1);
204-
this.emit('session-aborted', id, 'Participant terminate');
189+
this.emit('session-aborted', id, reason);
205190
}
206191
}
207192
} else {
@@ -262,27 +247,6 @@ class AnalyticsController extends EventEmitter {
262247
//
263248
});
264249
}
265-
266-
_getWorkerNode(clusterManager, purpose, forWhom, preference) {
267-
const rpcChannel = this.rpcChannel;
268-
return rpcChannel.makeRPC(clusterManager, 'schedule', [purpose, forWhom.task, preference, 30 * 1000])
269-
.then(function(workerAgent) {
270-
return rpcChannel.makeRPC(workerAgent.id, 'getNode', [forWhom])
271-
.then(function(workerNode) {
272-
return {agent: workerAgent.id, node: workerNode};
273-
});
274-
});
275-
}
276-
277-
_recycleWorkerNode(workerAgent, workerNode, forWhom) {
278-
return this.rpcChannel.makeRPC(workerAgent, 'recycleNode', [workerNode, forWhom])
279-
. catch((result) => {
280-
return 'ok';
281-
}, (err) => {
282-
return 'ok';
283-
});
284-
}
285-
286250
}
287251

288252
exports.AnalyticsController = AnalyticsController;

0 commit comments

Comments
 (0)