Skip to content

Commit 7a1b5b4

Browse files
committed
cluster: fix port reuse between cluster
Fixes: #60086
1 parent cff138c commit 7a1b5b4

File tree

4 files changed

+118
-8
lines changed

4 files changed

+118
-8
lines changed

lib/internal/cluster/primary.js

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,12 @@ function queryServer(worker, message) {
271271
return;
272272

273273
const key = `${message.address}:${message.port}:${message.addressType}:` +
274-
`${message.fd}:${message.index}`;
275-
let handle = handles.get(key);
274+
`${message.fd}` + (message.port === 0 ? `:${message.index}` : '');
275+
const cachedHandle = handles.get(key);
276+
let handle;
277+
if (cachedHandle && !cachedHandle.has(worker)) {
278+
handle = cachedHandle;
279+
}
276280

277281
if (handle === undefined) {
278282
let address = message.address;
@@ -298,25 +302,30 @@ function queryServer(worker, message) {
298302
handle = new RoundRobinHandle(key, address, message);
299303
}
300304

301-
handles.set(key, handle);
305+
if (!cachedHandle) {
306+
handles.set(key, handle);
307+
}
302308
}
303309

304310
handle.data ||= message.data;
305311

306312
// Set custom server data
307-
handle.add(worker, (errno, reply, handle) => {
313+
handle.add(worker, (errno, reply, serverHandle) => {
314+
if (!errno) {
315+
handles.set(key, handle); // Update in case it was replaced.
316+
}
308317
const { data } = handles.get(key);
309-
310-
if (errno)
311-
handles.delete(key); // Gives other workers a chance to retry.
318+
if (!cachedHandle && errno) {
319+
handles.delete(key);
320+
}
312321

313322
send(worker, {
314323
errno,
315324
key,
316325
ack: message.seq,
317326
data,
318327
...reply,
319-
}, handle);
328+
}, serverHandle);
320329
});
321330
}
322331

lib/internal/cluster/round_robin_handle.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,7 @@ RoundRobinHandle.prototype.handoff = function(worker) {
137137
this.handoff(worker);
138138
});
139139
};
140+
141+
RoundRobinHandle.prototype.has = function(worker) {
142+
return this.all.has(worker.id);
143+
};

lib/internal/cluster/shared_handle.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,7 @@ SharedHandle.prototype.remove = function(worker) {
4747
this.handle = null;
4848
return true;
4949
};
50+
51+
SharedHandle.prototype.has = function(worker) {
52+
return this.workers.has(worker.id);
53+
};
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const cluster = require('cluster');
5+
const assert = require('assert');
6+
7+
const acts = {
8+
WORKER1_SERVER1_CLOSED: { cmd: 'WORKER1_SERVER1_CLOSED' },
9+
WORKER2_SERVER1_STARTED: { cmd: 'WORKER2_SERVER1_STARTED' },
10+
WORKER1_SERVER2_CLOSED: { cmd: 'WORKER1_SERVER2_CLOSED' },
11+
};
12+
13+
if (cluster.isMaster) {
14+
const currentHost = '::';
15+
const worker1 = cluster.fork({
16+
WORKER_ID: 'worker1',
17+
HOST: currentHost,
18+
});
19+
let worker2;
20+
worker1.on('error', common.mustNotCall());
21+
worker1.on('message', onMessage);
22+
23+
function createWorker2() {
24+
worker2 = cluster.fork({
25+
WORKER_ID: 'worker2',
26+
HOST: currentHost,
27+
});
28+
worker2.on('error', common.mustNotCall());
29+
worker2.on('message', onMessage);
30+
}
31+
32+
function onMessage(msg) {
33+
switch (msg.cmd) {
34+
case acts.WORKER1_SERVER1_CLOSED.cmd:
35+
createWorker2();
36+
break;
37+
case acts.WORKER2_SERVER1_STARTED.cmd:
38+
worker1.send(acts.WORKER2_SERVER1_STARTED);
39+
break;
40+
case acts.WORKER1_SERVER2_CLOSED.cmd:
41+
worker1.kill();
42+
worker2.kill();
43+
break;
44+
default:
45+
assert.fail(`Unexpected message ${msg.cmd}`);
46+
}
47+
}
48+
} else {
49+
const WORKER_ID = process.env.WORKER_ID;
50+
function createServer() {
51+
return new Promise((resolve, reject) => {
52+
const net = require('net');
53+
const PORT = 8000;
54+
const server = net
55+
.createServer((socket) => {
56+
socket.end(
57+
`Handled by worker ${process.env.WORKER_ID} (${process.pid})\n`
58+
);
59+
})
60+
.on('error', (e) => {
61+
reject(e);
62+
});
63+
64+
server.listen(
65+
{
66+
port: PORT,
67+
host: process.env.HOST,
68+
},
69+
() => resolve(server)
70+
);
71+
});
72+
}
73+
(async () => {
74+
const server1 = await createServer();
75+
if (WORKER_ID === 'worker2') {
76+
process.send(acts.WORKER2_SERVER1_STARTED);
77+
} else {
78+
await createServer().catch(common.mustCall());
79+
await new Promise((r) => server1.close(r));
80+
process.send(acts.WORKER1_SERVER1_CLOSED);
81+
82+
process.on('message', async (msg) => {
83+
if (msg.cmd === acts.WORKER2_SERVER1_STARTED.cmd) {
84+
const server2 = await createServer();
85+
await new Promise((r) => server2.close(r));
86+
process.send(acts.WORKER1_SERVER2_CLOSED);
87+
} else {
88+
assert.fail(`Unexpected message ${msg.cmd}`);
89+
}
90+
});
91+
}
92+
})().then(common.mustCall());
93+
}

0 commit comments

Comments
 (0)