Skip to content

Commit 4e5737f

Browse files
multi core better error handling and race condition stsartup debugging
1 parent 8930b89 commit 4e5737f

File tree

5 files changed

+133
-20
lines changed

5 files changed

+133
-20
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "jsonrpc-bidirectional",
33
"description": "Bidirectional JSONRPC over web sockets or HTTP with extensive plugin support.",
4-
"version": "10.0.1",
4+
"version": "10.0.10",
55
"scripts": {
66
"build": "node --experimental-worker build.js",
77
"prepublish": "node --experimental-worker build.js && node --expose-gc --max-old-space-size=1024 --experimental-worker tests/main.js",

src/NodeClusterBase/WorkerEndpoint.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const cluster = require("cluster");
22
const NodeMultiCoreCPUBase = require("../NodeMultiCoreCPUBase");
3+
const sleep = require("sleep-promise");
34

45
const JSONRPC = {
56
BidirectionalWorkerRouter: require("../BidirectionalWorkerRouter")
@@ -36,6 +37,33 @@ class WorkerEndpoint extends NodeMultiCoreCPUBase.WorkerEndpoint
3637
*/
3738
async _currentWorkerID()
3839
{
40+
// https://github.com/nodejs/node/issues/1269
41+
if(
42+
!this._bAlreadyDelayedReadingWorkerID
43+
&& (
44+
!cluster.worker
45+
|| cluster.worker.id === null
46+
|| cluster.worker.id === undefined
47+
)
48+
)
49+
{
50+
await sleep(2000);
51+
this._bAlreadyDelayedReadingWorkerID = true;
52+
}
53+
54+
55+
if(
56+
!cluster.worker
57+
|| cluster.worker.id === null
58+
|| cluster.worker.id === undefined
59+
)
60+
{
61+
console.error("cluster.worker: ", cluster.worker);
62+
console.error("cluster.worker.id: ", cluster.worker ? cluster.worker.id : "");
63+
console.error(`Returning 0 as cluster.worker.id.`);
64+
return 0;
65+
}
66+
3967
return cluster.worker.id;
4068
}
4169

src/NodeMultiCoreCPUBase/MasterEndpoint.js

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class MasterEndpoint extends JSONRPC.EndpointBase
4949
this._nNextAvailablePersistentWorkerID = 0;
5050

5151
this._bWorkersStarted = false;
52+
this._promiseStart = null;
5253
this._bWatchingForUpgrade = false;
5354

5455
this._nMaxWorkersCount = Number.MAX_SAFE_INTEGER;
@@ -189,29 +190,51 @@ class MasterEndpoint extends JSONRPC.EndpointBase
189190
*/
190191
async start()
191192
{
192-
if(this._bWorkersStarted)
193+
if(this._promiseStart)
193194
{
194-
throw new Error("Workers have already been started.");
195+
return this._promiseStart;
195196
}
196-
this._bWorkersStarted = true;
197197

198-
this._jsonrpcServer = new JSONRPC.Server();
199-
200-
// By default, JSONRPC.Server rejects all requests as not authenticated and not authorized.
201-
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthenticationSkip());
202-
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthorizeAll());
203-
this._jsonrpcServer.registerEndpoint(this);
198+
this._promiseStart = new Promise(async(fnResolve, fnReject) => {
199+
try
200+
{
201+
if(this._jsonrpcServer)
202+
{
203+
this._jsonrpcServer.dispose();
204+
}
205+
204206

205-
this._bidirectionalWorkerRouter = await this._makeBidirectionalRouter();
207+
this._jsonrpcServer = new JSONRPC.Server();
208+
209+
// By default, JSONRPC.Server rejects all requests as not authenticated and not authorized.
210+
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthenticationSkip());
211+
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthorizeAll());
212+
this._jsonrpcServer.registerEndpoint(this);
213+
214+
this._bidirectionalWorkerRouter = await this._makeBidirectionalRouter();
215+
216+
await this._configureBeforeStart();
217+
218+
await this._startServices();
219+
220+
for (let i = 0; i < Math.min(Math.max(os.cpus().length, 1), this.maxWorkersCount); i++)
221+
{
222+
this._addWorker();
223+
}
206224

207-
await this._configureBeforeStart();
225+
this._bWorkersStarted = true;
208226

209-
await this._startServices();
227+
fnResolve();
228+
}
229+
catch(error)
230+
{
231+
this._bWorkersStarted = false;
232+
this._promiseStart = false;
233+
fnReject(error);
234+
}
235+
});
210236

211-
for (let i = 0; i < Math.min(Math.max(os.cpus().length, 1), this.maxWorkersCount); i++)
212-
{
213-
this._addWorker();
214-
}
237+
return this._promiseStart;
215238
}
216239

217240

@@ -283,6 +306,28 @@ class MasterEndpoint extends JSONRPC.EndpointBase
283306
*/
284307
async workerServicesReady(incomingRequest, nWorkerID)
285308
{
309+
if(!this.objWorkerIDToState[nWorkerID])
310+
{
311+
console.error(`[Master] .workerServicesReady() Could not find worker ID ${nWorkerID} key in this.objWorkerIDToState to set the .ready=true. Retrying after 10 seconds sleep in case race condition.`);
312+
await sleep(10 * 1000);
313+
314+
if(!this.objWorkerIDToState[nWorkerID])
315+
{
316+
console.error(`[Master] .workerServicesReady() Could not find worker ID ${nWorkerID} key in this.objWorkerIDToState to set the .ready=true. Going berserk and marking all as ready. this.objWorkerIDToState: ${JSON.stringify(this.objWorkerIDToState, undefined, " ")}`);
317+
318+
for(const _nWorkerID in this.objWorkerIDToState)
319+
{
320+
this.objWorkerIDToState[_nWorkerID].ready = true;
321+
}
322+
323+
return;
324+
}
325+
else
326+
{
327+
console.error(`[Master] .workerServicesReady() Found worker ID ${nWorkerID} key in this.objWorkerIDToState to set the .ready=true. This indicates a race condition exists somewhere.`);
328+
}
329+
}
330+
286331
this.objWorkerIDToState[nWorkerID].ready = true;
287332
}
288333

src/NodeMultiCoreCPUBase/WorkerEndpoint.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
4141
this._promiseStart = null;
4242

4343
this._nPersistentWorkerID = undefined;
44+
45+
this._bAlreadyDelayedReadingWorkerID = false;
4446
}
4547

4648

@@ -72,7 +74,7 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
7274

7375

7476
/**
75-
* @returns {JSONRPC.Client}
77+
* @returns {JSONRPC.Client|null}
7678
*/
7779
get masterClient()
7880
{
@@ -96,7 +98,8 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
9698
this.start().catch(console.error);
9799
}
98100

99-
throw new Error("The .masterClient property was not initialized by .start().");
101+
console.error("[jsornpc-bidirectional] [WorkerEndpoint] The .masterClient property was not initialized by .start(). Returning null.", new Error().stack);
102+
return null;
100103
}
101104

102105
return this._masterClient;
@@ -168,7 +171,13 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
168171
throw new Error("WorkerEndpoint.start() was already called.");
169172
}
170173
this._bWorkerStarted = true;
171-
174+
175+
176+
if(this._jsonrpcServer)
177+
{
178+
this._jsonrpcServer.dispose();
179+
}
180+
172181

173182
this._jsonrpcServer = new JSONRPC.Server();
174183
this._bidirectionalWorkerRouter = await this._makeBidirectionalRouter();
@@ -194,6 +203,8 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
194203
}
195204
catch(error)
196205
{
206+
this._bWorkerStarted = false;
207+
this._promiseStart = false;
197208
fnReject(error);
198209
}
199210
});

src/NodeWorkerThreadsBase/WorkerEndpoint.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ const JSONRPC = {
44
BidirectionalWorkerThreadRouter: require("../BidirectionalWorkerThreadRouter")
55
};
66

7+
const sleep = require("sleep-promise");
8+
79
let Threads;
810
try
911
{
@@ -41,6 +43,33 @@ class WorkerEndpoint extends NodeMultiCoreCPUBase.WorkerEndpoint
4143
*/
4244
async _currentWorkerID()
4345
{
46+
// https://github.com/nodejs/node/issues/1269
47+
if(
48+
!this._bAlreadyDelayedReadingWorkerID
49+
&& (
50+
!Threads
51+
|| Threads.threadId === null
52+
|| Threads.threadId === undefined
53+
)
54+
)
55+
{
56+
await sleep(2000);
57+
this._bAlreadyDelayedReadingWorkerID = true;
58+
}
59+
60+
61+
if(
62+
!Threads
63+
|| Threads.threadId === null
64+
|| Threads.threadId === undefined
65+
)
66+
{
67+
console.error("Threads: ", Threads);
68+
console.error("Threads.threadId: ", Threads ? Threads.threadId : "");
69+
console.error(`Returning 0 as Threads.threadId.`);
70+
return 0;
71+
}
72+
4473
return Threads.threadId;
4574
}
4675

0 commit comments

Comments
 (0)