Skip to content

Commit 221f274

Browse files
committed
feat(update): minor changes
1 parent d482ae9 commit 221f274

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

README.md

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,97 @@ const main = async () => {
113113
main();
114114
```
115115

116+
## Advanced usage
117+
118+
### Threading
119+
120+
You can easily spin of your workers in separate threads using [worker_threads](https://nodejs.org/api/worker_threads.html#worker-threads) (Node.js >= 12.17.0).
121+
122+
It enables you to fully leverage your CPU cores and isolate your main application queue from potential memory leaks or crashes.
123+
124+
```ts
125+
import { JobPayload, JobResult, PrismaJob } from "@mgcrea/prisma-queue";
126+
import { Worker } from "node:worker_threads";
127+
import { ROOT_DIR } from "src/config/env";
128+
import { log } from "src/config/log";
129+
130+
const WORKER_SCRIPT = `${ROOT_DIR}/dist/worker.js`;
131+
132+
export const processInWorker = async <P extends JobPayload, R extends JobResult>(
133+
job: PrismaJob<P, R>,
134+
): Promise<R> =>
135+
new Promise((resolve, reject) => {
136+
const workerData = getJobWorkerData(job);
137+
138+
log.debug(`Starting worker thread for job id=${job.id} in queue=${job.record.queue}`);
139+
try {
140+
const worker = new Worker(WORKER_SCRIPT, {
141+
workerData,
142+
});
143+
144+
worker.on("message", resolve);
145+
worker.on("error", reject);
146+
worker.on("exit", (code) => {
147+
if (code !== 0) {
148+
reject(
149+
new Error(
150+
`Worker for job id=${job.id} in queue=${job.record.queue} stopped with exit code ${code}`,
151+
),
152+
);
153+
}
154+
});
155+
} catch (error) {
156+
reject(error as Error);
157+
}
158+
});
159+
160+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
161+
export type JobWorkerData<P extends JobPayload = any> = {
162+
id: bigint;
163+
payload: P;
164+
queue: string;
165+
};
166+
167+
const getJobWorkerData = <P extends JobPayload, R extends JobResult>(job: PrismaJob<P, R>): JobWorkerData => {
168+
// Prepare the job data for structured cloning in worker thread
169+
return {
170+
id: job.id,
171+
payload: job.payload,
172+
queue: job.record.queue,
173+
};
174+
};
175+
```
176+
177+
- `worker.ts`
178+
179+
```ts
180+
import { parentPort, workerData } from "node:worker_threads";
181+
import { log } from "src/config/log";
182+
import { workers } from "src/queue";
183+
import { type JobWorkerData } from "src/utils/queue";
184+
import { logMemoryUsage } from "./utils/system";
185+
186+
log.info(`Worker thread started with data=${JSON.stringify(workerData)}`);
187+
188+
const typedWorkerData = workerData as JobWorkerData;
189+
const { queue } = typedWorkerData;
190+
const workerName = queue.replace(/Queue$/, "Worker") as keyof typeof workers;
191+
192+
log.debug(`Importing worker ${workerName} for queue=${queue}`);
193+
const jobWorker = workers[workerName];
194+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
195+
if (!jobWorker) {
196+
log.error(`No worker found for queue=${queue}`);
197+
process.exit(1);
198+
}
199+
200+
log.info(`Running worker for queue=${queue}`);
201+
const result = await jobWorker(typedWorkerData);
202+
log.info(`Worker for queue=${queue} completed with result=${JSON.stringify(result)}`);
203+
parentPort?.postMessage(result);
204+
process.exit(0);
205+
```
206+
116207
## Authors
117208

118209
- [Olivier Louvignes](https://github.com/mgcrea) <<[email protected]>>
@@ -122,7 +213,6 @@ main();
122213
Inspired by
123214

124215
- [pg-queue](https://github.com/OrKoN/pg-queue) by
125-
[Alex Rudenko](https://github.com/OrKoN)
126216

127217
## License
128218

src/PrismaJob.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export class PrismaJob<Payload, Result> {
1717
#record: DatabaseJob<Payload, Result>;
1818

1919
public readonly id;
20+
public readonly createdAt: Date = new Date();
2021

2122
/**
2223
* Constructs a new PrismaJob instance with the provided job record and database access objects.
@@ -55,6 +56,13 @@ export class PrismaJob<Payload, Result> {
5556
return this.#record.key;
5657
}
5758

59+
/**
60+
* Gets the job's queue name.
61+
*/
62+
public get queue() {
63+
return this.#record.queue;
64+
}
65+
5866
/**
5967
* Gets the CRON expression associated with the job for recurring scheduling.
6068
*/

0 commit comments

Comments
 (0)