Skip to content

Commit 5a305b9

Browse files
authored
Batched job processing (opt-in) (#474)
2 parents bb03d1c + 9f03b79 commit 5a305b9

32 files changed

+3334
-604
lines changed

__tests__/migrate.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414

1515
const options: WorkerSharedOptions = {};
1616

17-
const MAX_MIGRATION_NUMBER = 18;
17+
const MAX_MIGRATION_NUMBER = 19;
1818

1919
test("migration installs schema; second migration does no harm", async () => {
2020
await withPgClient(async (pgClient) => {
@@ -238,6 +238,7 @@ test("throws helpful error message in migration 11", async () => {
238238

239239
// Manually run the first 10 migrations
240240
const event = {
241+
ctx: compiledSharedOptions,
241242
client: pgClient,
242243
postgresVersion: 120000, // TODO: use the actual postgres version
243244
scratchpad: Object.create(null),

__tests__/schema.sql

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,5 +374,25 @@ ALTER TABLE graphile_worker._private_job_queues ENABLE ROW LEVEL SECURITY;
374374
ALTER TABLE graphile_worker._private_jobs ENABLE ROW LEVEL SECURITY;
375375
ALTER TABLE graphile_worker._private_known_crontabs ENABLE ROW LEVEL SECURITY;
376376
ALTER TABLE graphile_worker._private_tasks ENABLE ROW LEVEL SECURITY;
377-
REVOKE USAGE ON SCHEMA public FROM PUBLIC;
378-
GRANT ALL ON SCHEMA public TO PUBLIC;
377+
SELECT pg_catalog.set_config('search_path', '', false);
378+
COPY graphile_worker.migrations (id, ts, breaking) FROM stdin;
379+
1 1970-01-01 00:00:00.000000+00 t
380+
2 1970-01-01 00:00:00.000000+00 f
381+
3 1970-01-01 00:00:00.000000+00 t
382+
4 1970-01-01 00:00:00.000000+00 f
383+
5 1970-01-01 00:00:00.000000+00 f
384+
6 1970-01-01 00:00:00.000000+00 f
385+
7 1970-01-01 00:00:00.000000+00 f
386+
8 1970-01-01 00:00:00.000000+00 f
387+
9 1970-01-01 00:00:00.000000+00 f
388+
10 1970-01-01 00:00:00.000000+00 f
389+
11 1970-01-01 00:00:00.000000+00 t
390+
12 1970-01-01 00:00:00.000000+00 f
391+
13 1970-01-01 00:00:00.000000+00 t
392+
14 1970-01-01 00:00:00.000000+00 t
393+
15 1970-01-01 00:00:00.000000+00 f
394+
16 1970-01-01 00:00:00.000000+00 t
395+
17 1970-01-01 00:00:00.000000+00 f
396+
18 1970-01-01 00:00:00.000000+00 f
397+
19 1970-01-01 00:00:00.000000+00 t
398+
\.

package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "graphile-worker",
3-
"version": "0.16.6",
3+
"version": "0.17.0-canary.1fcb2a0",
44
"type": "commonjs",
55
"description": "Job queue for PostgreSQL",
66
"main": "dist/index.js",
@@ -18,6 +18,7 @@
1818
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
1919
"db:dump": "./scripts/dump_db",
2020
"perfTest": "cd perfTest && node ./run.js",
21+
"towerDefence": "cd towerDefence && node ./run.mjs",
2122
"preversion": "grep '^### Pending' RELEASE_NOTES.md && echo \"⚠️ Cannot publish with 'Pending' in RELEASE_NOTES ⚠️\" && exit 1 || true",
2223
"version": "node scripts/postversion.mjs && git add src/version.ts",
2324
"website": "cd website && yarn run",
@@ -51,11 +52,10 @@
5152
"homepage": "https://github.com/graphile/worker#readme",
5253
"dependencies": {
5354
"@graphile/logger": "^0.2.0",
54-
"@tsconfig/node18": "^18.2.4",
5555
"@types/debug": "^4.1.10",
5656
"@types/pg": "^8.10.5",
5757
"cosmiconfig": "^8.3.6",
58-
"graphile-config": "^0.0.1-beta.11",
58+
"graphile-config": "^0.0.1-beta.16",
5959
"json5": "^2.2.3",
6060
"pg": "^8.11.3",
6161
"tslib": "^2.6.2",
@@ -72,6 +72,7 @@
7272
"@fortawesome/free-solid-svg-icons": "^6.5.1",
7373
"@fortawesome/react-fontawesome": "^0.2.0",
7474
"@mdx-js/react": "^1.6.22",
75+
"@tsconfig/node18": "^18.2.4",
7576
"@types/jest": "^26.0.0",
7677
"@types/json5": "^2.2.0",
7778
"@types/node": "^20.8.7",
@@ -85,7 +86,7 @@
8586
"eslint-plugin-jest": "^26.0.0",
8687
"eslint-plugin-simple-import-sort": "^10.0.0",
8788
"eslint_d": "^13.0.0",
88-
"graphile": "^5.0.0-beta.16",
89+
"graphile": "^5.0.0-beta.41",
8990
"jest": "^26.0.0",
9091
"jest-time-helpers": "0.1.1",
9192
"juice": "5.2.0",

perfTest/graphile.config.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,29 @@
55

66
// import { WorkerProPreset } from "../graphile-pro-worker/dist/index.js";
77

8+
const CONCURRENT_JOBS = 24;
9+
810
/** @type {GraphileConfig.Preset} */
911
const preset = {
1012
// extends: [WorkerProPreset],
1113
worker: {
1214
connectionString:
1315
process.env.PERF_DATABASE_URL || "postgres:///graphile_worker_perftest",
14-
concurrentJobs: 3,
1516
fileExtensions: [".js", ".cjs", ".mjs"],
1617
// fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"],
1718
gracefulShutdownAbortTimeout: 2500,
19+
20+
concurrentJobs: CONCURRENT_JOBS,
21+
maxPoolSize: CONCURRENT_JOBS + 1,
22+
23+
//localQueue: { size: -1 },
24+
//completeJobBatchDelay: -1,
25+
//failJobBatchDelay: -1,
26+
27+
localQueue: { size: 500, refetchDelay: { durationMs: 10 } },
28+
completeJobBatchDelay: 0,
29+
failJobBatchDelay: 0,
1830
},
1931
};
32+
2033
module.exports = preset;

perfTest/init.js

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,19 @@ $$ language plpgsql;`,
3636
} else {
3737
const jobs = [];
3838
for (let i = 0; i < jobCount; i++) {
39-
jobs.push(
40-
`("${taskIdentifier.replace(
41-
/["\\]/g,
42-
"\\$&",
43-
)}","{\\"id\\":${i}}",,,,,,)`,
39+
jobs.push({ identifier: taskIdentifier, payload: { id: i } });
40+
}
41+
console.time(`Adding jobs`);
42+
while (jobs.length > 0) {
43+
const jobsSlice = jobs.splice(0, 1000000);
44+
const jobsString = JSON.stringify(jobsSlice);
45+
console.log(`Adding ${jobsSlice.length} jobs`);
46+
await pgPool.query(
47+
`select 1 from graphile_worker.add_jobs(array(select json_populate_recordset(null::graphile_worker.job_spec, $1::json)));`,
48+
[jobsString],
4449
);
50+
console.log(`...added`);
4551
}
46-
const jobsString = `{"${jobs
47-
.map((j) => j.replace(/["\\]/g, "\\$&"))
48-
.join('","')}"}`;
49-
console.time("Adding jobs");
50-
await pgPool.query(
51-
`select graphile_worker.add_jobs($1::graphile_worker.job_spec[]);`,
52-
[jobsString],
53-
);
5452
console.timeEnd("Adding jobs");
5553
}
5654

perfTest/latencyTest.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ const preset = require("./graphile.config.js");
99
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
1010

1111
/** @type {import('../dist/index.js').WorkerPoolOptions} */
12-
const options = {
13-
concurrency: 1,
14-
preset,
15-
};
12+
const options = { preset };
1613

1714
async function main() {
1815
const pgPool = new Pool({ connectionString: process.env.PERF_DATABASE_URL });

perfTest/run.js

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ const { execSync, exec: rawExec } = require("child_process");
33
const { promisify } = require("util");
44
const exec = promisify(rawExec);
55

6-
const JOB_COUNT = 20000;
6+
const JOB_COUNT = 200000;
77
const STUCK_JOB_COUNT = 0;
88
const PARALLELISM = 4;
9-
const CONCURRENCY = 10;
109

1110
const time = async (cb) => {
1211
const start = process.hrtime();
@@ -52,10 +51,7 @@ async function main() {
5251
console.log("Timing startup/shutdown time...");
5352
let result;
5453
const startupTime = await time(async () => {
55-
result = await exec(
56-
`node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`,
57-
execOptions,
58-
);
54+
result = await exec(`node ../dist/cli.js --once`, execOptions);
5955
});
6056
logResult(result);
6157
console.log();
@@ -81,12 +77,7 @@ async function main() {
8177
const dur = await time(async () => {
8278
const promises = [];
8379
for (let i = 0; i < PARALLELISM; i++) {
84-
promises.push(
85-
exec(
86-
`node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`,
87-
execOptions,
88-
),
89-
);
80+
promises.push(exec(`node ../dist/cli.js --once`, execOptions));
9081
}
9182
(await Promise.all(promises)).map(logResult);
9283
});

scripts/dump_db

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ dropuser graphile_worker_role || true
66
psql template1 -c "CREATE USER graphile_worker_role WITH SUPERUSER PASSWORD 'password';"
77
createdb graphile_worker_dump -O graphile_worker_role
88
PGUSER=graphile_worker_role PGPASSWORD=password PGHOST=127.0.0.1 ts-node src/cli.ts -c postgres:///graphile_worker_dump --schema-only
9-
pg_dump --schema-only --no-owner graphile_worker_dump | sed -e '/^--/d' -e '/^\s*$/d' -e '/^SET /d' -e 's/EXECUTE FUNCTION/EXECUTE PROCEDURE/g' > __tests__/schema.sql
9+
pg_dump --schema-only --no-owner graphile_worker_dump | sed -E -e '/^--/d' -e '/^\s*$/d' -e '/^SET /d' -e 's/EXECUTE FUNCTION/EXECUTE PROCEDURE/g' -e '/^(REVOKE|GRANT) .* ON SCHEMA public (FROM|TO) PUBLIC;$/d' > __tests__/schema.sql
10+
pg_dump --data-only --no-owner graphile_worker_dump --table=graphile_worker.migrations --table=graphile_worker._private_pro_migrations | sed -E -e '/^--/d' -e '/^\s*$/d' -e 's/\b2[0-9]{3}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{1,6}\+[0-9]+/1970-01-01 00:00:00.000000+00/g' -e '/^SET /d' >> __tests__/schema.sql
1011
dropdb graphile_worker_dump
1112
dropuser graphile_worker_role

sql/000019.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
--! breaking-change
2+
-- This is just a breaking change marker for the v0.17 worker-centric to
3+
-- pool-centric jump. The migration itself is not breaking.
4+
select 1;

src/cron.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ async function scheduleCronJobs(
179179
* performs backfilling on any crontab tasks that need it.
180180
*/
181181
async function registerAndBackfillItems(
182+
ctx: CompiledSharedOptions,
182183
{ pgPool, events, cron }: { pgPool: Pool; events: WorkerEvents; cron: Cron },
183184
escapedWorkerSchema: string,
184185
parsedCronItems: ParsedCronItem[],
@@ -261,6 +262,7 @@ async function registerAndBackfillItems(
261262
// At this time it's not expected that backfilling will be sufficiently
262263
// expensive to justify optimising this further.
263264
events.emit("cron:backfill", {
265+
ctx,
264266
cron,
265267
itemsToBackfill,
266268
timestamp: ts,
@@ -338,19 +340,21 @@ export const runCron = (
338340
}
339341

340342
const start = new Date();
341-
events.emit("cron:starting", { cron, start });
343+
const ctx = compiledSharedOptions;
344+
events.emit("cron:starting", { ctx, cron, start });
342345

343346
// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
344347
// work due to known_crontabs.last_execution having been updated.
345348
await registerAndBackfillItems(
349+
ctx,
346350
{ pgPool, events, cron },
347351
escapedWorkerSchema,
348352
parsedCronItems,
349353
new Date(+start),
350354
useNodeTime,
351355
);
352356

353-
events.emit("cron:started", { cron, start });
357+
events.emit("cron:started", { ctx, cron, start });
354358

355359
if (!cron._active) {
356360
return stop();
@@ -411,6 +415,7 @@ export const runCron = (
411415
},
412416
);
413417
events.emit("cron:prematureTimer", {
418+
ctx,
414419
cron,
415420
currentTimestamp,
416421
expectedTimestamp,
@@ -427,6 +432,7 @@ export const runCron = (
427432
)}s behind)`,
428433
);
429434
events.emit("cron:overdueTimer", {
435+
ctx,
430436
cron,
431437
currentTimestamp,
432438
expectedTimestamp,
@@ -449,6 +455,7 @@ export const runCron = (
449455
// Finally actually run the jobs.
450456
if (jobsAndIdentifiers.length) {
451457
events.emit("cron:schedule", {
458+
ctx,
452459
cron,
453460
timestamp: expectedTimestamp,
454461
jobsAndIdentifiers,
@@ -461,6 +468,7 @@ export const runCron = (
461468
useNodeTime,
462469
);
463470
events.emit("cron:scheduled", {
471+
ctx,
464472
cron,
465473
timestamp: expectedTimestamp,
466474
jobsAndIdentifiers,

0 commit comments

Comments
 (0)