Skip to content

Commit e3ea97c

Browse files
authored
Merge pull request #44 from aidanhibbard/add-workers-flag
add workers flag
2 parents fd0828a + 6c3e8d6 commit e3ea97c

13 files changed

+581
-91
lines changed

README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,22 @@ nuxi dev
121121
node .nuxt/dev/workers/index.mjs
122122
```
123123

124+
By default all workers run. To run only specific workers, use the `--workers=` flag with a comma-separated list of worker names:
125+
126+
```bash
127+
node .nuxt/dev/workers/index.mjs --workers=basic,hello
128+
```
129+
124130
### CLI
125131

126-
A simple CLI is provided to run workers in development.
132+
A simple CLI is provided to run workers in development (with file watching and restarts).
127133

128134
```bash
129-
# from your project root
135+
# from your project root – runs all workers
130136
npx nuxt-processor dev
137+
138+
# run only specific workers
139+
npx nuxt-processor dev --workers=basic,hello
131140
```
132141

133142
Notes:
@@ -155,6 +164,12 @@ nuxi build
155164
node .output/server/workers/index.mjs
156165
```
157166

167+
To run only specific workers in production:
168+
169+
```bash
170+
node .output/server/workers/index.mjs --workers=basic,hello
171+
```
172+
158173
## Bull Board
159174

160175
[Bull Board](https://github.com/felixmosh/bull-board) is an excellent UI for watching your queues, you can follow the setup in the playground to use it.

docs/getting-started.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,22 @@ nuxi dev
8080
node .nuxt/dev/workers/index.mjs
8181
```
8282

83+
By default all workers run. To run only specific workers, use the `--workers=` flag with a comma-separated list of worker names:
84+
85+
```bash
86+
node .nuxt/dev/workers/index.mjs --workers=basic,hello
87+
```
88+
8389
### CLI
8490

8591
Use the CLI to run workers with file watching and restarts:
8692

8793
```bash
94+
# runs all workers
8895
npx nuxt-processor dev
96+
97+
# run only specific workers
98+
npx nuxt-processor dev --workers=basic,hello
8999
```
90100

91101
Notes:
@@ -113,6 +123,12 @@ nuxi build
113123
node .output/server/workers/index.mjs
114124
```
115125

126+
To run only specific workers in production:
127+
128+
```bash
129+
node .output/server/workers/index.mjs --workers=basic,hello
130+
```
131+
116132
## Bull Board
117133

118134
See the dedicated page: [Bull Board](/bull-board)

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"dev:prepare": "nuxt-module-build build --stub && nuxt-module-build prepare && nuxi prepare playground",
3838
"release": "npm run lint && npm run prepack && changelogen --release && npm publish && git push --follow-tags",
3939
"lint": "eslint .",
40+
"lint:fix": "eslint . --fix",
4041
"test": "vitest run",
4142
"test:coverage": "vitest run --coverage",
4243
"test:watch": "vitest watch",

spec/cli.spec.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ describe('CLI dev command', () => {
5454
let signalHandlers: Record<string, Array<(...args: unknown[]) => void>>
5555

5656
beforeEach(() => {
57+
_spawnCalled = false
5758
tmpDir = mkdtempSync(join(os.tmpdir(), 'nuxt-processor-cli-'))
5859
// minimal package.json
5960
writeFileSync(join(tmpDir, 'package.json'), JSON.stringify({ name: 'app', version: '0.0.0', scripts: {} }, null, 2))
@@ -147,10 +148,29 @@ describe('CLI dev command', () => {
147148
expect(pkg2.scripts && pkg2.scripts['processor:dev']).toBe('nuxt-processor dev')
148149
})
149150

151+
it('exits when entry exists but processor:dev script could not be ensured', async () => {
152+
const entryDir = join(tmpDir, '.nuxt', 'dev', 'workers')
153+
mkdirSync(entryDir, { recursive: true })
154+
writeFileSync(join(entryDir, 'index.mjs'), 'export {}\n')
155+
promptAnswer = 'n'
156+
const { main } = await importCli()
157+
try {
158+
await main({ rawArgs: ['dev', tmpDir] })
159+
}
160+
catch (e) {
161+
expect(String(e)).toContain('process.exit(1)')
162+
}
163+
expect(_spawnCalled).toBe(false)
164+
})
165+
150166
it('kills child process on SIGINT signal', async () => {
151167
const entryDir = join(tmpDir, '.nuxt', 'dev', 'workers')
152168
mkdirSync(entryDir, { recursive: true })
153169
writeFileSync(join(entryDir, 'index.mjs'), 'export {}\n')
170+
const pkgPath = join(tmpDir, 'package.json')
171+
const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record<string, string> }
172+
pkg.scripts = { ...(pkg.scripts || {}), 'processor:dev': 'nuxt-processor dev' }
173+
writeFileSync(pkgPath, JSON.stringify(pkg, null, 2))
154174

155175
promptAnswer = 'n'
156176
const { main } = await importCli()
@@ -175,6 +195,10 @@ describe('CLI dev command', () => {
175195
const entryDir = join(tmpDir, '.nuxt', 'dev', 'workers')
176196
mkdirSync(entryDir, { recursive: true })
177197
writeFileSync(join(entryDir, 'index.mjs'), 'export {}\n')
198+
const pkgPath = join(tmpDir, 'package.json')
199+
const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record<string, string> }
200+
pkg.scripts = { ...(pkg.scripts || {}), 'processor:dev': 'nuxt-processor dev' }
201+
writeFileSync(pkgPath, JSON.stringify(pkg, null, 2))
178202

179203
promptAnswer = 'n'
180204
const { main } = await importCli()
@@ -189,4 +213,23 @@ describe('CLI dev command', () => {
189213
expect(String(e)).toContain('process.exit(2)')
190214
}
191215
})
216+
217+
it('forwards --workers flag to spawned script', async () => {
218+
const entryDir = join(tmpDir, '.nuxt', 'dev', 'workers')
219+
mkdirSync(entryDir, { recursive: true })
220+
writeFileSync(join(entryDir, 'index.mjs'), 'export {}\n')
221+
const pkgPath = join(tmpDir, 'package.json')
222+
const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record<string, string> }
223+
pkg.scripts = { ...(pkg.scripts || {}), 'processor:dev': 'nuxt-processor dev' }
224+
writeFileSync(pkgPath, JSON.stringify(pkg, null, 2))
225+
226+
promptAnswer = 'n'
227+
const { main } = await importCli()
228+
await main({ rawArgs: ['dev', tmpDir, '--workers', 'basic,hello'] })
229+
230+
expect(_spawnCalled).toBe(true)
231+
const spawnArgs = lastSpawnArgs as [string, string[], Record<string, unknown>]
232+
const nodeArgs = spawnArgs[1]
233+
expect(nodeArgs).toContain('--workers=basic,hello')
234+
})
192235
})
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
2+
3+
exports[`generate-workers-entry-content > matches snapshot for multiple workers and redis url 1`] = `
4+
"
5+
import { fileURLToPath } from 'node:url'
6+
import { resolve as resolvePath } from 'node:path'
7+
import { consola } from 'consola'
8+
import { $workers } from '#processor-utils'
9+
10+
// Initialize connection as early as possible so any imports that register
11+
// workers/queues have a valid connection available.
12+
const api = $workers()
13+
api.setConnection(new (await import("ioredis")).default("redis://localhost:6379"))
14+
15+
export async function createWorkersApp() {
16+
// Avoid EPIPE when stdout/stderr are closed by terminal (e.g., Ctrl+C piping)
17+
const handleStreamError = (err) => {
18+
try {
19+
const code = (typeof err === 'object' && err && 'code' in err) ? err.code : null
20+
if (code === 'EPIPE') return
21+
} catch (e) { console.warn?.('nuxt-processor: stream error inspection failed', e) }
22+
throw err
23+
}
24+
try { process.stdout?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stdout error handler', err) }
25+
try { process.stderr?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stderr error handler', err) }
26+
const modules = [
27+
() => import("/app/server/workers/basic.ts"),
28+
() => import("/app/server/workers/hello.ts")
29+
]
30+
for (const loader of modules) {
31+
await loader()
32+
}
33+
// Parse --workers flag (e.g. --workers=basic,hello)
34+
const workersArg = process.argv.find(a => typeof a === 'string' && a.startsWith('--workers='))
35+
const selectedWorkers = workersArg
36+
? workersArg.split('=')[1].split(',').map(s => s.trim()).filter(Boolean)
37+
: null
38+
const workersToRun = selectedWorkers
39+
? (Array.isArray(api.workers) ? api.workers.filter(w => w && selectedWorkers.includes(w.name)) : [])
40+
: (Array.isArray(api.workers) ? api.workers : [])
41+
const logger = consola.create({}).withTag('nuxt-processor')
42+
if (selectedWorkers && workersToRun.length === 0) {
43+
const available = (Array.isArray(api.workers) ? api.workers.map(w => w && w.name).filter(Boolean) : [])
44+
logger.warn('No workers matched --workers=' + selectedWorkers.join(',') + (available.length ? '. Available: ' + available.join(', ') : '.'))
45+
process.exit(1)
46+
}
47+
try {
48+
const workerNames = workersToRun.map(w => w && w.name).filter(Boolean)
49+
logger.info('starting workers:\\n' + workerNames.map(n => ' - ' + n).join('\\n'))
50+
for (const w of workersToRun) {
51+
w.on('error', (err) => logger.error('worker error', err))
52+
}
53+
// Explicitly start workers since autorun is disabled
54+
for (const w of workersToRun) {
55+
try {
56+
// run() returns a promise that resolves when the worker stops; do not await to avoid blocking
57+
// eslint-disable-next-line promise/catch-or-return
58+
w.run().catch((err) => logger.error('worker run error', err))
59+
}
60+
catch (err) {
61+
logger.error('failed to start worker', err)
62+
}
63+
}
64+
logger.success('workers started')
65+
} catch (err) {
66+
logger.error('failed to initialize workers', err)
67+
}
68+
const closeRunningWorkers = async () => {
69+
await Promise.allSettled(workersToRun.map(w => w.close()))
70+
}
71+
return { stop: closeRunningWorkers, workers: workersToRun }
72+
}
73+
74+
const isMain = (() => {
75+
try {
76+
if (typeof process === 'undefined' || !process.argv || !process.argv[1]) return false
77+
const argvPath = resolvePath(process.cwd?.() || '.', process.argv[1])
78+
const filePath = fileURLToPath(import.meta.url)
79+
return filePath === argvPath
80+
} catch {
81+
return false
82+
}
83+
})()
84+
if (isMain) {
85+
const logger = consola.create({}).withTag('nuxt-processor')
86+
const appPromise = createWorkersApp().catch((err) => {
87+
logger.error('failed to start workers', err)
88+
process.exit(1)
89+
})
90+
const shutdown = async () => {
91+
try { logger.info('closing workers...') } catch (err) { console.warn('nuxt-processor: failed to log shutdown start', err) }
92+
try {
93+
const app = await appPromise
94+
try {
95+
const names = (app?.workers || []).map(w => w && w.name).filter(Boolean)
96+
logger.info('closing workers:\\n' + names.map(n => ' - ' + n).join('\\n'))
97+
} catch (eL) { console.warn('nuxt-processor: failed to log workers list on shutdown', eL) }
98+
await app.stop()
99+
try { logger.success('workers closed') } catch (err2) { console.warn('nuxt-processor: failed to log shutdown complete', err2) }
100+
}
101+
finally { process.exit(0) }
102+
}
103+
;['SIGINT','SIGTERM','SIGQUIT'].forEach(sig => process.on(sig, shutdown))
104+
process.on('beforeExit', shutdown)
105+
}
106+
107+
export default { createWorkersApp }
108+
"
109+
`;
110+
111+
exports[`generate-workers-entry-content > matches snapshot for single worker and undefined redis 1`] = `
112+
"
113+
import { fileURLToPath } from 'node:url'
114+
import { resolve as resolvePath } from 'node:path'
115+
import { consola } from 'consola'
116+
import { $workers } from '#processor-utils'
117+
118+
// Initialize connection as early as possible so any imports that register
119+
// workers/queues have a valid connection available.
120+
const api = $workers()
121+
api.setConnection(undefined)
122+
123+
export async function createWorkersApp() {
124+
// Avoid EPIPE when stdout/stderr are closed by terminal (e.g., Ctrl+C piping)
125+
const handleStreamError = (err) => {
126+
try {
127+
const code = (typeof err === 'object' && err && 'code' in err) ? err.code : null
128+
if (code === 'EPIPE') return
129+
} catch (e) { console.warn?.('nuxt-processor: stream error inspection failed', e) }
130+
throw err
131+
}
132+
try { process.stdout?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stdout error handler', err) }
133+
try { process.stderr?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stderr error handler', err) }
134+
const modules = [
135+
() => import("/path/to/worker.mjs")
136+
]
137+
for (const loader of modules) {
138+
await loader()
139+
}
140+
// Parse --workers flag (e.g. --workers=basic,hello)
141+
const workersArg = process.argv.find(a => typeof a === 'string' && a.startsWith('--workers='))
142+
const selectedWorkers = workersArg
143+
? workersArg.split('=')[1].split(',').map(s => s.trim()).filter(Boolean)
144+
: null
145+
const workersToRun = selectedWorkers
146+
? (Array.isArray(api.workers) ? api.workers.filter(w => w && selectedWorkers.includes(w.name)) : [])
147+
: (Array.isArray(api.workers) ? api.workers : [])
148+
const logger = consola.create({}).withTag('nuxt-processor')
149+
if (selectedWorkers && workersToRun.length === 0) {
150+
const available = (Array.isArray(api.workers) ? api.workers.map(w => w && w.name).filter(Boolean) : [])
151+
logger.warn('No workers matched --workers=' + selectedWorkers.join(',') + (available.length ? '. Available: ' + available.join(', ') : '.'))
152+
process.exit(1)
153+
}
154+
try {
155+
const workerNames = workersToRun.map(w => w && w.name).filter(Boolean)
156+
logger.info('starting workers:\\n' + workerNames.map(n => ' - ' + n).join('\\n'))
157+
for (const w of workersToRun) {
158+
w.on('error', (err) => logger.error('worker error', err))
159+
}
160+
// Explicitly start workers since autorun is disabled
161+
for (const w of workersToRun) {
162+
try {
163+
// run() returns a promise that resolves when the worker stops; do not await to avoid blocking
164+
// eslint-disable-next-line promise/catch-or-return
165+
w.run().catch((err) => logger.error('worker run error', err))
166+
}
167+
catch (err) {
168+
logger.error('failed to start worker', err)
169+
}
170+
}
171+
logger.success('workers started')
172+
} catch (err) {
173+
logger.error('failed to initialize workers', err)
174+
}
175+
const closeRunningWorkers = async () => {
176+
await Promise.allSettled(workersToRun.map(w => w.close()))
177+
}
178+
return { stop: closeRunningWorkers, workers: workersToRun }
179+
}
180+
181+
const isMain = (() => {
182+
try {
183+
if (typeof process === 'undefined' || !process.argv || !process.argv[1]) return false
184+
const argvPath = resolvePath(process.cwd?.() || '.', process.argv[1])
185+
const filePath = fileURLToPath(import.meta.url)
186+
return filePath === argvPath
187+
} catch {
188+
return false
189+
}
190+
})()
191+
if (isMain) {
192+
const logger = consola.create({}).withTag('nuxt-processor')
193+
const appPromise = createWorkersApp().catch((err) => {
194+
logger.error('failed to start workers', err)
195+
process.exit(1)
196+
})
197+
const shutdown = async () => {
198+
try { logger.info('closing workers...') } catch (err) { console.warn('nuxt-processor: failed to log shutdown start', err) }
199+
try {
200+
const app = await appPromise
201+
try {
202+
const names = (app?.workers || []).map(w => w && w.name).filter(Boolean)
203+
logger.info('closing workers:\\n' + names.map(n => ' - ' + n).join('\\n'))
204+
} catch (eL) { console.warn('nuxt-processor: failed to log workers list on shutdown', eL) }
205+
await app.stop()
206+
try { logger.success('workers closed') } catch (err2) { console.warn('nuxt-processor: failed to log shutdown complete', err2) }
207+
}
208+
finally { process.exit(0) }
209+
}
210+
;['SIGINT','SIGTERM','SIGQUIT'].forEach(sig => process.on(sig, shutdown))
211+
process.on('beforeExit', shutdown)
212+
}
213+
214+
export default { createWorkersApp }
215+
"
216+
`;

0 commit comments

Comments
 (0)