Skip to content

Commit 5ac229b

Browse files
author
ionut.stan
committed
Added NodeClusterBase.MasterEndpoint.rpcToRoundRobinWorker() utility function.
1 parent f555e61 commit 5ac229b

File tree

2 files changed

+144
-2
lines changed

2 files changed

+144
-2
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "jsonrpc-bidirectional",
33
"description": "Bidirectional JSONRPC over web sockets or HTTP with extensive plugin support.",
4-
"version": "6.6.1",
4+
"version": "6.7.0",
55
"scripts": {
66
"build": "node build.js",
77
"test": "node --expose-gc --max-old-space-size=1024 tests/main.js",
@@ -81,4 +81,4 @@
8181
"node_modules/es6-promise/dist/es6-promise.auto.min.js",
8282
"node_modules/es6-promise/dist/es6-promise.auto.min.js.map"
8383
]
84-
}
84+
}

src/NodeClusterBase/MasterEndpoint.js

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ class MasterEndpoint extends JSONRPC.EndpointBase
5656
this._bWatchingForUpgrade = false;
5757

5858
this._nMaxWorkersCount = Number.MAX_SAFE_INTEGER;
59+
60+
61+
this._objRPCToWorkersRoundRobinStates = {};
62+
63+
// Used if the call is proxied from another worker (which is then round robined to the executor worker),
64+
// and the result is allowed to be from a recently cached value.
65+
this._objWorkerToMethodNameLastFreshness = {};
5966
}
6067

6168

@@ -286,6 +293,141 @@ class MasterEndpoint extends JSONRPC.EndpointBase
286293
}
287294

288295

296+
/**
297+
* Helps distribute calls to workers to even out workload,
298+
* and optionally reuse identical function calls' results based on a freshness counter when acting as a proxy between workers.
299+
*
300+
* If bFreshlyCachedWorkerProxyMode is true, the call must come from another worker.
301+
* It is guaranteed that the calling worker will always get a new promise,
302+
* that is the same promise from the cache will never be returned more than once to the same calling worker.
303+
*
304+
* bFreshlyCachedWorkerProxyMode does not allow any params, the arrParams array must be empty.
305+
*
306+
* bFreshlyCachedWorkerProxyMode *is not* and *does not resemble* a time based expiration cache.
307+
* If you need a time based expiration cache, add JSONRPC.Plugins.Client.Cache to your client to your JSONRPC.Client subclass client (if you have one),
308+
* or use any other similar caching mechanism however you need it implemented.
309+
*
310+
* @param {JSONRPC.IncomingRequest|null} incomingRequest
311+
* @param {string} strMethodName
312+
* @param {Array} arrParams
313+
* @param {boolean} bFreshlyCachedWorkerProxyMode = false
314+
*
315+
* @returns {Promise<any>}
316+
*/
317+
async rpcToRoundRobinWorker(incomingRequest, strMethodName, arrParams, bFreshlyCachedWorkerProxyMode = false)
318+
{
319+
// Warning to future developers: from this point up until saving the promise from the actual .rpc call,
320+
// the code must be 100% synchronous (no awaits).
321+
322+
let nMinimumRequestedFreshness = 0;
323+
324+
if(bFreshlyCachedWorkerProxyMode)
325+
{
326+
// assert(incomingRequest.reverseCallsClient instanceof JSONRPC.Plugins.Client.WorkerTransport);
327+
if(
328+
!incomingRequest
329+
|| !incomingRequest.reverseCallsClient
330+
)
331+
{
332+
throw new Error("bFreshlyCachedWorkerProxyMode needs incomingRequest.reverseCallsClient to be initialized (bidirectional JSONRPC");
333+
}
334+
335+
const workerTransportPlugin = incomingRequest.reverseCallsClient.plugins.filter(plugin => plugin.worker && plugin.worker.id && plugin.worker.on)[0];
336+
337+
if(!workerTransportPlugin)
338+
{
339+
throw new Error("bFreshlyCachedWorkerProxyMode needs to know the cluster worker ID of the calling worker from incomingRequest.reverseCallsClient.plugins[?].worker.id (and it must be of type number.");
340+
}
341+
342+
const nWorkerID = workerTransportPlugin.worker.id;
343+
344+
if(arrParams.length)
345+
{
346+
throw new Error("bFreshlyCachedWorkerProxyMode does not allow any params, the arrParams array must be empty.");
347+
}
348+
349+
if(!this._objWorkerToMethodNameLastFreshness[nWorkerID])
350+
{
351+
this._objWorkerToMethodNameLastFreshness[nWorkerID] = {};
352+
353+
workerTransportPlugin.worker.on(
354+
"exit",
355+
(nCode, nSignal) => {
356+
delete this._objWorkerToMethodNameLastFreshness[nWorkerID];
357+
}
358+
);
359+
}
360+
361+
if(!this._objWorkerToMethodNameLastFreshness[nWorkerID][strMethodName])
362+
{
363+
this._objWorkerToMethodNameLastFreshness[nWorkerID][strMethodName] = 0;
364+
}
365+
366+
nMinimumRequestedFreshness = ++this._objWorkerToMethodNameLastFreshness[nWorkerID][strMethodName];
367+
}
368+
369+
if(!this._objRPCToWorkersRoundRobinStates[strMethodName])
370+
{
371+
this._objRPCToWorkersRoundRobinStates[strMethodName] = {
372+
counter: 0,
373+
promiseRPCResult: null
374+
};
375+
}
376+
377+
const objRoundRobinState = this._objRPCToWorkersRoundRobinStates[strMethodName];
378+
379+
let nCounter = objRoundRobinState.counter;
380+
381+
const arrWorkerStates = Object.values(this.objWorkerIDToState);
382+
383+
if(bFreshlyCachedWorkerProxyMode && nMinimumRequestedFreshness <= nCounter && objRoundRobinState.promiseRPCResult)
384+
{
385+
return objRoundRobinState.promiseRPCResult;
386+
}
387+
388+
objRoundRobinState.promiseRPCResult = null;
389+
nCounter = ++objRoundRobinState.counter;
390+
391+
if(arrWorkerStates.length)
392+
{
393+
let i, objWorkerState;
394+
395+
for(i = nCounter % arrWorkerStates.length; !objWorkerState && i < arrWorkerStates.length; i++)
396+
{
397+
if(arrWorkerStates[i].ready)
398+
{
399+
objWorkerState = arrWorkerStates[i];
400+
}
401+
}
402+
403+
for(i = 0; !objWorkerState && i < nCounter % arrWorkerStates.length; i++)
404+
{
405+
if(arrWorkerStates[i].ready)
406+
{
407+
objWorkerState = arrWorkerStates[i];
408+
}
409+
}
410+
411+
if(objWorkerState)
412+
{
413+
if(bFreshlyCachedWorkerProxyMode)
414+
{
415+
console.log("Round robinned " + nCounter);
416+
objRoundRobinState.promiseRPCResult = /*await*/ objWorkerState.client.rpc(strMethodName, arrParams);
417+
return objRoundRobinState.promiseRPCResult;
418+
}
419+
else
420+
{
421+
console.log("Round robinned NOT CACHED " + nCounter);
422+
return /*await*/ objWorkerState.client.rpc(strMethodName, arrParams);
423+
}
424+
}
425+
}
426+
427+
throw new JSONRPC.Exception("No ready for RPC cluster workers were found.", JSONRPC.Exception.INTERNAL_ERROR);
428+
}
429+
430+
289431
/**
290432
* @param {JSONRPC.IncomingRequest|null} incomingRequest
291433
*

0 commit comments

Comments
 (0)