Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit c715380

Browse files
committed
Add data stores for stream states
1 parent 513493d commit c715380

File tree

7 files changed

+1059
-505
lines changed

7 files changed

+1059
-505
lines changed

source/stream_service/controllers/typeController.js

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,14 @@ class TypeController extends EventEmitter {
4040
*/
4141
constructor(rpc) {
4242
super();
43-
log.debug('rpc:', rpc);
4443
this.makeRPC = rpc.makeRPC.bind(rpc);
45-
log.debug('this.makeRPC:', this.makeRPC);
4644
this.selfId = rpc.selfId;
4745
this.clusterId = rpc.clusterId;
4846
}
4947

5048
// Return Promise<{agent: string, node: string}>
5149
async getWorkerNode(purpose, domain, taskId, preference) {
5250
log.debug('getWorkerNode:', purpose, domain, taskId, preference);
53-
log.debug('this:', this);
5451
const args = [purpose, taskId, preference, 30 * 1000];
5552
return this.makeRPC(this.clusterId, 'schedule', args)
5653
.then((workerAgent) => {
@@ -70,49 +67,5 @@ class TypeController extends EventEmitter {
7067
}
7168
}
7269

73-
class LocalState {
74-
constructor(type) {
75-
this.type = type ? type + '.' : '';
76-
this.tsc = false;
77-
this.state = new Map();
78-
}
79-
async create(key, value) {
80-
key = type + key;
81-
if (this.state.has(key)) {
82-
throw new Error('Duplicate key');
83-
}
84-
this.state.set(key, value);
85-
return value;
86-
}
87-
async read(key, value) {
88-
key = type + key;
89-
return this.state.get(key, value);
90-
}
91-
// condition: {op: string, path: string, value: any}
92-
async update(key, value, condition) {
93-
key = type + key;
94-
if (!this.state.has(key)) {
95-
throw new Error(`${key} not found`);
96-
}
97-
const prev = this.state.get(key);
98-
if (condition) {
99-
if (condition.op === 'eq') {
100-
if (_.get(prev, condition.path) !== condition.value) {
101-
throw new Error('Update condition not match');
102-
}
103-
} else if (condition.op === 'exists') {
104-
if (!_.get(prev, condition.path)) {
105-
throw new Error('Update condition not match');
106-
}
107-
}
108-
}
109-
this.state.set(key, value);
110-
}
111-
createTransaction() {
112-
throw new Error('Transaction not supported');
113-
}
114-
}
115-
116-
11770
exports.TypeController = TypeController;
118-
exports.LocalState = LocalState;
71+
// exports.LocalState = LocalState;

source/stream_service/dist.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
"format.proto",
1212
"service.proto",
1313
"service.toml",
14-
"rtcController.js",
1514
"streamingController.js",
1615
"audioController.js",
1716
"videoController.js",
1817
"analyticsController.js",
1918
"domainHandler.js",
19+
"stateStores.js",
20+
"stateTypes.js",
2021
"controllers.json",
2122
"../agent/conference/rpcRequest.js",
2223
"../common/amqpClient.js",
@@ -30,7 +31,10 @@
3031
"../../scripts/detectOS.sh"
3132
],
3233
"folders": {
33-
"controllers": ["controllers/*"]
34+
"controllers": ["controllers/*"],
35+
"data_access": [
36+
"../data_access/*"
37+
]
3438
}
3539
},
3640
"debug": {

source/stream_service/domainHandler.js

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
'use strict';
66

7+
const {randomUUID} = require('crypto');
78
const log = require('./logger').logger.getLogger('DomainHandler');
89
const {Publication, Subscription, Processor} = require('./session');
910

@@ -37,7 +38,7 @@ class DomainHandler {
3738
throw new Error('Duplicate join ID');
3839
}
3940
// Build join response
40-
const ret = {
41+
const response = {
4142
permission: {},
4243
room: {
4344
id: this.domain,
@@ -46,13 +47,17 @@ class DomainHandler {
4647
},
4748
};
4849
for (const [id, ppt] of this.participants) {
49-
ret.room.participants.push(ppt);
50+
response.room.participants.push(ppt);
5051
}
5152
for (const [id, st] of this.streams) {
52-
ret.room.streams.push(st.toSignalingFormat());
53+
response.room.streams.push(st.toSignalingFormat());
5354
}
55+
req.notifying = true;
5456
this.participants.set(req.id, req);
55-
return ret;
57+
return {
58+
participant: req,
59+
response,
60+
};
5661
}
5762

5863
// Leave from portal
@@ -143,6 +148,11 @@ class DomainHandler {
143148
}
144149
}
145150
}
151+
// Add processor request
152+
async addProcessor(req) {
153+
req.id = req.id || randomUUID().replace(/-/g, '');
154+
return req;
155+
}
146156
}
147157

148158
class RemoteDomainHandler {
@@ -154,6 +164,7 @@ class RemoteDomainHandler {
154164
'streamControl',
155165
'subscriptionControl',
156166
'onStatus',
167+
'addProcessor',
157168
];
158169

159170
constructor(locality, rpcChannel) {

source/stream_service/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
{
22
"name": "OWT-Server-Stream-Service",
3-
"version":"5.0.0",
3+
"version": "5.0.0",
44
"dependencies": {
55
"amqplib": "^0.7.0",
66
"log4js": "^6.4.0",
77
"lodash": "^4.17.11",
88
"protobufjs": "^6.11.2",
9+
"mongodb": "^4.10.0",
910
"toml": "^3.0.0"
1011
},
1112
"devDependencies": {

0 commit comments

Comments
 (0)