Skip to content

Commit cd59f19

Browse files
committed
Add addJobs
1 parent 7ab2aa9 commit cd59f19

File tree

2 files changed

+49
-9
lines changed

2 files changed

+49
-9
lines changed

libs/graphile-worker/src/graphile-worker.service.ts

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import {
33
Job,
4-
quickAddJob,
4+
makeWorkerUtils,
55
run,
66
runMigrations,
77
RunnerOptions,
8+
runOnce,
89
TaskSpec,
910
} from 'graphile-worker';
1011

@@ -16,9 +17,11 @@ export class GraphileWorkerService {
1617
constructor(private readonly options: RunnerOptions) {}
1718

1819
/**
19-
* Run a new worker
20+
* Runs until either stopped by a signal event like `SIGINT` or by calling the `stop()` method on the resolved object.
21+
*
22+
* The resolved `Runner` object has a number of helpers on it, see [Runner object](https://github.com/graphile/worker#runner-object) for more information.
2023
*/
21-
async run() {
24+
async run(): Promise<void> {
2225
await this.runMigrations();
2326

2427
this.logger.debug('Start runner');
@@ -27,18 +30,43 @@ export class GraphileWorkerService {
2730
return runner.promise;
2831
}
2932

30-
async quickAddJob(
33+
/**
34+
* Runs until there are no runnable jobs left, and then resolve.
35+
*/
36+
async runOnce(): Promise<void> {
37+
await this.runMigrations();
38+
39+
this.logger.debug('Start runner');
40+
41+
await runOnce(this.options);
42+
}
43+
44+
async addJob(
3145
identifier: string,
3246
payload?: unknown,
3347
spec?: TaskSpec,
3448
): Promise<Job> {
35-
await this.runMigrations();
49+
const [job] = await this.addJobs([{ identifier, payload, spec }]);
50+
return job;
51+
}
3652

37-
const job = await quickAddJob(this.options, identifier, payload, spec);
53+
async addJobs(
54+
jobs: Array<{ identifier: string; payload?: unknown; spec?: TaskSpec }>,
55+
): Promise<Job[]> {
56+
const workerUtils = await makeWorkerUtils(this.options);
57+
const createdJobs: Job[] = [];
3858

39-
this.logger.debug(`quickAddJob add job #${job.id}`);
59+
try {
60+
await workerUtils.migrate();
4061

41-
return job;
62+
for (const { identifier, payload, spec } of jobs) {
63+
const job = await workerUtils.addJob(identifier, payload, spec);
64+
createdJobs.push(job);
65+
}
66+
return createdJobs;
67+
} finally {
68+
await workerUtils.release();
69+
}
4270
}
4371

4472
private async runMigrations() {

src/app.controller.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@ export class AppController {
88
@Post()
99
@HttpCode(201)
1010
async addJob() {
11-
await this.graphileWorker.quickAddJob('hello', { hello: 'world' });
11+
await this.graphileWorker.addJob('hello', { hello: 'world' });
12+
}
13+
14+
@Post('bulk')
15+
@HttpCode(201)
16+
async addJobs() {
17+
const jobs: Array<{ identifier: string; payload?: unknown }> = new Array(
18+
100,
19+
)
20+
.fill(undefined)
21+
.map((_, i) => ({ identifier: 'hello', payload: { hello: i } }));
22+
23+
return this.graphileWorker.addJobs(jobs);
1224
}
1325
}

0 commit comments

Comments
 (0)