Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 7f5532c

Browse files
committed
Resolve issues after rebase
1 parent c4f6fd6 commit 7f5532c

File tree

5 files changed

+161
-7
lines changed

5 files changed

+161
-7
lines changed

source/portal/rpcRequest.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ var RpcRequest = function(rpcChannel) {
347347
return rpcChannel.makeRPC(controller, 'subscriptionControl', [participantId, subscriptionId, command]);
348348
};
349349

350-
that.onSessionSignaling = function(controller, sessionId, signaling) {
350+
that.onSessionSignaling = function(controller, sessionId, signaling, participantId) {
351351
if (enableGrpc) {
352352
startConferenceClientIfNeeded(controller);
353353
const req = {

source/stream_service/dist.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
"stateTypes.js",
1616
"scheduler.js",
1717
"controllers.json",
18-
"../agent/conference/rpcRequest.js",
18+
"rpcRequest.js",
1919
"../common/amqpClient.js",
2020
"../common/cipher.js",
2121
"../common/logger.js",
2222
"../common/makeRPC.js",
2323
"../common/rpcChannel.js",
2424
"../common/rpcStarter.js",
25+
"../common/grpcTools.js",
26+
"../protos/protoConfig.json",
27+
"../protos/*.proto",
2528
"../../scripts/release/initauth.js",
2629
"../../scripts/release/initcert.js",
2730
"../../scripts/detectOS.sh"

source/stream_service/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
"lodash": "^4.17.11",
88
"protobufjs": "^6.11.2",
99
"mongodb": "^4.10.0",
10-
"toml": "^3.0.0"
10+
"toml": "^3.0.0",
11+
"@grpc/proto-loader": "^0.5.0",
12+
"@grpc/grpc-js": "^1.1.0"
1113
},
1214
"devDependencies": {
1315
"mocha": "^6.0.2",

source/stream_service/rpcRequest.js

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright (C) <2019> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
'use strict';
6+
7+
const grpcTools = require('./grpcTools');
8+
const packOption = grpcTools.packOption;
9+
const unpackNotification = grpcTools.unpackNotification;
10+
11+
const makeRPC = require('./makeRPC').makeRPC;
12+
const log = require('./logger').logger.getLogger('RpcRequest');
13+
14+
const enableGrpc = global.config?.service?.enable_grpc || false;
15+
const GRPC_TIMEOUT = 2000;
16+
17+
const RpcRequest = function(rpcChannel, listener) {
18+
const that = {};
19+
const grpcAgents = {}; // workerAgent => grpcClient
20+
const grpcNode = {}; // workerNode => grpcClient
21+
const nodeType = {}; // NodeId => Type
22+
let clusterClient;
23+
const opt = () => ({deadline: new Date(Date.now() + GRPC_TIMEOUT)});
24+
25+
that.getWorkerNode = function(clusterManager, purpose, forWhom, preference) {
26+
log.debug('getworker node:', purpose, forWhom, 'enable grpc:', enableGrpc, clusterManager);
27+
if (enableGrpc) {
28+
if (!clusterClient) {
29+
clusterClient = grpcTools.startClient(
30+
'clusterManager',
31+
clusterManager
32+
);
33+
}
34+
let agentAddress;
35+
return new Promise((resolve, reject) => {
36+
const req = {
37+
purpose,
38+
task: forWhom.task,
39+
preference, // Change data for some preference
40+
reserveTime: 30 * 1000
41+
};
42+
clusterClient.schedule(req, opt(), (err, result) => {
43+
if (err) {
44+
log.debug('Schedule node error:', err);
45+
reject(err);
46+
} else {
47+
resolve(result);
48+
}
49+
});
50+
}).then((workerAgent) => {
51+
agentAddress = workerAgent.info.ip + ':' + workerAgent.info.grpcPort;
52+
if (!grpcAgents[agentAddress]) {
53+
grpcAgents[agentAddress] = grpcTools.startClient(
54+
'nodeManager',
55+
agentAddress
56+
);
57+
}
58+
return new Promise((resolve, reject) => {
59+
grpcAgents[agentAddress].getNode({info: forWhom}, opt(), (err, result) => {
60+
if (!err) {
61+
resolve(result.message);
62+
} else {
63+
reject(err);
64+
}
65+
});
66+
});
67+
}).then((workerNode) => {
68+
if (grpcNode[workerNode]) {
69+
// Has client already
70+
return {agent: agentAddress, node: workerNode};
71+
}
72+
log.debug('Start gRPC client:', purpose, workerNode);
73+
grpcNode[workerNode] = grpcTools.startClient(
74+
purpose,
75+
workerNode
76+
);
77+
nodeType[workerNode] = purpose;
78+
// Register listener
79+
const call = grpcNode[workerNode].listenToNotifications({id: ''});
80+
call.on('data', (notification) => {
81+
if (listener) {
82+
// Unpack notification.data
83+
const data = unpackNotification(notification);
84+
if (data) {
85+
listener.processNotification(data);
86+
}
87+
}
88+
});
89+
call.on('end', (err) => {
90+
log.debug('Call on end:', err);
91+
if (grpcNode[workerNode]) {
92+
grpcNode[workerNode].close();
93+
delete grpcNode[workerNode];
94+
}
95+
});
96+
call.on('error', (err) => {
97+
// On error
98+
log.debug('Call on error:', err);
99+
});
100+
return {agent: agentAddress, node: workerNode};
101+
});
102+
}
103+
104+
return rpcChannel.makeRPC(clusterManager, 'schedule', [purpose, forWhom.task, preference, 30 * 1000])
105+
.then(function(workerAgent) {
106+
return rpcChannel.makeRPC(workerAgent.id, 'getNode', [forWhom])
107+
.then(function(workerNode) {
108+
return {agent: workerAgent.id, node: workerNode};
109+
});
110+
});
111+
};
112+
113+
that.recycleWorkerNode = function(workerAgent, workerNode, forWhom) {
114+
if (grpcAgents[workerAgent]) {
115+
const req = {id: workerNode, info: forWhom};
116+
return new Promise((resolve, reject) => {
117+
grpcAgents[workerAgent].recycleNode(req, opt(), (err, result) => {
118+
if (err) {
119+
log.debug('Recycle node error:', err);
120+
reject(err);
121+
}
122+
if (grpcNode[workerNode]) {
123+
grpcNode[workerNode].close();
124+
delete grpcNode[workerNode];
125+
delete nodeType[workerNode];
126+
}
127+
resolve('ok');
128+
});
129+
});
130+
} else {
131+
return rpcChannel.makeRPC(workerAgent, 'recycleNode', [workerNode, forWhom]);
132+
}
133+
};
134+
135+
that.sendMsg = function(portal, participantId, event, data) {
136+
if (enableGrpc) {
137+
return Promise.resolve();
138+
}
139+
return rpcChannel.makeRPC(portal, 'notify', [participantId, event, data]);
140+
};
141+
142+
that.broadcast = function(portal, controller, excludeList, event, data) {
143+
if (enableGrpc) {
144+
return Promise.resolve();
145+
}
146+
return rpcChannel.makeRPC(portal, 'broadcast', [controller, excludeList, event, data]);
147+
};
148+
149+
return that;
150+
};
151+
152+
module.exports = RpcRequest;
153+

source/stream_service/streamService.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,8 @@ const {DomainHandler, RemoteDomainHandler} = require('./domainHandler');
1919
const {StateStores} = require('./stateStores');
2020
const State = require('./stateTypes');
2121
const {ServiceScheduler} = require('./scheduler');
22-
2322
const controllerConfig = require('./controllers.json');
2423

25-
26-
const PROTO_FILE = "service.proto";
27-
2824
let config;
2925
try {
3026
config = toml.parse(fs.readFileSync('./service.toml')) || {};

0 commit comments

Comments
 (0)