Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 406950d

Browse files
authored
Add region based schedule for video/audio/streaming agent (#612)
* Add region based schedule for video/audio/streaming agent
1 parent 1f44ef2 commit 406950d

File tree

8 files changed

+136
-51
lines changed

8 files changed

+136
-51
lines changed

source/agent/audio/agent.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ max_load = 0.85 #default: 0.85
2323
host = "localhost" #default: "localhost"
2424
port = 5672 #default: 5672
2525

26+
[capacity]
27+
#The ISP list this agent is able to handle.
28+
#If the ISP list is set to be non-empty, only the creating token requests with preference.ips being matched with one element of this list will be scheduled to this agent.
29+
isps = [] #default: [], which means all ISPs.
30+
#The region list this agent prefers to handle
31+
#If the region list is set to be non-empty, the creating token requests with preference.region being matched with one element of this list will be priorly scheduled to this agent.
32+
regions = [] #default: [], which means all regions.
33+
2634
[internal]
2735
#The IP address used for internal-cluster media spreading. Will use the IP got from the 'network_interface' item if 'ip_address' is not specified or equal to "".
2836
ip_address = "" #default: ""

source/agent/conference/conference.js

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ var Conference = function (rpcClient, selfRpcId) {
340340
}
341341
};
342342

343-
var initRoom = function(roomId) {
343+
var initRoom = function(roomId, origin) {
344344
if (is_initializing) {
345345
return new Promise(function(resolve, reject) {
346346
var interval = setInterval(function() {
@@ -376,6 +376,7 @@ var Conference = function (rpcClient, selfRpcId) {
376376
rpcClient: rpcClient,
377377
room: roomId,
378378
config: room_config,
379+
origin: origin,
379380
selfRpcId: selfRpcId
380381
},
381382
function onOk(rmController) {
@@ -396,6 +397,7 @@ var Conference = function (rpcClient, selfRpcId) {
396397
var mixed_stream_info = Stream.createMixStream(room_id, viewSettings, room_config.mediaOut, av_capability);
397398

398399
streams[mixed_stream_id] = mixed_stream_info;
400+
streams[mixed_stream_id].info.origin = origin;
399401
log.debug('Mixed stream info:', mixed_stream_info);
400402
room_config.notifying.streamChange &&
401403
sendMsg('room', 'all', 'stream', {id: mixed_stream_id, status: 'add', data: Stream.toPortalFormat(mixed_stream_info)});
@@ -406,7 +408,7 @@ var Conference = function (rpcClient, selfRpcId) {
406408
user: 'admin',
407409
role: 'admin',
408410
portal: undefined,
409-
origin: {isp: 'isp', region: 'region'},//FIXME: hard coded.
411+
origin: origin,
410412
permission: {
411413
subscribe: {audio: true, video: true},
412414
publish: {audio: true, video: true}
@@ -598,9 +600,10 @@ var Conference = function (rpcClient, selfRpcId) {
598600
}
599601

600602
var isReadded = !!(streams[id] && !streams[id].isInConnecting);
601-
603+
var origin = streams[id].info.origin;
604+
info.origin = origin;
602605
return new Promise((resolve, reject) => {
603-
roomController && roomController.publish(info.owner, id, locality, media, info.type, function() {
606+
roomController && roomController.publish(info.owner, id, locality, media, info.type, origin, function() {
604607
if (participants[info.owner]) {
605608
var st = Stream.createForwardStream(id, media, info, room_config);
606609
st.info.inViews = [];
@@ -733,6 +736,7 @@ var Conference = function (rpcClient, selfRpcId) {
733736
};
734737
}
735738
}
739+
media.origin = streams[media.video.from].info.origin;
736740
}
737741

738742
if (media.audio) {
@@ -745,6 +749,7 @@ var Conference = function (rpcClient, selfRpcId) {
745749
media.audio.format = (media.audio.format || source.format);
746750
mediaSpec.audio.format = media.audio.format;
747751
mediaSpec.audio.status = (media.audio.status || 'active');
752+
media.origin = streams[media.audio.from].info.origin;
748753
}
749754

750755
const isAudioPubPermitted = !!participants[info.owner].isPublishPermitted('audio');
@@ -851,7 +856,7 @@ var Conference = function (rpcClient, selfRpcId) {
851856
that.join = function(roomId, participantInfo, callback) {
852857
log.debug('participant:', participantInfo, 'join room:', roomId);
853858
var permission;
854-
return initRoom(roomId)
859+
return initRoom(roomId, participantInfo.origin)
855860
.then(function() {
856861
log.debug('room_config.participantLimit:', room_config.participantLimit, 'current participants count:', Object.keys(participants).length);
857862
if (room_config.participantLimit > 0 && (Object.keys(participants).length >= room_config.participantLimit + 1)) {
@@ -1006,7 +1011,8 @@ var Conference = function (rpcClient, selfRpcId) {
10061011
}
10071012
}
10081013

1009-
initiateStream(streamId, {owner: participantId, type: pubInfo.type});
1014+
var origin = participants[participantId].getOrigin();
1015+
initiateStream(streamId, {owner: participantId, type: pubInfo.type, origin: origin});
10101016
return accessController.initiate(participantId, streamId, 'in', participants[participantId].getOrigin(), pubInfo, format_preference)
10111017
.then((result) => {
10121018
callback('callback', result);
@@ -1796,8 +1802,12 @@ var Conference = function (rpcClient, selfRpcId) {
17961802
log.debug('addStreamingIn, roomId:', roomId, 'pubInfo:', JSON.stringify(pubInfo));
17971803

17981804
if (pubInfo.type === 'streaming') {
1805+
var origin = { isp:'isp', region:'region'};
1806+
if(pubInfo.connection.origin != null) {
1807+
origin = pubInfo.connection.origin;
1808+
}
17991809
var stream_id = Math.round(Math.random() * 1000000000000000000) + '';
1800-
return initRoom(roomId)
1810+
return initRoom(roomId, origin)
18011811
.then(() => {
18021812
if (room_config.inputLimit >= 0 && (room_config.inputLimit <= currentInputCount())) {
18031813
return Promise.reject('Too many inputs');
@@ -1811,8 +1821,8 @@ var Conference = function (rpcClient, selfRpcId) {
18111821
return Promise.reject('Video is forbiden');
18121822
}
18131823

1814-
initiateStream(stream_id, {owner: 'admin', type: pubInfo.type});
1815-
return accessController.initiate('admin', stream_id, 'in', participants['admin'].getOrigin(), pubInfo);
1824+
initiateStream(stream_id, {owner: 'admin', type: pubInfo.type, origin: origin});
1825+
return accessController.initiate('admin', stream_id, 'in', origin, pubInfo);
18161826
}).then((result) => {
18171827
return 'ok';
18181828
}).then(() => {
@@ -2082,7 +2092,8 @@ var Conference = function (rpcClient, selfRpcId) {
20822092

20832093
if (subDesc.type === 'streaming' || subDesc.type === 'recording' || subDesc.type === 'analytics') {
20842094
var subscription_id = Math.round(Math.random() * 1000000000000000000) + '';
2085-
return initRoom(roomId)
2095+
var origin = {isp: 'isp', region: 'region'};
2096+
return initRoom(roomId, origin)
20862097
.then(() => {
20872098
if (subDesc.media.audio && !room_config.mediaOut.audio.length) {
20882099
return Promise.reject('Audio is forbiden');
@@ -2180,7 +2191,16 @@ var Conference = function (rpcClient, selfRpcId) {
21802191
}
21812192

21822193
// Schedule preference for worker node
2183-
var accessPreference = Object.assign({}, participants['admin'].getOrigin());
2194+
var streamFrom = undefined;
2195+
if(subDesc.media.video && subDesc.media.video.from) {
2196+
streamFrom = subDesc.media.video.from;
2197+
} else if(subDesc.media.audio && subDesc.media.audio.from) {
2198+
streamFrom = subDesc.media.audio.from;
2199+
} else {
2200+
return Promise.reject('No video or audio source to process');
2201+
}
2202+
2203+
var accessPreference = Object.assign({}, streams[streamFrom].info.origin);
21842204
if (subDesc.type === 'analytics') {
21852205
// Schedule analytics agent according to the algorithm
21862206
accessPreference.algorithm = subDesc.connection.algorithm;

source/agent/conference/roomController.js

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
5757
rpcClient = spec.rpcClient,
5858
config = spec.config,
5959
room_id = spec.room,
60+
origin = spec.origin,
6061
selfRpcId = spec.selfRpcId,
6162
enable_audio_transcoding = config.transcoding && !!config.transcoding.audio,
6263
enable_video_transcoding = config.transcoding && !!config.transcoding.video,
@@ -214,7 +215,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
214215
// Media mixer initializer
215216
var mixStreamId = getMixStreamOfView(view);
216217
var initMixer = (mixerId, type, mixConfig) => new Promise(function(resolve, reject) {
217-
newTerminal(mixerId, type, mixStreamId, false,
218+
newTerminal(mixerId, type, mixStreamId, false, origin,
218219
function onTerminalReady() {
219220
log.debug('new terminal ok. terminal_id', mixerId, 'type:', type, 'view:', view, 'mixConfig:', mixConfig);
220221
initMediaProcessor(mixerId, ['mixing', mixConfig, room_id, selfRpcId, view])
@@ -350,19 +351,20 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
350351
streams = {};
351352
};
352353

353-
var newTerminal = function (terminal_id, terminal_type, owner, preAssignedNode, on_ok, on_error) {
354-
log.debug('newTerminal:', terminal_id, 'terminal_type:', terminal_type, 'owner:', owner);
354+
var newTerminal = function (terminal_id, terminal_type, owner, preAssignedNode, origin, on_ok, on_error) {
355+
log.debug('newTerminal:', terminal_id, 'terminal_type:', terminal_type, 'owner:', owner, " origin:", origin);
355356
if (terminals[terminal_id] === undefined) {
356357
var purpose = (terminal_type === 'vmixer' || terminal_type === 'vxcoder') ? 'video'
357358
: ((terminal_type === 'amixer' || terminal_type === 'axcoder') ? 'audio' : 'unknown');
358-
359+
mediaPreference.origin = origin;
359360
var nodeLocality = (preAssignedNode ? Promise.resolve(preAssignedNode)
360361
: rpcReq.getWorkerNode(cluster, purpose, {room: room_id, task: terminal_id}, mediaPreference));
361362

362363
return nodeLocality
363364
.then(function(locality) {
364365
terminals[terminal_id] = {
365366
owner: owner,
367+
origin: origin,
366368
type: terminal_type,
367369
locality: locality,
368370
published: [],
@@ -903,7 +905,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
903905
var getAudioTranscoder = function (stream_id, on_ok, on_error) {
904906
findExistingAudioTranscoder(stream_id, on_ok, function () {
905907
var axcoder = randomId();
906-
newTerminal(axcoder, 'axcoder', streams[stream_id].owner, false, function () {
908+
var terminalId = streams[stream_id].owner;
909+
newTerminal(axcoder, 'axcoder', streams[stream_id].owner, false, terminals[terminalId].origin, function () {
907910
var on_failed = function (reason) {
908911
makeRPC(
909912
rpcClient,
@@ -1016,7 +1019,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
10161019
var getVideoTranscoder = function (stream_id, on_ok, on_error) {
10171020
findExistingVideoTranscoder(stream_id, on_ok, function () {
10181021
var vxcoder = randomId();
1019-
newTerminal(vxcoder, 'vxcoder', streams[stream_id].owner, false, function () {
1022+
var terminalId = streams[stream_id].owner;
1023+
newTerminal(vxcoder, 'vxcoder', streams[stream_id].owner, false, terminals[terminalId].origin, function () {
10201024
var on_failed = function (error_reason) {
10211025
makeRPC(
10221026
rpcClient,
@@ -1422,12 +1426,12 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
14221426
});
14231427
};
14241428

1425-
that.publish = function (participantId, streamId, accessNode, streamInfo, streamType, on_ok, on_error) {
1426-
log.debug('publish, participantId: ', participantId, 'streamId:', streamId, 'accessNode:', accessNode.node, 'streamInfo:', JSON.stringify(streamInfo));
1429+
that.publish = function (participantId, streamId, accessNode, streamInfo, streamType, origin, on_ok, on_error) {
1430+
log.debug('publish, participantId: ', participantId, 'streamId:', streamId, 'accessNode:', accessNode.node, 'streamInfo:', JSON.stringify(streamInfo), ' origin is:', origin);
14271431
if (streams[streamId] === undefined) {
14281432
var terminal_id = pubTermId(participantId, streamId);
14291433
var terminal_owner = (streamType === 'webrtc' || streamType === 'sip') ? participantId : room_id + '-' + randomId();
1430-
newTerminal(terminal_id, streamType, terminal_owner, accessNode, function () {
1434+
newTerminal(terminal_id, streamType, terminal_owner, accessNode, origin, function () {
14311435
streams[streamId] = {owner: terminal_id,
14321436
audio: streamInfo.audio ? {format: formatStr(streamInfo.audio),
14331437
subscribers: [],
@@ -1680,7 +1684,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
16801684
};
16811685

16821686
var terminal_owner = (((subType === 'webrtc' || subType === 'sip') && isAudioPubPermitted) ? participantId : room_id);
1683-
newTerminal(terminal_id, subType, terminal_owner, accessNode, function () {
1687+
newTerminal(terminal_id, subType, terminal_owner, accessNode, subInfo.origin, function () {
16841688
doSubscribe();
16851689
}, on_error);
16861690
} else {
@@ -1913,6 +1917,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
19131917
}
19141918

19151919
log.debug('rebuildVideoMixer, vmixerId:', vmixerId, 'view:', view);
1920+
var origin = terminals[vmixerId].origin;
19161921
for (var sub_id in terminals[vmixerId].subscribed) {
19171922
var vst_id = terminals[vmixerId].subscribed[sub_id].video;
19181923
inputs.push(vst_id);
@@ -1936,7 +1941,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
19361941
}
19371942
});
19381943
terminals[vmixerId].published = [];
1939-
1944+
mediaPreference.origin = origin;
19401945
return rpcReq.getWorkerNode(cluster, 'video', {room: room_id, task: vmixerId}, mediaPreference)
19411946
.then(function (locality) {
19421947
log.debug('Got new video mixer node:', locality);
@@ -2013,6 +2018,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
20132018
var input, outputs = [];
20142019

20152020
log.debug('rebuildVideoTranscoder, vxcoderId:', vxcoderId);
2021+
var origin = terminals[vxcoderId].origin;
20162022
for (var sub_id in terminals[vxcoderId].subscribed) {
20172023
var vst_id = terminals[vxcoderId].subscribed[sub_id].video;
20182024
input = vst_id;
@@ -2129,6 +2135,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
21292135
}
21302136
}
21312137

2138+
var origin = terminals[amixerId].origin;
21322139
for (var sub_id in terminals[amixerId].subscribed) {
21332140
var ast_id = terminals[amixerId].subscribed[sub_id].audio;
21342141
inputs.push(ast_id);
@@ -2154,6 +2161,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
21542161
});
21552162
terminals[amixerId].published = [];
21562163

2164+
mediaPreference.origin = origin;
21572165
return rpcReq.getWorkerNode(cluster, 'audio', {room: room_id, task: amixerId}, mediaPreference)
21582166
.then(function (locality) {
21592167
log.debug('Got new audio mixer node:', locality);
@@ -2228,6 +2236,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
22282236
var old_locality = terminals[axcoderId].locality;
22292237
var input, outputs = [];
22302238

2239+
var origin = terminals[axcoderId].origin;
22312240
for (var sub_id in terminals[axcoderId].subscribed) {
22322241
var vst_id = terminals[axcoderId].subscribed[sub_id].audio;
22332242
input = vst_id;

source/agent/conference/stream.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,9 @@ function toPortalFormat (internalStream) {
418418
delete videoInfo.alternative;
419419
}
420420
}
421+
if(stream && stream.info && stream.info.origin) {
422+
delete stream.info.origin;
423+
}
421424
return stream;
422425
};
423426

source/agent/streaming/agent.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ max_load = 0.85 #default: 0.85
2121
#The bandwidth of network-interface used for RTSP/RTMP connections.
2222
network_max_scale = 1000 #unit: Mbps
2323

24+
[capacity]
25+
#The ISP list this agent is able to handle.
26+
#If the ISP list is set to be non-empty, only the creating token requests with preference.ips being matched with one element of this list will be scheduled to this agent.
27+
isps = [] #default: [], which means all ISPs.
28+
#The region list this agent prefers to handle
29+
#If the region list is set to be non-empty, the creating token requests with preference.region being matched with one element of this list will be priorly scheduled to this agent.
30+
regions = [] #default: [], which means all regions.
2431

2532
[rabbit]
2633
host = "localhost" #default: "localhost"

source/agent/video/agent.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ report_load_interval = 1000 #default: 1000, unit: millisecond
1818
#The max CPU/GPU load under which this worker can take new tasks.
1919
max_load = 0.85 #default: 0.85
2020

21+
[capacity]
22+
#The ISP list this agent is able to handle.
23+
#If the ISP list is set to be non-empty, only the creating token requests with preference.ips being matched with one element of this list will be scheduled to this agent.
24+
isps = [] #default: [], which means all ISPs.
25+
#The region list this agent prefers to handle
26+
#If the region list is set to be non-empty, the creating token requests with preference.region being matched with one element of this list will be priorly scheduled to this agent.
27+
regions = [] #default: [], which means all regions.
2128

2229
[rabbit]
2330
host = "localhost" #default: "localhost"

0 commit comments

Comments
 (0)