Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 645b7aa

Browse files
authored
Selecting active audio streams for webrtc connection (#732)
* Add metadata methods in pipeline * Change metadata name case * Add onwerID metadata logic in AudioFramePacketizer and AudioRanker * Add active audio selecting in audio node * Add owner ID processing in webrtc node * Add active audio selecting in conference and configuration
1 parent aa4380a commit 645b7aa

19 files changed

+465
-75
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright (C) <2019> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
'use strict';
6+
7+
const { EventEmitter } = require('events');
8+
const MediaFrameMulticaster = require('../mediaFrameMulticaster/build/Release/mediaFrameMulticaster');
9+
const { AudioRanker } = require('../audioRanker/build/Release/audioRanker');
10+
11+
const logger = require('../logger').logger;
12+
const log = logger.getLogger('ActiveAudioSelector');
13+
14+
const DEFAULT_K = 3;
15+
const DETECT_MUTE = true;
16+
const CHANGE_INTERVAL = 200;
17+
18+
class ActiveAudioSelector extends EventEmitter {
19+
20+
constructor(k) {
21+
super();
22+
k = (k > 0) ? k : DEFAULT_K;
23+
// streamId : { owner, codec, conn }
24+
this.inputs = new Map();
25+
this.ranker = new AudioRanker(
26+
this._onRankChange.bind(this), DETECT_MUTE, CHANGE_INTERVAL);
27+
this.topK = [];
28+
for (let i = 0; i < k; i++) {
29+
let multicaster = new MediaFrameMulticaster();
30+
this.topK.push(multicaster);
31+
this.ranker.addOutput(this.topK[i]);
32+
}
33+
this.currentRank = Array(k).fill('');
34+
log.debug('Init with K:', k);
35+
}
36+
37+
_onRankChange(jsonChanges) {
38+
log.debug('_onRankChange:', jsonChanges);
39+
let changes = JSON.parse(jsonChanges);
40+
// Get rank difference
41+
if (changes.length !== this.currentRank.length) {
42+
log.warn('Changes number not equal to K');
43+
return;
44+
}
45+
46+
let pendingAddIndexes = [];
47+
for (let i = 0; i < this.currentRank.length; i++) {
48+
if (this.currentRank[i] !== changes[i][0]) {
49+
// Remove previous input
50+
let streamId = this.currentRank[i];
51+
if (this.inputs.has(streamId)) {
52+
let { owner, codec } = this.inputs.get(streamId);
53+
log.debug('Inactive input:', owner, streamId);
54+
} else if (streamId !== '') {
55+
log.warn('Unknown stream ID', streamId);
56+
}
57+
pendingAddIndexes.push(i);
58+
}
59+
}
60+
for (let i of pendingAddIndexes) {
61+
// Re-add top K with updated owner/streamId/codec
62+
let streamId = changes[i][0];
63+
if (this.inputs.has(streamId)) {
64+
let { owner, codec } = this.inputs.get(streamId);
65+
log.debug('Active input:', owner, streamId, codec, i);
66+
this.emit('source-change', i, owner, streamId, codec);
67+
} else if (streamId !== '') {
68+
log.warn('Unknown stream ID', streamId);
69+
}
70+
}
71+
this.currentRank = changes.map(pair => pair[0]);
72+
}
73+
74+
addInput(owner, streamId, codec, conn) {
75+
log.debug('addInput:', owner, streamId, codec);
76+
if (!this.inputs.has(streamId)) {
77+
this.inputs.set(streamId, { owner, codec, conn });
78+
this.ranker.addInput(conn, streamId, owner);
79+
return true;
80+
} else {
81+
log.warn(`Duplicate addInput ${streamId}`);
82+
return false;
83+
}
84+
}
85+
86+
removeInput(owner, streamId) {
87+
log.debug('removeInput:', owner, streamId);
88+
if (this.inputs.delete(streamId)) {
89+
this.ranker.removeInput(streamId);
90+
} else {
91+
log.warn(`Remove non-add input ${streamId}`);
92+
}
93+
}
94+
95+
removeOutput() {}
96+
97+
getOutputs() {
98+
return this.topK;
99+
}
100+
}
101+
102+
exports.ActiveAudioSelector = ActiveAudioSelector;

source/agent/audio/dist.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"folders": {
2828
"audio": [
2929
"index.js",
30-
"SelectiveMixer.js",
30+
"selectiveMixer.js",
31+
"activeAudioSelector.js",
3132
"../connections.js",
3233
"../InternalConnectionFactory.js"
3334
]

source/agent/audio/index.js

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ var InternalOut = internalIO.Out;
99
var MediaFrameMulticaster = require('../mediaFrameMulticaster/build/Release/mediaFrameMulticaster');
1010
var AudioMixer = require('../audioMixer/build/Release/audioMixer');
1111

12-
var { SelectiveMixer } = require('./SelectiveMixer');
12+
var { SelectiveMixer } = require('./selectiveMixer');
13+
var { ActiveAudioSelector } = require('./activeAudioSelector');
1314

1415
var logger = require('../logger').logger;
1516

@@ -131,6 +132,29 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
131132
callback('callback', {codecs: supported_codecs});
132133
};
133134

135+
const initSelector = function ({activeStreamIds}, belongToRoom, ctrlr, callback) {
136+
if (!activeStreamIds || !activeStreamIds.length) {
137+
callback('callback', 'error', 'No active stream IDs.');
138+
return;
139+
}
140+
const selector = new ActiveAudioSelector(activeStreamIds.length);
141+
selector.on('source-change', (i, owner, streamId, codec) => {
142+
// Notify controller
143+
const target = {id: activeStreamIds[i], owner, codec};
144+
ctrlr && rpcClient.remoteCall(ctrlr, 'onAudioActiveness',
145+
[belongToRoom, activeInput, target],
146+
{callback: function(){}});
147+
});
148+
engine = selector;
149+
for (let i = 0; i < activeStreamIds.length; i++) {
150+
outputs[activeStreamIds[i]] = {
151+
dispatcher: selector.getOutputs()[i],
152+
connections: {}
153+
};
154+
}
155+
callback('callback', {selectingNum: selector.getOutputs().length});
156+
}
157+
134158
that.deinit = function () {
135159
for (var stream_id in outputs) {
136160
removeOutput(stream_id);
@@ -282,7 +306,10 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
282306
log.debug('enableVAD, periodMS:', periodMS);
283307
engine.enableVAD(periodMS, function (activeInput) {
284308
log.debug('enableVAD, activeInput:', activeInput);
285-
controller && rpcClient.remoteCall(controller, 'onAudioActiveness', [belong_to_room, activeInput, view], {callback: function(){}});
309+
controller && rpcClient.remoteCall(
310+
controller, 'onAudioActiveness',
311+
[belong_to_room, activeInput, {view}],
312+
{callback: function(){}});
286313
});
287314
};
288315

@@ -303,6 +330,8 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
303330
initEngine(audioConfig, belongToRoom, controller, callback);
304331
} else if (service === 'transcoding') {
305332
initEngine(audioConfig, belongToRoom, controller, callback);
333+
} else if (service === 'selecting') {
334+
initSelector(config, belongToRoom, controller, callback);
306335
} else {
307336
log.error('Unknown service type to init an audio node:', service);
308337
callback('callback', 'error', 'Unknown service type to init an audio node.');

source/agent/conference/conference.js

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
const {
2929
ForwardStream,
3030
MixedStream,
31+
SelectedStream,
3132
StreamConfigure,
3233
} = require('./stream');
3334

@@ -356,15 +357,24 @@ var Conference = function (rpcClient, selfRpcId) {
356357
room_id = roomId;
357358
is_initializing = false;
358359

359-
room_config.views.forEach((viewSettings) => {
360-
var mixed_stream_id = room_id + '-' + viewSettings.label;
361-
var mixed_stream_info = new MixedStream(mixed_stream_id, viewSettings.label);
360+
roomController.getMixedStreams().forEach(({streamId, view}) => {
361+
const mixedStreamInfo = new MixedStream(streamId, view);
362+
streams[streamId] = mixedStreamInfo;
363+
streams[streamId].info.origin = origin;
364+
log.debug('Mixed stream info:', mixedStreamInfo);
365+
room_config.notifying.streamChange &&
366+
sendMsg('room', 'all', 'stream',
367+
{id: streamId, status: 'add', data: mixedStreamInfo.toPortalFormat()});
368+
});
362369

363-
streams[mixed_stream_id] = mixed_stream_info;
364-
streams[mixed_stream_id].info.origin = origin;
365-
log.debug('Mixed stream info:', mixed_stream_info);
370+
roomController.getActiveAudioStreams().forEach((streamId) => {
371+
const selectedStreamInfo = new SelectedStream(streamId);
372+
streams[streamId] = selectedStreamInfo;
373+
streams[streamId].info.origin = origin;
374+
log.debug('Selected stream info:', selectedStreamInfo);
366375
room_config.notifying.streamChange &&
367-
sendMsg('room', 'all', 'stream', {id: mixed_stream_id, status: 'add', data: mixed_stream_info.toPortalFormat()});
376+
sendMsg('room', 'all', 'stream',
377+
{id: streamId, status: 'add', data: selectedStreamInfo.toPortalFormat()});
368378
});
369379

370380
participants['admin'] = Participant({
@@ -639,6 +649,15 @@ var Conference = function (rpcClient, selfRpcId) {
639649
streams[id] = fwdStream;
640650
pubArgs.forEach(pubArg => {
641651
trackOwners[pubArg.id] = id;
652+
if (room_config.selectActiveAudio) {
653+
if (pubArg.media.audio) {
654+
roomController.selectAudio(pubArg.id, () => {
655+
log.debug('Select active audio ok:', pubArg.id);
656+
}, (err) => {
657+
log.info('Select active audio error:', pubArg.id, err);
658+
});
659+
}
660+
}
642661
});
643662
if (!isReadded) {
644663
setTimeout(() => {
@@ -1845,28 +1864,44 @@ var Conference = function (rpcClient, selfRpcId) {
18451864
}
18461865
};
18471866

1848-
that.onAudioActiveness = function(roomId, activeInputStream, view, callback) {
1849-
log.debug('onAudioActiveness, roomId:', roomId, 'activeInputStream:', activeInputStream, 'view:', view);
1867+
that.onAudioActiveness = function(roomId, activeInputStream, target, callback) {
1868+
log.debug('onAudioActiveness, roomId:', roomId, 'activeInputStream:', activeInputStream, 'target:', target);
18501869
if ((room_id === roomId) && roomController) {
1851-
room_config.views.forEach((viewSettings) => {
1852-
if (viewSettings.label === view && viewSettings.video.keepActiveInputPrimary) {
1853-
roomController.setPrimary(activeInputStream, view);
1854-
}
1855-
});
1870+
if (typeof target.view === 'string') {
1871+
const view = target.view;
1872+
room_config.views.forEach((viewSettings) => {
1873+
if (viewSettings.label === view && viewSettings.video.keepActiveInputPrimary) {
1874+
roomController.setPrimary(activeInputStream, view);
1875+
}
1876+
});
18561877

1857-
var input = streams[activeInputStream] ? activeInputStream : trackOwners[activeInputStream];
1858-
if (input && streams[input]) {
1859-
for (var id in streams) {
1860-
if (streams[id].type === 'mixed' && streams[id].info.label === view) {
1861-
if (streams[id].info.activeInput !== input) {
1862-
streams[id].info.activeInput = input;
1863-
room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', {id: id, status: 'update', data: {field: 'activeInput', value: input}});
1864-
}
1865-
break;
1878+
const input = streams[activeInputStream] ?
1879+
activeInputStream : trackOwners[activeInputStream];
1880+
const mixedId = roomController.getMixedStream(view);
1881+
if (streams[mixedId] instanceof MixedStream) {
1882+
if (streams[mixedId].info.activeInput !== input) {
1883+
streams[mixedId].info.activeInput = input;
1884+
room_config.notifying.streamChange &&
1885+
sendMsg('room', 'all', 'stream',
1886+
{id: mixedId, status: 'update', data: {field: 'activeInput', value: input}});
1887+
}
1888+
}
1889+
callback('callback', 'ok');
1890+
} else {
1891+
const input = streams[activeInputStream] ?
1892+
activeInputStream : trackOwners[activeInputStream];
1893+
const activeAudioId = target.id;
1894+
if (streams[activeAudioId] instanceof SelectedStream) {
1895+
if (streams[activeAudioId].info.activeInput !== input) {
1896+
streams[activeAudioId].info.activeInput = input;
1897+
streams[activeAudioId].info.owner = target.owner;
1898+
room_config.notifying.streamChange &&
1899+
sendMsg('room', 'all', 'stream',
1900+
{id: activeAudioId, status: 'update', data: {field: 'activeInput', value: input}});
18661901
}
18671902
}
1903+
callback('callback', 'ok');
18681904
}
1869-
callback('callback', 'ok');
18701905
} else {
18711906
log.info('onAudioActiveness, room does not exist');
18721907
callback('callback', 'error', 'room is not in service');

0 commit comments

Comments
 (0)