Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit fad06a2

Browse files
committed
Add scheduler for stream service
1 parent c715380 commit fad06a2

File tree

9 files changed

+285
-132
lines changed

9 files changed

+285
-132
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (C) <2022> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
'use strict';
6+
7+
const log = require('./logger').logger.getLogger('MyController');
8+
const {DomainHandler, RemoteDomainHandler} = require('./domainHandler');
9+
10+
const STREAM_ENGINE_ID = 'stream-service';
11+
12+
class MyController {
13+
14+
constructor(rpcId, rpcClient) {
15+
this.rpcId = rpcId;
16+
this.domain = null;
17+
// id => JoinData
18+
this.clients = new Map();
19+
// id => count
20+
this.portals = new Map();
21+
// id => stream
22+
this.streams = new Map();
23+
}
24+
}
25+
26+
function RPCInterface(rpcClient, rpcID) {
27+
const controller = new DomainHandler();
28+
const API = {};
29+
for (const method of RemoteDomainHandler.supportedMethods) {
30+
API[method] = function (req, callback) {
31+
controller[method](req).then((ret) => {
32+
callback('callback', ret ? ret : 'ok');
33+
}).catch((e) => {
34+
callback('callback', 'error', e && e.message);
35+
});
36+
}
37+
}
38+
return API;
39+
}
40+
41+
module.exports = RPCInterface;

source/portal/portal.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,11 @@ var Portal = function(spec, rpcReq) {
238238

239239
that.onSessionSignaling = function(participantId, sessionId, signaling) {
240240
log.debug('onSessionSignaling, participantId:', participantId, 'sessionId:', sessionId, 'signaling:', signaling);
241-
242-
var participant = participants[participantId];
243241
if (participants[participantId] === undefined) {
244242
return Promise.reject('Participant has NOT joined');
245243
}
246244

247-
return rpcReq.onSessionSignaling(participants[participantId].controller, sessionId, signaling);
245+
return rpcReq.onSessionSignaling(participants[participantId].controller, sessionId, signaling, participantId);
248246
};
249247

250248
that.text = function(participantId, to, msg) {

source/portal/rpcRequest.js

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,6 @@ var RpcRequest = function(rpcChannel) {
119119
};
120120

121121
that.join = function(controller, roomId, participant) {
122-
if (STREAM_ENGINE) {
123-
participant.domain = roomId;
124-
}
125122
if (enableGrpc) {
126123
startConferenceClientIfNeeded(controller);
127124
const req = {roomId, participant};
@@ -135,6 +132,11 @@ var RpcRequest = function(rpcChannel) {
135132
});
136133
});
137134
}
135+
if (STREAM_ENGINE) {
136+
participant.domain = roomId;
137+
participant.participant = participant.id;
138+
return rpcChannel.makeRPC(controller, 'join', [participant], 6000);
139+
}
138140
return rpcChannel.makeRPC(controller, 'join', [roomId, participant], 6000);
139141
};
140142

@@ -153,7 +155,7 @@ var RpcRequest = function(rpcChannel) {
153155
});
154156
}
155157
if (STREAM_ENGINE) {
156-
const req = {id: participantId};
158+
const req = {id: participantId, participant: participantId};
157159
return rpcChannel.makeRPC(controller, 'leave', [req]);
158160
}
159161
return rpcChannel.makeRPC(controller, 'leave', [participantId]);
@@ -362,6 +364,14 @@ var RpcRequest = function(rpcChannel) {
362364
});
363365
});
364366
}
367+
if (STREAM_ENGINE) {
368+
const req = {
369+
id: sessionId,
370+
signaling,
371+
participant: participantId
372+
};
373+
return rpcChannel.makeRPC(controller, 'onSessionSignaling', [req]);
374+
}
365375
return rpcChannel.makeRPC(controller, 'onSessionSignaling', [sessionId, signaling]);
366376
};
367377

source/stream_service/dist.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"domainHandler.js",
1919
"stateStores.js",
2020
"stateTypes.js",
21+
"scheduler.js",
2122
"controllers.json",
2223
"../agent/conference/rpcRequest.js",
2324
"../common/amqpClient.js",

source/stream_service/scheduler.js

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright (C) <2022> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
'use strict';
6+
7+
const log = require('./logger').logger.getLogger('Scheduler');
8+
9+
const HASH_SIZE = 512;
10+
function stringHash(str) {
11+
let hash = 0;
12+
if (str.length === 0) return hash;
13+
for (let i = 0; i < str.length; i++) {
14+
const ch = str.charCodeAt(i);
15+
hash = ((hash << 5) - hash) + ch;
16+
hash |= 0;
17+
}
18+
return hash % HASH_SIZE + HASH_SIZE;
19+
}
20+
21+
const CHECK_INTERVAL = 60 * 1000; // 1 min
22+
23+
class ServiceScheduler {
24+
static supportedMethods = [
25+
'join',
26+
'leave',
27+
'publish',
28+
'unpublish',
29+
'getPublications',
30+
'subscribe',
31+
'unsubscribe',
32+
'getSubscriptions',
33+
'streamControl',
34+
'subscriptionControl',
35+
'onStatus',
36+
'addProcessor',
37+
'removeProcessor',
38+
'getProcessors',
39+
'onSessionSignaling',
40+
];
41+
constructor(rpcChannel, stateStores) {
42+
this.rpcChannel = rpcChannel;
43+
this.stateStores = stateStores;
44+
this.checkAliveTimer = setInterval(() => {
45+
this.checkService();
46+
}, CHECK_INTERVAL);
47+
}
48+
49+
async scheduleService(req) {
50+
const key = (req.participant || '') + (req.domain || '');
51+
const hash = stringHash(key);
52+
log.debug('hash:', key, hash);
53+
const ret = await this.stateStores.readMany('streamEngineNodes', {});
54+
const serviceNodes = ret.data || [];
55+
if (serviceNodes.length === 0) {
56+
throw new Error('No available stream engine nodes');
57+
}
58+
const scheduled = await this.stateStores.read('scheduleMaps', {_id: hash});
59+
if (scheduled) {
60+
log.debug('scheduled:', scheduled);
61+
return scheduled.node;
62+
} else {
63+
const node = serviceNodes[hash % serviceNodes.length].id;
64+
log.debug('node:', node);
65+
try {
66+
const map = {_id: hash, node};
67+
await this.stateStores.create('scheduleMaps', map);
68+
} catch (e) {
69+
log.debug('Failed to update schedule map:', e?.message);
70+
}
71+
return node;
72+
}
73+
}
74+
75+
// Check service availability
76+
checkService() {
77+
log.debug('Check service availability');
78+
this.stateStores.readMany('streamEngineNodes', {})
79+
.then(async (ret) => {
80+
const serviceNodes = ret.data || [];
81+
const req = {id: 'non-existent'};
82+
for (const service of serviceNodes) {
83+
try {
84+
await this.rpcChannel.makeRPC(
85+
service.id, 'getNodes', [req]);
86+
} catch (e) {
87+
log.warn('Failed to call service node:', service.id);
88+
}
89+
}
90+
}).catch((e) => {
91+
log.debug('Read service error:', e?.message);
92+
});
93+
}
94+
95+
getAPI() {
96+
const api = {};
97+
const self = this;
98+
for (const method of ServiceScheduler.supportedMethods) {
99+
api[method] = async function (req, callback) {
100+
try {
101+
const serviceNode = await self.scheduleService(req);
102+
log.debug('Schedule req:', req, serviceNode);
103+
const ret = await self.rpcChannel.makeRPC(
104+
serviceNode, method, [req]);
105+
log.debug('Schedule ret:', ret, serviceNode);
106+
callback('callback', ret);
107+
} catch (e) {
108+
callback('callback', 'error', e?.message || e);
109+
}
110+
};
111+
}
112+
return api;
113+
}
114+
}
115+
116+
module.exports.ServiceScheduler = ServiceScheduler;

source/stream_service/service.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
[cluster]
22
name = "owt-cluster"
33

4+
[service]
5+
#name = "stream-service"
6+
control_agent = "mycontroller"
7+
8+
#Use scheduler for service requests
9+
[scheduler]
10+
name = "stream-service"
11+
start = true #Start a scheduler with server
12+
413
[rabbit]
514
host = "localhost" #default: "localhost"
615
port = 5672 #default: 5672
716

8-
[service]
9-
name = "stream-service"
10-
control_agent = "mycontroller"
17+
[mongo]
18+
dataBaseURL = "localhost/owtstreams" #default: "localhost/owtstreams"

source/stream_service/stateStores.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class StateStores {
6565
// CRUD operations
6666
async create(type, obj) {
6767
const db = await this.prepare;
68-
obj._id = (obj.id || obj._id);
68+
obj.id && (obj._id = obj.id);
6969
return new Promise((resolve, reject) => {
7070
db.collection(type).insertOne(obj, (err, result) => {
7171
if (err) {
@@ -78,7 +78,7 @@ class StateStores {
7878
}
7979
async read(type, query) {
8080
const db = await this.prepare;
81-
query._id = (query.id || query._id);
81+
query.id && (query._id = query.id);
8282
return new Promise((resolve, reject) => {
8383
db.collection(type).findOne(query, (err, result) => {
8484
if (err) {
@@ -109,7 +109,7 @@ class StateStores {
109109
// update: {jsonPath: value}
110110
async update(type, filter, updates) {
111111
const db = await this.prepare;
112-
filter._id = (filter.id || filter._id);
112+
filter.id && (filter._id = filter.id);
113113
const mongoUpdate = {'$set': updates};
114114
for (const key in updates) {
115115
if (updates[key] === null) {
@@ -130,9 +130,9 @@ class StateStores {
130130
}
131131
async delete(type, filter) {
132132
const db = await this.prepare;
133-
filter._id = (filter.id || filter._id);
133+
filter.id && (filter._id = filter.id);
134134
return new Promise((resolve, reject) => {
135-
db.collection(type).deleteOne(filter, (err, result) => {
135+
db.collection(type).deleteMany(filter, (err, result) => {
136136
if (err) {
137137
reject(err);
138138
} else {
@@ -144,7 +144,7 @@ class StateStores {
144144
// Add value in set
145145
async setAdd(type, filter, updates) {
146146
const db = await this.prepare;
147-
filter._id = (filter.id || filter._id);
147+
filter.id && (filter._id = filter.id);
148148
const mongoUpdate = {'$addToSet': updates};
149149
return new Promise((resolve, reject) => {
150150
db.collection(type).updateOne(filter, mongoUpdate, (err, result) => {
@@ -159,7 +159,7 @@ class StateStores {
159159
// Remove value in set
160160
async setRemove(type, filter, updates) {
161161
const db = await this.prepare;
162-
filter._id = (filter.id || filter._id);
162+
filter.id && (filter._id = filter.id);
163163
const mongoUpdate = {'$pull': updates};
164164
return new Promise((resolve, reject) => {
165165
db.collection(type).updateOne(filter, mongoUpdate, (err, result) => {

source/stream_service/stateTypes.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,4 +242,3 @@ exports.Publication = Publication;
242242
exports.Subscription = Subscription;
243243
exports.Processor = Processor;
244244
exports.SourceTrack = SourceTrack;
245-

0 commit comments

Comments
 (0)