diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index eeae341..a957a0b 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -12,6 +12,6 @@ jobs: uses: node-modules/github-actions/.github/workflows/node-test.yml@master with: os: 'ubuntu-latest, macos-latest' - version: '14, 16, 18, 20, 22' + version: '14, 16, 18, 20, 22, 24' secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.gitignore b/.gitignore index 7f075b6..b15c3e4 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ run .nyc_output package-lock.json .package-lock.json +pnpm-lock.yaml \ No newline at end of file diff --git a/README.md b/README.md index d2a3330..3037d76 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ [npm-image]: https://img.shields.io/npm/v/egg-cluster.svg?style=flat-square [npm-url]: https://npmjs.org/package/egg-cluster -[codecov-image]: https://codecov.io/github/eggjs/egg-cluster/coverage.svg?branch=master -[codecov-url]: https://codecov.io/github/eggjs/egg-cluster?branch=master +[codecov-image]: https://codecov.io/github/eggjs/cluster/coverage.svg?branch=master +[codecov-url]: https://codecov.io/github/eggjs/cluster?branch=master [snyk-image]: https://snyk.io/test/npm/egg-cluster/badge.svg?style=flat-square [snyk-url]: https://snyk.io/test/npm/egg-cluster [download-image]: https://img.shields.io/npm/dm/egg-cluster.svg?style=flat-square @@ -53,12 +53,13 @@ startCluster(options, () => { | workers | `Number` | numbers of app workers | | sticky | `Boolean` | sticky mode server | | port | `Number` | port | +| reusePort | `Boolean` | (Required Node.js >= 22.12.0) allows multiple sockets on the same host to bind to the same port. Incoming connections are distributed by the operating system to listening sockets. This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+. **Default:** `false` | | debugPort | `Number` | the debug port only listen on http protocol | | https | `Object` | start a https server, note: `key` / `cert` / `ca` should be full path to file | | require | `Array\|String` | will inject into worker/agent process | | pidFile | `String` | will save master pid to this file | | startMode | `String` | default is 'process', use 'worker_threads' to start the app & agent worker by worker_threads | -| ports | `Array` | startup port of each app worker, such as: [7001, 7002, 7003], only effects when the startMode is 'worker_threads' | +| ports | `Array` | startup port of each app worker, such as: [7001, 7002, 7003], only effects when the `startMode` is `'worker_threads'` and `reusePort` is `false` | | env | `String` | custom env, default is process.env.EGG_SERVER_ENV | ## Env diff --git a/lib/agent_worker.js b/lib/agent_worker.js index 678e70b..b1a0d58 100644 --- a/lib/agent_worker.js +++ b/lib/agent_worker.js @@ -24,7 +24,7 @@ if (options.startMode === 'worker_threads') { AgentWorker = require('./utils/mode/impl/process/agent').AgentWorker; } -const debug = require('util').debuglog('egg-cluster'); +const debug = require('util').debuglog('egg-cluster:agent_worker'); const ConsoleLogger = require('egg-logger').EggConsoleLogger; const consoleLogger = new ConsoleLogger({ level: process.env.EGG_AGENT_WORKER_LOGGER_LEVEL }); diff --git a/lib/app_worker.js b/lib/app_worker.js index dabbbbc..6cfd821 100644 --- a/lib/app_worker.js +++ b/lib/app_worker.js @@ -16,9 +16,11 @@ if (options.startMode === 'worker_threads') { AppWorker = require('./utils/mode/impl/process/app').AppWorker; } +const os = require('os'); const fs = require('fs'); -const debug = require('util').debuglog('egg-cluster'); +const debug = require('util').debuglog('egg-cluster:app_worker'); const ConsoleLogger = require('egg-logger').EggConsoleLogger; + const consoleLogger = new ConsoleLogger({ level: process.env.EGG_APP_WORKER_LOGGER_LEVEL, }); @@ -38,6 +40,18 @@ const port = options.port = options.port || listenConfig.port; const debugPort = options.debugPort; const protocol = (httpsOptions.key && httpsOptions.cert) ? 'https' : 'http'; +// https://nodejs.org/api/net.html#serverlistenoptions-callback +// https://github.com/nodejs/node/blob/main/node.gypi#L310 +// https://docs.python.org/3/library/sys.html#sys.platform +// This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+. +const supportedPlatforms = [ 'linux', 'freebsd', 'sunos', 'aix' ]; +let reusePort = options.reusePort = options.reusePort || listenConfig.reusePort; +if (reusePort && !supportedPlatforms.includes(os.platform())) { + reusePort = false; + options.reusePort = false; + debug('platform %s is not supported currently, set reusePort to false', os.platform()); +} + AppWorker.send({ to: 'master', action: 'realport', @@ -121,10 +135,22 @@ function startServer(err) { exitProcess(); return; } - const args = [ port ]; - if (listenConfig.hostname) args.push(listenConfig.hostname); - debug('listen options %s', args); - server.listen(...args); + if (reusePort) { + // https://nodejs.org/api/net.html#serverlistenoptions-callback + const listenOptions = { port, reusePort }; + if (listenConfig.hostname) { + listenOptions.host = listenConfig.hostname; + } + debug('listen options %j', listenOptions); + server.listen(listenOptions); + } else { + const args = [ port ]; + if (listenConfig.hostname) { + args.push(listenConfig.hostname); + } + debug('listen options %s', args); + server.listen(...args); + } } if (debugPortServer) { debug('listen on debug port: %s', debugPort); @@ -132,10 +158,13 @@ function startServer(err) { } } - AppWorker.send({ - to: 'master', - action: 'listening', - data: server.address() || { port }, + server.once('listening', () => { + AppWorker.send({ + to: 'master', + action: 'listening', + data: server.address() || { port }, + reusePort, + }); }); } diff --git a/lib/master.js b/lib/master.js index 9e5ee98..97afb95 100644 --- a/lib/master.js +++ b/lib/master.js @@ -34,6 +34,7 @@ class Master extends EventEmitter { * - {Object} [plugins] - customized plugins, for unittest * - {Number} [workers] numbers of app workers, default to `os.cpus().length` * - {Number} [port] listening port, default to 7001(http) or 8443(https) + * - {Boolean} [reusePort] setting `reusePort` to `true` allows multiple sockets on the same host to bind to the same port. Incoming connections are distributed by the operating system to listening sockets. This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+. **Default:** `false`. * - {Number} [debugPort] listening a debug port on http protocol * - {Object} [https] https options, { key, cert, ca }, full path * - {Array|String} [require] will inject into worker/agent process diff --git a/lib/utils/mode/impl/process/app.js b/lib/utils/mode/impl/process/app.js index 61b8a2e..8e8f2cc 100644 --- a/lib/utils/mode/impl/process/app.js +++ b/lib/utils/mode/impl/process/app.js @@ -41,8 +41,15 @@ class AppWorker extends BaseAppWorker { process.on(event, callback); } - static send(data) { - process.send(data); + static send(message) { + if (message && message.action === 'listening' && message.reusePort) { + // cluster won't get `listening` event when reusePort is true, use cluster `message` event instead + // rewrite message.action to 'reuse-port-listening' + message.action = 'reuse-port-listening'; + cluster.worker.send(message); + return; + } + process.send(message); } static kill() { @@ -124,6 +131,25 @@ class AppUtils extends BaseAppUtils { }); cluster.on('listening', (worker, address) => { const appWorker = new AppWorker(worker); + this.logger.info('[master] app_worker#%s:%s listening on %j', appWorker.id, appWorker.workerId, address); + this.messenger.send({ + action: 'app-start', + data: { + workerId: appWorker.workerId, + address, + }, + to: 'master', + from: 'app', + }); + }); + // handle 'reuse-port-listening' message: { action: 'reuse-port-listening', data: { port: 3000 } } + cluster.on('message', (worker, message) => { + if (!message || message.action !== 'reuse-port-listening') { + return; + } + const address = message.data; + const appWorker = new AppWorker(worker); + this.logger.info('[master] app_worker#%s:%s reuse-port listening on %j', appWorker.id, appWorker.workerId, address); this.messenger.send({ action: 'app-start', data: { diff --git a/lib/utils/mode/impl/worker_threads/app.js b/lib/utils/mode/impl/worker_threads/app.js index bf6404e..e93bf86 100644 --- a/lib/utils/mode/impl/worker_threads/app.js +++ b/lib/utils/mode/impl/worker_threads/app.js @@ -71,15 +71,14 @@ class AppWorker extends BaseAppWorker { } class AppUtils extends BaseAppUtils { - #workers = []; + #appWorkers = []; #forkSingle(appPath, options, id) { // start app worker const worker = new workerThreads.Worker(appPath, options); - this.#workers.push(worker); - // wrap app worker const appWorker = new AppWorker(worker, id); + this.#appWorkers.push(appWorker); this.emit('worker_forked', appWorker); appWorker.disableRefork = true; worker.on('message', msg => { @@ -129,23 +128,40 @@ class AppUtils extends BaseAppUtils { to: 'master', from: 'app', }); - }); // handle worker exit worker.on('exit', async code => { + this.log('[master] app_worker#%s (tid:%s) exit with code: %s', appWorker.id, appWorker.workerId, code); + // remove worker from workers array + const idx = this.#appWorkers.indexOf(appWorker); + if (idx !== -1) { + this.#appWorkers.splice(idx, 1); + } + // remove all listeners to avoid memory leak + worker.removeAllListeners(); + appWorker.state = 'dead'; - this.messenger.send({ - action: 'app-exit', - data: { - workerId: appWorker.workerId, - code, - }, - to: 'master', - from: 'app', - }); + try { + this.messenger.send({ + action: 'app-exit', + data: { + workerId: appWorker.workerId, + code, + }, + to: 'master', + from: 'app', + }); + } catch (err) { + this.log('[master][warning] app_worker#%s (tid:%s) send "app-exit" message error: %s', + appWorker.id, appWorker.workerId, err); + } + if (appWorker.disableRefork) { + return; + } // refork app worker + this.log('[master] app_worker#%s (tid:%s) refork after 1s', appWorker.id, appWorker.workerId); await sleep(1000); this.#forkSingle(appPath, options, id); }); @@ -155,26 +171,38 @@ class AppUtils extends BaseAppUtils { this.startTime = Date.now(); this.startSuccessCount = 0; - const ports = this.options.ports; - if (!ports.length) { - ports.push(this.options.port); + if (this.options.reusePort) { + if (!this.options.port) { + throw new Error('options.port must be specified when reusePort is enabled'); + } + for (let i = 0; i < this.options.workers; i++) { + const argv = [ JSON.stringify(this.options) ]; + const appWorkerId = i + 1; + this.#forkSingle(this.getAppWorkerFile(), { argv }, appWorkerId); + } + } else { + const ports = this.options.ports; + if (!ports.length) { + ports.push(this.options.port); + } + this.options.workers = ports.length; + let i = 0; + do { + const options = Object.assign({}, this.options, { port: ports[i] }); + const argv = [ JSON.stringify(options) ]; + this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i); + } while (i < ports.length); } - this.options.workers = ports.length; - let i = 0; - do { - const options = Object.assign({}, this.options, { port: ports[i] }); - const argv = [ JSON.stringify(options) ]; - this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i); - } while (i < ports.length); return this; } async kill() { - for (const worker of this.#workers) { - this.log(`[master] kill app worker#${worker.id} (worker_threads) by worker.terminate()`); - worker.removeAllListeners(); - worker.terminate(); + for (const appWorker of this.#appWorkers) { + this.log('[master] kill app_worker#%s (tid:%s) (worker_threads) by worker.terminate()', appWorker.id, appWorker.workerId); + appWorker.disableRefork = true; + appWorker.instance.removeAllListeners(); + appWorker.instance.terminate(); } } } diff --git a/lib/utils/options.js b/lib/utils/options.js index 3fff0f6..3103ea3 100644 --- a/lib/utils/options.js +++ b/lib/utils/options.js @@ -12,6 +12,7 @@ module.exports = function(options) { framework: '', baseDir: process.cwd(), port: options.https ? 8443 : null, + reusePort: false, workers: null, plugins: null, https: false, diff --git a/package.json b/package.json index a8e0849..fbe6fd4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "egg-cluster", - "version": "2.4.0", + "version": "2.5.0-beta.3", "description": "cluster manager for egg", "main": "index.js", "scripts": { @@ -8,7 +8,7 @@ "test": "npm run lint -- --fix && npm run test-local", "test-local": "egg-bin test --ts false", "cov": "egg-bin cov --prerequire --timeout 100000 --ts false", - "ci": "npm run lint && npm run cov" + "ci": "npm run lint && node test/reuseport_cluster.js && npm run cov" }, "files": [ "index.js", @@ -16,7 +16,7 @@ ], "repository": { "type": "git", - "url": "git+https://github.com/eggjs/egg-cluster.git" + "url": "git+https://github.com/eggjs/cluster.git" }, "keywords": [ "egg", @@ -26,9 +26,9 @@ "author": "dead-horse ", "license": "MIT", "bugs": { - "url": "https://github.com/eggjs/egg-cluster/issues" + "url": "https://github.com/eggjs/cluster/issues" }, - "homepage": "https://github.com/eggjs/egg-cluster#readme", + "homepage": "https://github.com/eggjs/cluster#readme", "dependencies": { "await-event": "^2.1.0", "cfork": "^1.7.1", diff --git a/test/app_worker.test.js b/test/app_worker.test.js index 12db96b..ec312e1 100644 --- a/test/app_worker.test.js +++ b/test/app_worker.test.js @@ -232,6 +232,36 @@ describe('test/app_worker.test.js', () => { .expect(200); }); + it('should set reusePort=true in config', async () => { + app = utils.cluster('apps/app-listen-reusePort'); + app.debug(); + await app.ready(); + + app.expect('code', 0); + // IPv6 first: http://:::17010 + app.expect('stdout', /egg started on http:\/\/.+?:17010/); + + await request('http://0.0.0.0:17010') + .get('/') + .expect('done') + .expect(200); + + await request('http://127.0.0.1:17010') + .get('/') + .expect('done') + .expect(200); + + await request('http://localhost:17010') + .get('/') + .expect('done') + .expect(200); + + await request('http://127.0.0.1:17010') + .get('/port') + .expect('17010') + .expect(200); + }); + it('should use hostname in config', async () => { const url = address.ip() + ':17010'; diff --git a/test/fixtures/apps/app-listen-reusePort/app.js b/test/fixtures/apps/app-listen-reusePort/app.js new file mode 100644 index 0000000..dc71e21 --- /dev/null +++ b/test/fixtures/apps/app-listen-reusePort/app.js @@ -0,0 +1,6 @@ +'use strict'; + +module.exports = app => { + // don't use the port that egg-mock defined + app._options.port = undefined; +}; diff --git a/test/fixtures/apps/app-listen-reusePort/app/router.js b/test/fixtures/apps/app-listen-reusePort/app/router.js new file mode 100644 index 0000000..26e4658 --- /dev/null +++ b/test/fixtures/apps/app-listen-reusePort/app/router.js @@ -0,0 +1,9 @@ +module.exports = app => { + app.get('/', ctx => { + ctx.body = 'done'; + }); + + app.get('/port', ctx => { + ctx.body = ctx.app._options.port; + }); +}; diff --git a/test/fixtures/apps/app-listen-reusePort/config/config.default.js b/test/fixtures/apps/app-listen-reusePort/config/config.default.js new file mode 100644 index 0000000..eb4d18c --- /dev/null +++ b/test/fixtures/apps/app-listen-reusePort/config/config.default.js @@ -0,0 +1,9 @@ +module.exports = { + keys: '123', + cluster: { + listen: { + port: 17010, + reusePort: true, + }, + }, +}; diff --git a/test/fixtures/apps/app-listen-reusePort/package.json b/test/fixtures/apps/app-listen-reusePort/package.json new file mode 100644 index 0000000..f78929e --- /dev/null +++ b/test/fixtures/apps/app-listen-reusePort/package.json @@ -0,0 +1,3 @@ +{ + "name": "app-listen-reusePort" +} diff --git a/test/master.test.js b/test/master.test.js index abf32c6..1bcdc93 100644 --- a/test/master.test.js +++ b/test/master.test.js @@ -29,9 +29,38 @@ describe('test/master.test.js', () => { .end(done); }); + it('start success with reusePort=true', done => { + mm.env('local'); + app = utils.cluster('apps/master-worker-started', { reusePort: true }); + app.debug(); + + app.expect('stdout', /egg start/) + .expect('stdout', /egg started/) + .notExpect('stdout', /\[master\] agent_worker#1:\d+ start with clusterPort:\d+/) + .expect('code', 0) + .end(done); + }); + it('start success in prod env', done => { mm.env('prod'); - app = utils.cluster('apps/mock-production-app').debug(false); + app = utils.cluster('apps/mock-production-app') + .debug(false); + + app.expect('stdout', /egg start/) + .expect('stdout', /egg started/) + .expect('code', 0) + .end(err => { + assert.ifError(err); + console.log(app.stdout); + console.log(app.stderr); + done(); + }); + }); + + it('start success with reusePort=true in prod env', done => { + mm.env('prod'); + app = utils.cluster('apps/mock-production-app', { reusePort: true, workers: 4 }) + .debug(); app.expect('stdout', /egg start/) .expect('stdout', /egg started/) diff --git a/test/reuseport_cluster.js b/test/reuseport_cluster.js new file mode 100644 index 0000000..b947c51 --- /dev/null +++ b/test/reuseport_cluster.js @@ -0,0 +1,81 @@ +const cluster = require('node:cluster'); +const http = require('node:http'); +const os = require('node:os'); +const process = require('node:process'); + +const numCPUs = typeof os.availableParallelism === 'function' ? os.availableParallelism() : os.cpus().length; + +// pid: count +const totals = {}; +function request(index) { + http.get('http://localhost:17001/', { + headers: { + connection: 'close', + }, + }, res => { + const { statusCode } = res; + console.log(index, res.statusCode, res.headers); + let error; + // Any 2xx status code signals a successful response but + // here we're only checking for 200. + if (statusCode !== 200) { + error = new Error('Request Failed.\n' + + `Status Code: ${statusCode}`); + } + if (error) { + console.error(error.message); + // Consume response data to free up memory + res.resume(); + return; + } + res.setEncoding('utf8'); + let rawData = ''; + res.on('data', chunk => { rawData += chunk; }); + res.on('end', () => { + try { + console.log(rawData); + } catch (e) { + console.error(e.message); + } + const data = JSON.parse(rawData); + totals[data.pid] = (totals[data.pid] || 0) + 1; + }); + }).on('error', e => { + console.error(`Got error: ${e.stack}`); + }); +} + +if (cluster.isPrimary || cluster.isMaster) { + console.log(`Primary ${process.pid} is running`); + + // Fork workers. + for (let i = 0; i < numCPUs; i++) { + cluster.fork(); + } + + cluster.on('exit', (worker, code, signal) => { + console.log(`worker ${worker.process.pid} died, code: ${code}, signal: ${signal}`); + }); + + setTimeout(() => { + for (let i = 0; i < 1000; i++) { + request(i); + } + }, 2000); + setTimeout(() => { + console.log(totals); + process.exit(0); + }, 5000); +} else { + // Workers can share any TCP connection + // In this case it is an HTTP server + http.createServer((req, res) => { + res.writeHead(200); + res.end(JSON.stringify({ pid: process.pid })); + }).listen({ + port: 17001, + reusePort: os.platform() === 'linux', + }); + + console.log(`Worker ${process.pid} started`); +}