-
Notifications
You must be signed in to change notification settings - Fork 53
feat: support reusePort on server listen #115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3d8d116
dd45194
cb897b6
eadbdc9
4a526b2
0153ec5
efdc99b
8aea270
d1495cc
60f08cd
16b546a
9bd82df
d1c0d2d
6ef351c
1c8b6fe
35bb21a
cd94e19
bd3c36a
43841b7
6490b75
6789f2b
fe60af0
72133ca
ca5d801
545a442
0da63a6
4a33697
144b559
cf121f1
2b065f9
063012c
9e99e6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,3 +9,4 @@ run | |
| .nyc_output | ||
| package-lock.json | ||
| .package-lock.json | ||
| pnpm-lock.yaml | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -16,9 +16,11 @@ if (options.startMode === 'worker_threads') { | |||||
| AppWorker = require('./utils/mode/impl/process/app').AppWorker; | ||||||
| } | ||||||
|
|
||||||
| const os = require('os'); | ||||||
| const fs = require('fs'); | ||||||
| const debug = require('util').debuglog('egg-cluster'); | ||||||
| const debug = require('util').debuglog('egg-cluster:app_worker'); | ||||||
| const ConsoleLogger = require('egg-logger').EggConsoleLogger; | ||||||
|
|
||||||
| const consoleLogger = new ConsoleLogger({ | ||||||
| level: process.env.EGG_APP_WORKER_LOGGER_LEVEL, | ||||||
| }); | ||||||
|
|
@@ -38,6 +40,18 @@ const port = options.port = options.port || listenConfig.port; | |||||
| const debugPort = options.debugPort; | ||||||
| const protocol = (httpsOptions.key && httpsOptions.cert) ? 'https' : 'http'; | ||||||
|
|
||||||
| // https://nodejs.org/api/net.html#serverlistenoptions-callback | ||||||
| // https://github.com/nodejs/node/blob/main/node.gypi#L310 | ||||||
| // https://docs.python.org/3/library/sys.html#sys.platform | ||||||
| // This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+. | ||||||
| const supportedPlatforms = [ 'linux', 'freebsd', 'sunos', 'aix' ]; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||
| let reusePort = options.reusePort = options.reusePort || listenConfig.reusePort; | ||||||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| if (reusePort && !supportedPlatforms.includes(os.platform())) { | ||||||
| reusePort = false; | ||||||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| options.reusePort = false; | ||||||
| debug('platform %s is not supported currently, set reusePort to false', os.platform()); | ||||||
| } | ||||||
|
|
||||||
| AppWorker.send({ | ||||||
| to: 'master', | ||||||
| action: 'realport', | ||||||
|
|
@@ -121,21 +135,36 @@ function startServer(err) { | |||||
| exitProcess(); | ||||||
| return; | ||||||
| } | ||||||
| const args = [ port ]; | ||||||
| if (listenConfig.hostname) args.push(listenConfig.hostname); | ||||||
| debug('listen options %s', args); | ||||||
| server.listen(...args); | ||||||
| if (reusePort) { | ||||||
| // https://nodejs.org/api/net.html#serverlistenoptions-callback | ||||||
| const listenOptions = { port, reusePort }; | ||||||
| if (listenConfig.hostname) { | ||||||
| listenOptions.host = listenConfig.hostname; | ||||||
| } | ||||||
| debug('listen options %j', listenOptions); | ||||||
| server.listen(listenOptions); | ||||||
| } else { | ||||||
| const args = [ port ]; | ||||||
| if (listenConfig.hostname) { | ||||||
| args.push(listenConfig.hostname); | ||||||
| } | ||||||
| debug('listen options %s', args); | ||||||
| server.listen(...args); | ||||||
| } | ||||||
| } | ||||||
| if (debugPortServer) { | ||||||
| debug('listen on debug port: %s', debugPort); | ||||||
| debugPortServer.listen(debugPort); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| AppWorker.send({ | ||||||
| to: 'master', | ||||||
| action: 'listening', | ||||||
| data: server.address() || { port }, | ||||||
| server.once('listening', () => { | ||||||
| AppWorker.send({ | ||||||
| to: 'master', | ||||||
| action: 'listening', | ||||||
| data: server.address() || { port }, | ||||||
| reusePort, | ||||||
| }); | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,15 +71,14 @@ class AppWorker extends BaseAppWorker { | |
| } | ||
|
|
||
| class AppUtils extends BaseAppUtils { | ||
| #workers = []; | ||
| #appWorkers = []; | ||
|
|
||
| #forkSingle(appPath, options, id) { | ||
| // start app worker | ||
| const worker = new workerThreads.Worker(appPath, options); | ||
| this.#workers.push(worker); | ||
|
|
||
| // wrap app worker | ||
| const appWorker = new AppWorker(worker, id); | ||
| this.#appWorkers.push(appWorker); | ||
| this.emit('worker_forked', appWorker); | ||
| appWorker.disableRefork = true; | ||
| worker.on('message', msg => { | ||
|
|
@@ -129,23 +128,40 @@ class AppUtils extends BaseAppUtils { | |
| to: 'master', | ||
| from: 'app', | ||
| }); | ||
|
|
||
| }); | ||
|
|
||
| // handle worker exit | ||
| worker.on('exit', async code => { | ||
| this.log('[master] app_worker#%s (tid:%s) exit with code: %s', appWorker.id, appWorker.workerId, code); | ||
| // remove worker from workers array | ||
| const idx = this.#appWorkers.indexOf(appWorker); | ||
| if (idx !== -1) { | ||
| this.#appWorkers.splice(idx, 1); | ||
| } | ||
| // remove all listeners to avoid memory leak | ||
| worker.removeAllListeners(); | ||
|
|
||
| appWorker.state = 'dead'; | ||
| this.messenger.send({ | ||
| action: 'app-exit', | ||
| data: { | ||
| workerId: appWorker.workerId, | ||
| code, | ||
| }, | ||
| to: 'master', | ||
| from: 'app', | ||
| }); | ||
| try { | ||
| this.messenger.send({ | ||
| action: 'app-exit', | ||
| data: { | ||
| workerId: appWorker.workerId, | ||
| code, | ||
| }, | ||
| to: 'master', | ||
| from: 'app', | ||
| }); | ||
| } catch (err) { | ||
| this.log('[master][warning] app_worker#%s (tid:%s) send "app-exit" message error: %s', | ||
| appWorker.id, appWorker.workerId, err); | ||
| } | ||
|
|
||
| if (appWorker.disableRefork) { | ||
| return; | ||
| } | ||
| // refork app worker | ||
| this.log('[master] app_worker#%s (tid:%s) refork after 1s', appWorker.id, appWorker.workerId); | ||
| await sleep(1000); | ||
| this.#forkSingle(appPath, options, id); | ||
| }); | ||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
@@ -155,26 +171,38 @@ class AppUtils extends BaseAppUtils { | |
| this.startTime = Date.now(); | ||
| this.startSuccessCount = 0; | ||
|
|
||
| const ports = this.options.ports; | ||
| if (!ports.length) { | ||
| ports.push(this.options.port); | ||
| if (this.options.reusePort) { | ||
| if (!this.options.port) { | ||
| throw new Error('options.port must be specified when reusePort is enabled'); | ||
| } | ||
| for (let i = 0; i < this.options.workers; i++) { | ||
| const argv = [ JSON.stringify(this.options) ]; | ||
| const appWorkerId = i + 1; | ||
| this.#forkSingle(this.getAppWorkerFile(), { argv }, appWorkerId); | ||
| } | ||
| } else { | ||
| const ports = this.options.ports; | ||
| if (!ports.length) { | ||
| ports.push(this.options.port); | ||
| } | ||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.options.workers = ports.length; | ||
| let i = 0; | ||
| do { | ||
| const options = Object.assign({}, this.options, { port: ports[i] }); | ||
| const argv = [ JSON.stringify(options) ]; | ||
| this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i); | ||
| } while (i < ports.length); | ||
|
Comment on lines
+189
to
+194
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The for (let i = 0; i < ports.length; i++) {
const options = Object.assign({}, this.options, { port: ports[i] });
const argv = [ JSON.stringify(options) ];
this.#forkSingle(this.getAppWorkerFile(), { argv }, i + 1);
} |
||
| } | ||
| this.options.workers = ports.length; | ||
| let i = 0; | ||
| do { | ||
| const options = Object.assign({}, this.options, { port: ports[i] }); | ||
| const argv = [ JSON.stringify(options) ]; | ||
| this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i); | ||
| } while (i < ports.length); | ||
|
|
||
| return this; | ||
| } | ||
|
|
||
| async kill() { | ||
| for (const worker of this.#workers) { | ||
| this.log(`[master] kill app worker#${worker.id} (worker_threads) by worker.terminate()`); | ||
| worker.removeAllListeners(); | ||
| worker.terminate(); | ||
| for (const appWorker of this.#appWorkers) { | ||
| this.log('[master] kill app_worker#%s (tid:%s) (worker_threads) by worker.terminate()', appWorker.id, appWorker.workerId); | ||
| appWorker.disableRefork = true; | ||
| appWorker.instance.removeAllListeners(); | ||
| appWorker.instance.terminate(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,22 +1,22 @@ | ||
| { | ||
| "name": "egg-cluster", | ||
| "version": "2.4.0", | ||
| "version": "2.5.0-beta.3", | ||
| "description": "cluster manager for egg", | ||
| "main": "index.js", | ||
| "scripts": { | ||
| "lint": "eslint .", | ||
| "test": "npm run lint -- --fix && npm run test-local", | ||
| "test-local": "egg-bin test --ts false", | ||
| "cov": "egg-bin cov --prerequire --timeout 100000 --ts false", | ||
| "ci": "npm run lint && npm run cov" | ||
| "ci": "npm run lint && node test/reuseport_cluster.js && npm run cov" | ||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }, | ||
| "files": [ | ||
| "index.js", | ||
| "lib" | ||
| ], | ||
| "repository": { | ||
| "type": "git", | ||
| "url": "git+https://github.com/eggjs/egg-cluster.git" | ||
| "url": "git+https://github.com/eggjs/cluster.git" | ||
| }, | ||
| "keywords": [ | ||
| "egg", | ||
|
|
@@ -26,9 +26,9 @@ | |
| "author": "dead-horse <[email protected]>", | ||
| "license": "MIT", | ||
| "bugs": { | ||
| "url": "https://github.com/eggjs/egg-cluster/issues" | ||
| "url": "https://github.com/eggjs/cluster/issues" | ||
| }, | ||
| "homepage": "https://github.com/eggjs/egg-cluster#readme", | ||
| "homepage": "https://github.com/eggjs/cluster#readme", | ||
| "dependencies": { | ||
| "await-event": "^2.1.0", | ||
| "cfork": "^1.7.1", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| 'use strict'; | ||
|
|
||
| module.exports = app => { | ||
| // don't use the port that egg-mock defined | ||
| app._options.port = undefined; | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| module.exports = app => { | ||
| app.get('/', ctx => { | ||
| ctx.body = 'done'; | ||
| }); | ||
|
|
||
| app.get('/port', ctx => { | ||
| ctx.body = ctx.app._options.port; | ||
| }); | ||
| }; |
Uh oh!
There was an error while loading. Please reload this page.