Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit b38f7a8

Browse files
authored
Implement bandwidth estimation based SVC subscription (#1090)
1 parent 777050c commit b38f7a8

34 files changed

+1195
-77
lines changed

scripts/build.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"media-frame-multicaster",
99
"audio",
1010
"audio-ranker",
11+
"video-switch",
1112
"webrtc",
1213
"avstream",
1314
"sip"
@@ -112,8 +113,10 @@
112113
},
113114
"audio-ranker-test" : {
114115
"path" : "source/agent/addons/audioRanker/test"
115-
}
116-
,
116+
},
117+
"video-switch" : {
118+
"path" : "source/agent/addons/videoSwitch"
119+
},
117120
"quicIO" : {
118121
"path" : "source/agent/addons/quicIO"
119122
},
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (C) <2021> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef BUILDING_NODE_EXTENSION
6+
#define BUILDING_NODE_EXTENSION
7+
#endif
8+
9+
#include "VideoSwitchWrapper.h"
10+
11+
using namespace v8;
12+
13+
Nan::Persistent<Function> VideoSwitch::constructor;
14+
15+
VideoSwitch::VideoSwitch() {}
16+
VideoSwitch::~VideoSwitch() {}
17+
18+
NAN_MODULE_INIT(VideoSwitch::Init) {
19+
// Constructor template
20+
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
21+
tpl->SetClassName(Nan::New("VideoSwitch").ToLocalChecked());
22+
tpl->InstanceTemplate()->SetInternalFieldCount(1);
23+
24+
// Prototype
25+
Nan::SetPrototypeMethod(tpl, "close", close);
26+
Nan::SetPrototypeMethod(tpl, "setTargetBitrate", setTargetBitrate);
27+
Nan::SetPrototypeMethod(tpl, "addDestination", addDestination);
28+
Nan::SetPrototypeMethod(tpl, "removeDestination", removeDestination);
29+
30+
constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked());
31+
Nan::Set(target, Nan::New("VideoSwitch").ToLocalChecked(),
32+
Nan::GetFunction(tpl).ToLocalChecked());
33+
}
34+
35+
NAN_METHOD(VideoSwitch::New) {
36+
if (info.IsConstructCall()) {
37+
std::vector<owt_base::FrameSource*> sources;
38+
for (int i = 0; i < info.Length(); i++) {
39+
FrameSource* param = node::ObjectWrap::Unwrap<FrameSource>(
40+
info[i]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
41+
owt_base::FrameSource* src = param->src;
42+
sources.push_back(src);
43+
}
44+
45+
VideoSwitch* obj = new VideoSwitch();
46+
obj->me.reset(new owt_base::VideoQualitySwitch(sources));
47+
obj->src = obj->me.get();
48+
49+
obj->Wrap(info.This());
50+
info.GetReturnValue().Set(info.This());
51+
} else {
52+
// ELOG_WARN("Not construct call");
53+
}
54+
}
55+
56+
NAN_METHOD(VideoSwitch::close) {
57+
VideoSwitch* obj = ObjectWrap::Unwrap<VideoSwitch>(info.Holder());
58+
obj->src = nullptr;
59+
obj->me.reset();
60+
}
61+
62+
63+
NAN_METHOD(VideoSwitch::setTargetBitrate) {
64+
VideoSwitch* obj = ObjectWrap::Unwrap<VideoSwitch>(info.Holder());
65+
unsigned int bitrate = Nan::To<unsigned int>(info[0]).FromJust();
66+
obj->me->setTargetBitrate(bitrate);
67+
}
68+
69+
NAN_METHOD(VideoSwitch::addDestination) {
70+
VideoSwitch* obj = ObjectWrap::Unwrap<VideoSwitch>(info.Holder());
71+
72+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
73+
std::string track = std::string(*param0);
74+
75+
FrameDestination* param =
76+
ObjectWrap::Unwrap<FrameDestination>(
77+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
78+
owt_base::FrameDestination* dest = param->dest;
79+
80+
if (track == "audio") {
81+
obj->me->addAudioDestination(dest);
82+
} else if (track == "video") {
83+
obj->me->addVideoDestination(dest);
84+
}
85+
}
86+
87+
NAN_METHOD(VideoSwitch::removeDestination) {
88+
VideoSwitch* obj = ObjectWrap::Unwrap<VideoSwitch>(info.Holder());
89+
90+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
91+
std::string track = std::string(*param0);
92+
93+
FrameDestination* param =
94+
ObjectWrap::Unwrap<FrameDestination>(
95+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
96+
owt_base::FrameDestination* dest = param->dest;
97+
98+
if (track == "audio") {
99+
obj->me->removeAudioDestination(dest);
100+
} else if (track == "video") {
101+
obj->me->removeVideoDestination(dest);
102+
}
103+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (C) <2021> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef VIDEOSWITCHWRAPPER_H
6+
#define VIDEOSWITCHWRAPPER_H
7+
8+
#include "../../addons/common/MediaFramePipelineWrapper.h"
9+
#include <VideoQualitySwitch.h>
10+
#include <nan.h>
11+
12+
#include <memory>
13+
14+
/*
15+
* Wrapper class of owt_base::VideoQualitySwitch
16+
*/
17+
class VideoSwitch : public FrameSource {
18+
public:
19+
static NAN_MODULE_INIT(Init);
20+
std::shared_ptr<owt_base::VideoQualitySwitch> me;
21+
22+
private:
23+
VideoSwitch();
24+
~VideoSwitch();
25+
26+
static Nan::Persistent<v8::Function> constructor;
27+
28+
static NAN_METHOD(New);
29+
static NAN_METHOD(close);
30+
static NAN_METHOD(setTargetBitrate);
31+
32+
static NAN_METHOD(addDestination);
33+
static NAN_METHOD(removeDestination);
34+
};
35+
36+
#endif
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (C) <2021> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#include "VideoSwitchWrapper.h"
6+
7+
#include <node.h>
8+
9+
using namespace v8;
10+
11+
void InitAll(Local<Object> exports) {
12+
VideoSwitch::Init(exports);
13+
}
14+
15+
NODE_MODULE(addon, InitAll)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
'targets': [{
3+
'target_name': 'videoSwitch',
4+
'sources': [
5+
'addon.cc',
6+
'VideoSwitchWrapper.cc',
7+
'../../../core/owt_base/MediaFramePipeline.cpp',
8+
'../../../core/owt_base/selector/VideoQualitySwitch.cpp',
9+
],
10+
'include_dirs': [
11+
"<!(node -e \"require('nan')\")",
12+
'$(CORE_HOME)/common',
13+
'$(CORE_HOME)/owt_base',
14+
'$(CORE_HOME)/owt_base/selector',
15+
'$(DEFAULT_DEPENDENCY_PATH)/include',
16+
'$(CUSTOM_INCLUDE_PATH)'
17+
],
18+
'libraries': [
19+
'-lboost_system',
20+
'-lboost_thread',
21+
'-llog4cxx',
22+
'-L$(DEFAULT_DEPENDENCY_PATH)/lib',
23+
],
24+
'conditions': [
25+
[ 'OS=="mac"', {
26+
'xcode_settings': {
27+
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES', # -fno-exceptions
28+
'MACOSX_DEPLOYMENT_TARGET': '10.7', # from MAC OS 10.7
29+
'OTHER_CFLAGS': ['-g -O$(OPTIMIZATION_LEVEL) -stdlib=libc++']
30+
},
31+
}, { # OS!="mac"
32+
'cflags!': ['-fno-exceptions'],
33+
'cflags_cc': ['-Wall', '-O$(OPTIMIZATION_LEVEL)', '-g', '-std=c++11'],
34+
'cflags_cc!': ['-fno-exceptions']
35+
}],
36+
]
37+
}
38+
]
39+
}

source/agent/conference/conference.js

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ var Conference = function (rpcClient, selfRpcId) {
744744
streams[id] = fwdStream;
745745
pubArgs.forEach(pubArg => {
746746
trackOwners[pubArg.id] = id;
747+
747748
if (room_config.selectActiveAudio) {
748749
if (pubArg.media.audio) {
749750
roomController.selectAudio(pubArg.id, () => {
@@ -919,9 +920,43 @@ var Conference = function (rpcClient, selfRpcId) {
919920
log.warn('Add duplicate subscription:', id);
920921
}
921922
}
923+
const switchMap = new Map();
922924
for (const from of subscription.froms()) {
923925
const streamId = streams[from] ? from : trackOwners[from];
924926
if (streams[streamId]) {
927+
if (streams[streamId].type === 'forward' && room_config.enableBandwidthAdaptation) {
928+
// Create layer stream or find simulcast tracks
929+
const svcTrack = streams[streamId].media.tracks.find(t => t.scalabilityMode);
930+
let createSwitch = null;
931+
if (svcTrack) {
932+
// Make RPC to create layer streams
933+
const layerOption = svcTrack.source !== 'screen-cast' ?
934+
{spatial: true, temporal: false} :
935+
{spatial: false, temporal: true};
936+
createSwitch = rtcController.createLayerStreams(svcTrack.id, layerOption)
937+
.catch((e) => log.warn('fail layer :', e))
938+
.then((result) => {
939+
// Make RPC to create quality switch
940+
log.debug('qualitySources:', JSON.stringify(result));
941+
const streamAddr = roomController.getStreamAddress(svcTrack.id);
942+
const qualitySources = result.map((id) => {
943+
return {id, ip: streamAddr.ip, port: streamAddr.port};
944+
});
945+
return rtcController.createQualitySwitch(locality, qualitySources);
946+
});
947+
switchMap.set(svcTrack.id, createSwitch);
948+
}
949+
let simTracks = streams[streamId].media.tracks.filter(t => t.rid);
950+
if (simTracks.length > 0) {
951+
// Make RPC to create quality switch
952+
const qualitySources = simTracks.map((t) => {
953+
return roomController.getStreamAddress(t.id);
954+
});
955+
createSwitch = rtcController.createQualitySwitch(locality, qualitySources);
956+
const simSource = trackOwners[from] ? from : simTracks[0].id;
957+
switchMap.set(simSource, createSwitch);
958+
}
959+
}
925960
subscription.setSource(from, streams[streamId]);
926961
} else {
927962
return Promise.reject('Subscription source not found: ' + from);
@@ -932,10 +967,40 @@ var Conference = function (rpcClient, selfRpcId) {
932967
const subArgs = subscription.toRoomCtrlSubArgs();
933968
const subs = subArgs.map(subArg => new Promise((resolve, reject) => {
934969
if (roomController) {
935-
const subInfo = { transport : transport, media : subArg.media, data : subArg.data, origin: subArg.media.origin };
936-
roomController.subscribe(
970+
const subInfo = {
971+
transport : transport,
972+
media : subArg.media,
973+
data : subArg.data,
974+
origin: subArg.media.origin
975+
};
976+
if (subInfo.media.video && switchMap.has(subInfo.media.video.from)) {
977+
const switchPromise = switchMap.get(subInfo.media.video.from);
978+
switchPromise.then((result) => {
979+
const switchId = result.id;
980+
const originMedia = getStreamTrack(subInfo.media.video.from, 'video');
981+
const pubMedia = {video: originMedia.format};
982+
log.debug('Publish switch to room controller:', switchId, originMedia);
983+
roomController.publish(
984+
subArg.owner,
985+
switchId,
986+
subArg.locality,
987+
{origin: subInfo.origin, media:pubMedia, data:subInfo.data},
988+
subInfo.type,
989+
function pubOk() {
990+
log.debug('Try subscribe switch');
991+
subInfo.media.video = {from: switchId};
992+
roomController.subscribe(
993+
subArg.owner, subArg.id, subArg.locality, subInfo, subArg.type,
994+
isAudioPubPermitted, resolve, reject);
995+
},
996+
reject
997+
);
998+
});
999+
} else {
1000+
roomController.subscribe(
9371001
subArg.owner, subArg.id, subArg.locality, subInfo, subArg.type,
9381002
isAudioPubPermitted, resolve, reject);
1003+
}
9391004
} else {
9401005
reject('RoomController is not ready');
9411006
}
@@ -1256,7 +1321,7 @@ var Conference = function (rpcClient, selfRpcId) {
12561321
if (rtcPubInfo.tracks) {
12571322
// Set formatPreference
12581323
rtcPubInfo.tracks.forEach(track => {
1259-
track.formatPreference = { optional : room_config.mediaIn[track.type] };
1324+
track.formatPreference = {optional : room_config.mediaIn[track.type]};
12601325
});
12611326
}
12621327
initiateStream(streamId, { owner : participantId, type : pubInfo.type, origin });
@@ -1721,6 +1786,10 @@ var Conference = function (rpcClient, selfRpcId) {
17211786
if (subDesc.type === 'webrtc' || subDesc.type === 'quic') {
17221787
const controller = subDesc.type === 'webrtc' ? rtcController : quicController;
17231788
const rtcSubInfo = translateRtcSubIfNeeded(subDesc);
1789+
// Check bandwidth estimation
1790+
if (room_config.enableBandwidthAdaptation) {
1791+
rtcSubInfo.enableBWE = true;
1792+
}
17241793
if (rtcSubInfo.tracks){
17251794
// Set formatPreference
17261795
rtcSubInfo.tracks.forEach(track => {

source/agent/conference/roomController.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,10 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
19391939
}
19401940
};
19411941

1942+
that.getStreamAddress = function (streamId) {
1943+
return getStreamLinkInfo(streamId);
1944+
};
1945+
19421946
var isImpacted = function (locality, type, id) {
19431947
return (type === 'worker' && locality.agent === id) || (type === 'node' && locality.node === id);
19441948
};

source/agent/conference/rpcRequest.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@ const RpcRequest = function(rpcChannel, listener) {
359359
return rpcChannel.makeRPC(sipNode, 'endCall', [sipCallId]);
360360
};
361361

362+
that.createLayerStreams = function(accessNode, trackId, preferredLayers) {
363+
return rpcChannel.makeRPC(accessNode, 'createLayerStreams', [trackId, preferredLayers]);
364+
};
365+
366+
that.createQualitySwitch = function(accessNode, froms) {
367+
return rpcChannel.makeRPC(accessNode, 'createQualitySwitch', [froms]);
368+
};
369+
362370
that.getClusterID = function(clusterManager, room_id, roomToken) {
363371
return rpcChannel.makeRPC(clusterManager, 'getClusterID', [room_id, roomToken])
364372
}

0 commit comments

Comments
 (0)