Skip to content

Commit 19db506

Browse files
fengmk2claude
andauthored
feat(cluster): support reusePort on server listen (#5754)
Add SO_REUSEPORT socket option support for server listen, which allows multiple server sockets to bind to the same port with the OS distributing incoming connections. This improves load balancing in cluster scenarios. - Add reusePort option to ClusterOptions interface - Add platform validation (linux, freebsd, sunos, aix) - Use server.listen({ port, reusePort, host }) when reusePort is enabled - Handle message routing for reusePort (cluster doesn't emit 'listening' event) - Support reusePort in worker_threads mode (all workers share same port) - Add test fixtures and tests for reusePort functionality Synced from eggjs/cluster#115 close #5728 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes * **New Features** * Added reusePort configuration option for cluster server listening. When enabled on supported platforms, this allows multiple workers to bind to the same port, improving socket reuse behavior and resource utilization (default: false). * **Tests** * Added test coverage and fixture applications for reusePort functionality. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 715f67a commit 19db506

File tree

12 files changed

+144
-18
lines changed

12 files changed

+144
-18
lines changed

packages/cluster/src/app_worker.ts

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import fs from 'node:fs';
22
import { createServer as createHttpServer, type Server } from 'node:http';
33
import { createServer as createHttpsServer } from 'node:https';
4-
import type { Socket } from 'node:net';
4+
import type { Socket, ListenOptions } from 'node:net';
5+
import os from 'node:os';
56
import { debuglog } from 'node:util';
67

78
import { importModule } from '@eggjs/utils';
@@ -13,6 +14,12 @@ import { AppThreadWorker } from './utils/mode/impl/worker_threads/app.ts';
1314

1415
const debug = debuglog('egg/cluster/app_worker');
1516

17+
// https://nodejs.org/api/net.html#serverlistenoptions-callback
18+
// https://github.com/nodejs/node/blob/main/node.gypi#L310
19+
// https://docs.python.org/3/library/sys.html#sys.platform
20+
// This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+.
21+
const REUSE_PORT_SUPPORTED_PLATFORMS = ['linux', 'freebsd', 'sunos', 'aix'];
22+
1623
async function main() {
1724
// $ node app_worker.js options-json-string
1825
const options = JSON.parse(process.argv[2]) as {
@@ -25,6 +32,7 @@ async function main() {
2532
https?: object;
2633
sticky?: boolean;
2734
stickyWorkerPort?: number;
35+
reusePort?: boolean;
2836
};
2937
if (options.require) {
3038
// inject
@@ -89,13 +97,26 @@ async function main() {
8997
const port = (app.options.port = options.port || listenConfig.port);
9098
const debugPort = options.debugPort;
9199
const protocol = httpsOptions.key && httpsOptions.cert ? 'https' : 'http';
100+
101+
// Check reusePort option and validate platform support
102+
let reusePort = options.reusePort ?? listenConfig.reusePort ?? false;
103+
if (reusePort && !REUSE_PORT_SUPPORTED_PLATFORMS.includes(os.platform())) {
104+
reusePort = false;
105+
debug(
106+
'[app_worker:%s] platform %s is not supported for reusePort, set reusePort to false',
107+
process.pid,
108+
os.platform(),
109+
);
110+
}
111+
92112
debug(
93-
'[app_worker:%s] listenConfig: %j, real port: %o, protocol: %o, debugPort: %o',
113+
'[app_worker:%s] listenConfig: %j, real port: %o, protocol: %o, debugPort: %o, reusePort: %o',
94114
process.pid,
95115
listenConfig,
96116
port,
97117
protocol,
98118
debugPort,
119+
reusePort,
99120
);
100121

101122
AppWorker.send({
@@ -158,12 +179,23 @@ async function main() {
158179
exitProcess();
159180
return;
160181
}
161-
const args = [port];
162-
if (listenConfig.hostname) {
163-
args.push(listenConfig.hostname);
182+
if (reusePort) {
183+
// https://nodejs.org/api/net.html#serverlistenoptions-callback
184+
// Use options object when reusePort is enabled
185+
const listenOptions: ListenOptions = { port, reusePort };
186+
if (listenConfig.hostname) {
187+
listenOptions.host = listenConfig.hostname;
188+
}
189+
debug('[app_worker:%s] listen with reusePort options %j', process.pid, listenOptions);
190+
server.listen(listenOptions);
191+
} else {
192+
const args = [port];
193+
if (listenConfig.hostname) {
194+
args.push(listenConfig.hostname);
195+
}
196+
debug('listen options %j', args);
197+
server.listen(...args);
164198
}
165-
debug('listen options %j', args);
166-
server.listen(...args);
167199
}
168200
if (debugPortServer) {
169201
debug('listen on debug port: %s', debugPort);
@@ -181,14 +213,15 @@ async function main() {
181213
addressType: -1,
182214
};
183215
}
184-
debug('[app_worker:%s] listening at %j', process.pid, address);
216+
debug('[app_worker:%s] listening at %j, reusePort: %o', process.pid, address, reusePort);
185217
AppWorker.send({
186218
to: 'master',
187219
action: 'app-start',
188220
data: {
189221
address,
190222
workerId: AppWorker.workerId,
191223
},
224+
reusePort,
192225
});
193226
});
194227
}

packages/cluster/src/utils/messenger.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ export interface MessageBody {
1919
receiverPid?: string;
2020
receiverWorkerId?: string;
2121
senderWorkerId?: string;
22+
/**
23+
* Whether reusePort is enabled for server listen.
24+
* When reusePort is true, cluster won't get `listening` event,
25+
* so we need to use cluster `message` event instead.
26+
*/
27+
reusePort?: boolean;
2228
}
2329

2430
/**

packages/cluster/src/utils/mode/impl/process/app.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import cluster, { type Worker as ClusterProcessWorker } from 'node:cluster';
2+
import { debuglog } from 'node:util';
23

34
import { cfork } from 'cfork';
45
import { graceful as gracefulExit, type Options as gracefulExitOptions } from 'graceful-process';
@@ -8,6 +9,8 @@ import type { MessageBody } from '../../../messenger.ts';
89
import { terminate } from '../../../terminate.ts';
910
import { BaseAppWorker, BaseAppUtils } from '../../base/app.ts';
1011

12+
const debug = debuglog('egg/cluster/utils/mode/impl/process/app');
13+
1114
export class AppProcessWorker extends BaseAppWorker<ClusterProcessWorker> {
1215
get id(): number {
1316
return this.instance.id;
@@ -45,6 +48,13 @@ export class AppProcessWorker extends BaseAppWorker<ClusterProcessWorker> {
4548

4649
static send(message: MessageBody): void {
4750
message.senderWorkerId = String(process.pid);
51+
// cluster won't get `listening` event when reusePort is true,
52+
// use cluster `message` event instead
53+
if (message.action === 'app-start' && message.reusePort) {
54+
debug('send app-start message with reusePort, use cluster.worker.send()');
55+
cluster.worker!.send(message);
56+
return;
57+
}
4858
process.send!(message);
4959
}
5060

packages/cluster/src/utils/mode/impl/worker_threads/app.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,30 @@ export class AppThreadUtils extends BaseAppUtils {
140140
this.startTime = Date.now();
141141
this.startSuccessCount = 0;
142142

143-
const ports = this.options.ports ?? [];
144-
if (!ports.length) {
145-
ports.push(this.options.port!);
143+
if (this.options.reusePort) {
144+
// When reusePort is enabled, all workers share the same port
145+
// and each worker has its own socket
146+
if (!this.options.port) {
147+
throw new Error('options.port must be specified when reusePort is enabled');
148+
}
149+
for (let i = 0; i < this.options.workers; i++) {
150+
const argv = [JSON.stringify(this.options)];
151+
this.#forkSingle(this.getAppWorkerFile(), { argv }, i + 1);
152+
}
153+
} else {
154+
// Normal mode: each worker can have a different port
155+
const ports = this.options.ports ?? [];
156+
if (!ports.length) {
157+
ports.push(this.options.port!);
158+
}
159+
this.options.workers = ports.length;
160+
let i = 0;
161+
do {
162+
const options = Object.assign({}, this.options, { port: ports[i] });
163+
const argv = [JSON.stringify(options)];
164+
this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i);
165+
} while (i < ports.length);
146166
}
147-
this.options.workers = ports.length;
148-
let i = 0;
149-
do {
150-
const options = Object.assign({}, this.options, { port: ports[i] });
151-
const argv = [JSON.stringify(options)];
152-
this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i);
153-
} while (i < ports.length);
154167

155168
return this;
156169
}

packages/cluster/src/utils/options.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ export interface ClusterOptions {
7878
* sticky mode server
7979
*/
8080
sticky?: boolean;
81+
/**
82+
* enable SO_REUSEPORT socket option for server listen, default is `false`.
83+
* Only available on Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+.
84+
* @see https://nodejs.org/api/net.html#serverlistenoptions-callback
85+
*/
86+
reusePort?: boolean;
8187
/** customized plugins, for unittest */
8288
plugins?: object;
8389
isDebug?: boolean;

packages/cluster/test/app_worker.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,30 @@ describe.skipIf(process.version.startsWith('v24') || process.platform === 'win32
285285
const sock = encodeURIComponent(sockFile);
286286
await request(`http+unix://${sock}`).get('/').expect('done').expect(200);
287287
});
288+
289+
it.skipIf(process.platform !== 'linux')('should use reusePort in config on Linux', async () => {
290+
app = cluster('apps/app-listen-reusePort', { port: 0, workers: 2 });
291+
// app.debug();
292+
await app.ready();
293+
294+
app.expect('code', 0);
295+
app.expect('stdout', /egg started on http:\/\/127.0.0.1:17010/);
296+
297+
await request('http://127.0.0.1:17010').get('/').expect('done').expect(200);
298+
await request('http://127.0.0.1:17010').get('/port').expect('17010').expect(200);
299+
});
300+
301+
it('should set reusePort=true in config (non-Linux will fallback to false)', async () => {
302+
app = cluster('apps/app-listen-reusePort', { port: 0 });
303+
// app.debug();
304+
await app.ready();
305+
306+
app.expect('code', 0);
307+
app.expect('stdout', /egg started on http:\/\/127.0.0.1:17010/);
308+
309+
await request('http://127.0.0.1:17010').get('/').expect('done').expect(200);
310+
await request('http://127.0.0.1:17010').get('/port').expect('17010').expect(200);
311+
});
288312
});
289313

290314
it('should exit when EADDRINUSE', async () => {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
module.exports = (app) => {
2+
// don't use the port that egg-mock defined
3+
app._options.port = undefined;
4+
};
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module.exports = (app) => {
2+
app.get('/', (ctx) => {
3+
ctx.body = 'done';
4+
});
5+
6+
app.get('/port', (ctx) => {
7+
ctx.body = ctx.app.options.port;
8+
});
9+
};
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module.exports = {
2+
keys: '123',
3+
cluster: {
4+
listen: {
5+
port: 17010,
6+
reusePort: true,
7+
},
8+
},
9+
};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"name": "app-listen-reusePort"
3+
}

0 commit comments

Comments
 (0)