Skip to content

Commit b1b9321

Browse files
authored
Job run performance improvements and adding "worker only" mode (#360)
* WIP job run performance improvements - Added a `perf` tool to better measure job run performance under heavy load - Removed `runFinished` job (not really needed) - startQueuedRuns now uses a jobKey with replace - Fixed an issue with ZodWorker when using jobKey * Publish improvement docker images * fixed the improvement docker publishing * Downgrade back to prisma 4.16.0 because 5.1.x broke docker builds * Changes to how queued runs work - Split the worker into two different workers, one dedicated to performRunExecution - Schedule performRunExecution in a single place, with a queue and using a round robin manually controlled concurrency - Remove startQueuedRuns - All runs are queued before they are started - Setting the worker maxPoolSize to the same as the worker concurrency - Starting to be able to split the docker image * Remove queue name from startRun graphile job * Make the prisma connection pool stuff configurable through env vars * Hardcode (for now) the max concurrent runs limit * Rewrite performRunExecution to be more performant PerformRunExecutionV2: - Does not create and manage jobRunExecution records - Does not reimplement retrying, uses graphile worker retrying instead I’ve kept around PerformRunExecutionV1 so this works when deploying. Definitely needs LOTS of testing * Fix issues with cached tasks - Limit the size of the cached tasks sent when executing a run, using the knapsack problem dynamic programming approach - Actually USE the cached tasks in IO by using the idempotencyKey instead of the task ID - Remove output from all logs - Added a stress test job catalog * Forgot to commit the logger updates * Never log connectionString * Login to docker hub to get around rate limits * Add additional logging to the graphile workers * Fix the *_ENABLED env vars * Allow adding and removing jobs to be done from the webapp * Don’t set the job to failed if it’s being retried * Deprecated queue options in the job and removed startPosition. 
Now using the job/env combo as the job queue name * Dequeuing jobs doesn’t check if the runner is initialized * Fixed issues with retrying a run getting stuck on a cancelled task, and errors from parsing the results of dequeuing a job * Remove queued round robin thing that isn’t used anymore * Added slack to job catalog * Better forwards compat * Added long delay * Fixed lock file
1 parent a69f756 commit b1b9321

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1702
-881
lines changed

.changeset/fuzzy-trees-lay.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fixed IO not setting the cached task key correctly, resulting in unnecessary API calls to trigger.dev

.changeset/happy-foxes-play.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Deprecated queue options in the job and removed startPosition

.github/workflows/publish.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ on:
44
push:
55
branches:
66
- main
7+
- improvements/*
78
tags:
89
- "v.docker.*"
910
paths:
@@ -95,6 +96,12 @@ jobs:
9596
name: e2e Tests
9697
runs-on: buildjet-4vcpu-ubuntu-2204
9798
steps:
99+
- name: 🐳 Login to Docker Hub
100+
uses: docker/login-action@v2
101+
with:
102+
username: ${{ secrets.DOCKERHUB_USERNAME }}
103+
password: ${{ secrets.DOCKERHUB_TOKEN }}
104+
98105
- name: ⬇️ Checkout repo
99106
uses: actions/checkout@v3
100107
with:
@@ -154,6 +161,11 @@ jobs:
154161
version: ${{ steps.get_version.outputs.version }}
155162
short_sha: ${{ steps.get_commit.outputs.sha_short }}
156163
steps:
164+
- name: 🐳 Login to Docker Hub
165+
uses: docker/login-action@v2
166+
with:
167+
username: ${{ secrets.DOCKERHUB_USERNAME }}
168+
password: ${{ secrets.DOCKERHUB_TOKEN }}
157169
- name: ⬇️ Checkout repo
158170
uses: actions/checkout@v3
159171

@@ -167,6 +179,10 @@ jobs:
167179
IMAGE_TAG="v${ORIGINAL_VERSION}"
168180
fi
169181
echo "IMAGE_TAG=${IMAGE_TAG}"
182+
elif [[ $GITHUB_REF == refs/heads/improvements/* ]]; then
183+
ORIGINAL_VERSION="${GITHUB_REF#refs/heads/improvements/}"
184+
IMAGE_TAG="${ORIGINAL_VERSION}.rc"
185+
echo "IMAGE_TAG=${IMAGE_TAG}"
170186
elif [[ $GITHUB_REF == refs/heads/* ]]; then
171187
IMAGE_TAG="${GITHUB_REF#refs/heads/}"
172188
echo "IMAGE_TAG=${IMAGE_TAG}"

apps/webapp/app/consts.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export const LIVE_ENVIRONMENT = "live";
22
export const DEV_ENVIRONMENT = "development";
33
export const MAX_LIVE_PROJECTS = 1;
4-
export const DEFAULT_MAX_CONCURRENT_RUNS = 10000;
4+
export const DEFAULT_MAX_CONCURRENT_RUNS = 10;
5+
export const MAX_CONCURRENT_RUNS_LIMIT = 20;
56
export const PREPROCESS_RETRY_LIMIT = 2;
67
export const EXECUTE_JOB_RETRY_LIMIT = 10;

apps/webapp/app/db.server.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { PrismaClient, Prisma } from "@trigger.dev/database";
22
import invariant from "tiny-invariant";
33
import { z } from "zod";
44
import { logger } from "./services/logger.server";
5+
import { env } from "./env.server";
56

67
export type PrismaTransactionClient = Omit<
78
PrismaClient,
@@ -84,17 +85,24 @@ function getClient() {
8485
const { DATABASE_URL } = process.env;
8586
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
8687

88+
const databaseUrl = new URL(DATABASE_URL);
89+
90+
// We need to add the connection_limit and pool_timeout query params to the url, in a way that works if the DATABASE_URL already has query params
91+
const query = databaseUrl.searchParams;
92+
query.set("connection_limit", env.DATABASE_CONNECTION_LIMIT.toString());
93+
query.set("pool_timeout", env.DATABASE_POOL_TIMEOUT.toString());
94+
databaseUrl.search = query.toString();
95+
8796
// Remove the username:password in the url and print that to the console
88-
const urlWithoutCredentials = new URL(DATABASE_URL);
97+
const urlWithoutCredentials = new URL(databaseUrl.href);
8998
urlWithoutCredentials.password = "";
9099

91100
console.log(`🔌 setting up prisma client to ${urlWithoutCredentials.toString()}`);
92101

93102
const client = new PrismaClient({
94103
datasources: {
95104
db: {
96-
url: DATABASE_URL,
97-
// We can't set directUrl here, and we don't have to
105+
url: databaseUrl.href,
98106
},
99107
},
100108
log: [

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { SecretStoreOptionsSchema } from "./services/secrets/secretStore.server"
44
const EnvironmentSchema = z.object({
55
NODE_ENV: z.union([z.literal("development"), z.literal("production"), z.literal("test")]),
66
DATABASE_URL: z.string(),
7+
DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10),
8+
DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60),
79
DIRECT_URL: z.string(),
810
SESSION_SECRET: z.string(),
911
MAGIC_LINK_SECRET: z.string(),
@@ -31,6 +33,13 @@ const EnvironmentSchema = z.object({
3133
RESEND_API_KEY: z.string().optional(),
3234
PLAIN_API_KEY: z.string().optional(),
3335
RUNTIME_PLATFORM: z.enum(["docker-compose", "ecs", "local"]).default("local"),
36+
WORKER_SCHEMA: z.string().default("graphile_worker"),
37+
WORKER_CONCURRENCY: z.coerce.number().int().default(10),
38+
WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
39+
EXECUTION_WORKER_CONCURRENCY: z.coerce.number().int().default(10),
40+
EXECUTION_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
41+
WORKER_ENABLED: z.string().default("true"),
42+
EXECUTION_WORKER_ENABLED: z.string().default("true"),
3443
});
3544

3645
export type Environment = z.infer<typeof EnvironmentSchema>;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { JobRun, JobRunExecution } from "@trigger.dev/database";
2+
import { PrismaClientOrTransaction } from "~/db.server";
3+
import { executionWorker } from "~/services/worker.server";
4+
5+
export async function enqueueRunExecutionV1(
6+
execution: JobRunExecution,
7+
queueId: string,
8+
concurrency: number,
9+
tx: PrismaClientOrTransaction,
10+
runAt?: Date
11+
) {
12+
const job = await executionWorker.enqueue(
13+
"performRunExecution",
14+
{
15+
id: execution.id,
16+
},
17+
{
18+
queueName: `job:queue:${queueId}`,
19+
tx,
20+
runAt,
21+
jobKey: `execution:${execution.runId}`,
22+
}
23+
);
24+
}
25+
26+
export type EnqueueRunExecutionV2Options = {
27+
runAt?: Date;
28+
resumeTaskId?: string;
29+
isRetry?: boolean;
30+
};
31+
32+
export async function enqueueRunExecutionV2(
33+
run: JobRun,
34+
tx: PrismaClientOrTransaction,
35+
options: EnqueueRunExecutionV2Options = {}
36+
) {
37+
const job = await executionWorker.enqueue(
38+
"performRunExecutionV2",
39+
{
40+
id: run.id,
41+
reason: run.status === "PREPROCESSING" ? "PREPROCESS" : "EXECUTE_JOB",
42+
resumeTaskId: options.resumeTaskId,
43+
isRetry: typeof options.isRetry === "boolean" ? options.isRetry : false,
44+
},
45+
{
46+
queueName: `job:${run.jobId}:env:${run.environmentId}`,
47+
tx,
48+
runAt: options.runAt,
49+
jobKey: `job_run:${run.id}`,
50+
}
51+
);
52+
}
53+
54+
export async function dequeueRunExecutionV2(run: JobRun, tx: PrismaClientOrTransaction) {
55+
return await executionWorker.dequeue(`job_run:${run.id}`, {
56+
tx,
57+
});
58+
}

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 116 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const AddJobResultsSchema = z.array(GraphileJobSchema);
5151
export type ZodTasks<TConsumerSchema extends MessageCatalogSchema> = {
5252
[K in keyof TConsumerSchema]: {
5353
queueName?: string | ((payload: z.infer<TConsumerSchema[K]>) => string);
54+
jobKey?: string | ((payload: z.infer<TConsumerSchema[K]>) => string | undefined);
5455
priority?: number;
5556
maxAttempts?: number;
5657
jobKeyMode?: "replace" | "preserve_run_at" | "unsafe_dedupe";
@@ -76,7 +77,12 @@ export type ZodWorkerEnqueueOptions = TaskSpec & {
7677
tx?: PrismaClientOrTransaction;
7778
};
7879

80+
export type ZodWorkerDequeueOptions = {
81+
tx?: PrismaClientOrTransaction;
82+
};
83+
7984
export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
85+
name: string;
8086
runnerOptions: RunnerOptions;
8187
prisma: PrismaClient;
8288
schema: TMessageCatalog;
@@ -85,6 +91,7 @@ export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
8591
};
8692

8793
export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
94+
#name: string;
8895
#schema: TMessageCatalog;
8996
#prisma: PrismaClient;
9097
#runnerOptions: RunnerOptions;
@@ -93,6 +100,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
93100
#runner?: GraphileRunner;
94101

95102
constructor(options: ZodWorkerOptions<TMessageCatalog>) {
103+
this.#name = options.name;
96104
this.#schema = options.schema;
97105
this.#prisma = options.prisma;
98106
this.#runnerOptions = options.runnerOptions;
@@ -105,7 +113,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
105113
return true;
106114
}
107115

108-
logger.debug("Initializing worker queue with options", {
116+
this.#logDebug("Initializing worker queue with options", {
109117
runnerOptions: this.#runnerOptions,
110118
});
111119

@@ -121,6 +129,54 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
121129
throw new Error("Failed to initialize worker queue");
122130
}
123131

132+
this.#runner?.events.on("pool:create", ({ workerPool }) => {
133+
this.#logDebug("pool:create");
134+
});
135+
136+
this.#runner?.events.on("pool:listen:connecting", ({ workerPool, attempts }) => {
137+
this.#logDebug("pool:create", { attempts });
138+
});
139+
140+
this.#runner?.events.on("pool:listen:success", ({ workerPool, client }) => {
141+
this.#logDebug("pool:listen:success");
142+
});
143+
144+
this.#runner?.events.on("pool:listen:error", ({ error }) => {
145+
this.#logDebug("pool:listen:error", { error });
146+
});
147+
148+
this.#runner?.events.on("pool:gracefulShutdown", ({ message }) => {
149+
this.#logDebug("pool:gracefulShutdown", { workerMessage: message });
150+
});
151+
152+
this.#runner?.events.on("pool:gracefulShutdown:error", ({ error }) => {
153+
this.#logDebug("pool:gracefulShutdown:error", { error });
154+
});
155+
156+
this.#runner?.events.on("worker:create", ({ worker }) => {
157+
this.#logDebug("worker:create", { workerId: worker.workerId });
158+
});
159+
160+
this.#runner?.events.on("worker:release", ({ worker }) => {
161+
this.#logDebug("worker:release", { workerId: worker.workerId });
162+
});
163+
164+
this.#runner?.events.on("worker:stop", ({ worker, error }) => {
165+
this.#logDebug("worker:stop", { workerId: worker.workerId, error });
166+
});
167+
168+
this.#runner?.events.on("worker:fatalError", ({ worker, error, jobError }) => {
169+
this.#logDebug("worker:fatalError", { workerId: worker.workerId, error, jobError });
170+
});
171+
172+
this.#runner?.events.on("gracefulShutdown", ({ signal }) => {
173+
this.#logDebug("gracefulShutdown", { signal });
174+
});
175+
176+
this.#runner?.events.on("stop", () => {
177+
this.#logDebug("stop");
178+
});
179+
124180
return true;
125181
}
126182

@@ -133,23 +189,34 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
133189
payload: z.infer<TMessageCatalog[K]>,
134190
options?: ZodWorkerEnqueueOptions
135191
): Promise<GraphileJob> {
136-
if (!this.#runner) {
137-
throw new Error("Worker not initialized");
138-
}
139-
140192
const task = this.#tasks[identifier];
141193

142194
const optionsWithoutTx = omit(options ?? {}, ["tx"]);
195+
const taskWithoutJobKey = omit(task, ["jobKey"]);
143196

144197
const spec = {
145198
...optionsWithoutTx,
146-
...task,
199+
...taskWithoutJobKey,
147200
};
148201

149202
if (typeof task.queueName === "function") {
150203
spec.queueName = task.queueName(payload);
151204
}
152205

206+
if (typeof task.jobKey === "function") {
207+
const jobKey = task.jobKey(payload);
208+
209+
if (jobKey) {
210+
spec.jobKey = jobKey;
211+
}
212+
}
213+
214+
logger.debug("Enqueuing worker task", {
215+
identifier,
216+
payload,
217+
spec,
218+
});
219+
153220
const job = await this.#addJob(
154221
identifier as string,
155222
payload,
@@ -167,6 +234,17 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
167234
return job;
168235
}
169236

237+
public async dequeue(
238+
jobKey: string,
239+
option?: ZodWorkerDequeueOptions
240+
): Promise<GraphileJob | undefined> {
241+
const results = await this.#removeJob(jobKey, option?.tx ?? this.#prisma);
242+
243+
logger.debug("dequeued worker task", { results, jobKey });
244+
245+
return results;
246+
}
247+
170248
async #addJob(
171249
identifier: string,
172250
payload: unknown,
@@ -192,8 +270,8 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
192270
spec.maxAttempts || null,
193271
spec.jobKey || null,
194272
spec.priority || null,
195-
spec.jobKeyMode || null,
196-
spec.flags || null
273+
spec.flags || null,
274+
spec.jobKeyMode || null
197275
);
198276

199277
const rows = AddJobResultsSchema.safeParse(results);
@@ -209,6 +287,32 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
209287
return job as GraphileJob;
210288
}
211289

290+
async #removeJob(jobKey: string, tx: PrismaClientOrTransaction) {
291+
try {
292+
const result = await tx.$queryRawUnsafe(
293+
`SELECT * FROM graphile_worker.remove_job(
294+
job_key => $1::text
295+
)`,
296+
jobKey
297+
);
298+
const job = AddJobResultsSchema.safeParse(result);
299+
300+
if (!job.success) {
301+
logger.debug("results returned from remove_job could not be parsed", {
302+
error: job.error.flatten(),
303+
result,
304+
jobKey,
305+
});
306+
307+
return;
308+
}
309+
310+
return job.data[0] as GraphileJob;
311+
} catch (e) {
312+
throw new Error(`Failed to remove job from queue, ${e}}`);
313+
}
314+
}
315+
212316
#createTaskListFromTasks() {
213317
const taskList: TaskList = {};
214318

@@ -324,4 +428,8 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
324428
throw error;
325429
}
326430
}
431+
432+
#logDebug(message: string, args?: any) {
433+
logger.debug(`[worker][${this.#name}] ${message}`, args);
434+
}
327435
}

0 commit comments

Comments
 (0)