Skip to content

Commit b737f7a

Browse files
committed
Report worker metrics when WORKER_REPORTER_EMAIL env var is set
1 parent f54a9ff commit b737f7a

File tree

5 files changed

+121
-8
lines changed

5 files changed

+121
-8
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const EnvironmentSchema = z.object({
4040
EXECUTION_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
4141
WORKER_ENABLED: z.string().default("true"),
4242
EXECUTION_WORKER_ENABLED: z.string().default("true"),
43+
WORKER_REPORTER_EMAIL: z.string().email().optional(),
4344
});
4445

4546
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,12 @@ export type ZodWorkerDequeueOptions = {
8282
};
8383

8484
const CLEANUP_TASK_NAME = "__cleanupOldJobs";
85+
const REPORTER_TASK_NAME = "__reporter";
8586

8687
export type ZodWorkerCleanupOptions = {
8788
frequencyExpression: string; // cron expression
8889
ttl: number;
90+
maxCount: number;
8991
taskOptions?: CronItemOptions;
9092
};
9193

@@ -97,6 +99,7 @@ export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
9799
tasks: ZodTasks<TMessageCatalog>;
98100
recurringTasks?: ZodRecurringTasks;
99101
cleanup?: ZodWorkerCleanupOptions;
102+
reporter?: (subject: string, message: string) => Promise<void>;
100103
};
101104

102105
export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
@@ -108,6 +111,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
108111
#recurringTasks?: ZodRecurringTasks;
109112
#runner?: GraphileRunner;
110113
#cleanup: ZodWorkerCleanupOptions | undefined;
114+
#reporter?: (subject: string, message: string) => Promise<void>;
111115

112116
constructor(options: ZodWorkerOptions<TMessageCatalog>) {
113117
this.#name = options.name;
@@ -117,6 +121,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
117121
this.#tasks = options.tasks;
118122
this.#recurringTasks = options.recurringTasks;
119123
this.#cleanup = options.cleanup;
124+
this.#reporter = options.reporter;
120125
}
121126

122127
get graphileWorkerSchema() {
@@ -356,6 +361,14 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
356361
taskList[CLEANUP_TASK_NAME] = task;
357362
}
358363

364+
if (this.#reporter) {
365+
const task: Task = (payload, helpers) => {
366+
return this.#handleReporter(payload, helpers);
367+
};
368+
369+
taskList[REPORTER_TASK_NAME] = task;
370+
}
371+
359372
return taskList;
360373
}
361374

@@ -371,6 +384,14 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
371384
});
372385
}
373386

387+
if (this.#reporter) {
388+
cronItems.push({
389+
pattern: "50 * * * *", // Every hour at 50 minutes past the hour
390+
identifier: REPORTER_TASK_NAME,
391+
task: REPORTER_TASK_NAME,
392+
});
393+
}
394+
374395
if (!this.#recurringTasks) {
375396
return cronItems;
376397
}
@@ -493,8 +514,9 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
493514
});
494515

495516
const rawResults = await this.#prisma.$queryRawUnsafe(
496-
`DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts RETURNING id`,
497-
expirationDate
517+
`WITH rows AS (SELECT id FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts ORDER BY run_at ASC LIMIT $2) DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE id IN (SELECT id FROM rows) RETURNING id`,
518+
expirationDate,
519+
this.#cleanup.maxCount
498520
);
499521

500522
const results = Array.isArray(rawResults) ? rawResults : [];
@@ -504,6 +526,65 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
504526
expirationDate,
505527
payload,
506528
});
529+
530+
if (this.#reporter) {
531+
await this.#reporter(
532+
"Worker Queue Cleanup",
533+
`Cleaned up ${results.length} jobs older than ${expirationDate.toISOString()}`
534+
);
535+
}
536+
}
537+
538+
async #handleReporter(rawPayload: unknown, helpers: JobHelpers): Promise<void> {
539+
if (!this.#reporter) {
540+
return;
541+
}
542+
543+
logger.debug("Received reporter task", {
544+
payload: rawPayload,
545+
});
546+
547+
const parsedPayload = RawCronPayloadSchema.safeParse(rawPayload);
548+
549+
if (!parsedPayload.success) {
550+
throw new Error(
551+
`Failed to parse cleanup task payload: ${JSON.stringify(parsedPayload.error)}`
552+
);
553+
}
554+
555+
const payload = parsedPayload.data;
556+
557+
// Subtract an hour from the payload._cron.ts
558+
const startAt = new Date(payload._cron.ts.getTime() - 1000 * 60 * 60);
559+
560+
const schema = z.array(z.object({ count: z.coerce.number() }));
561+
562+
// Count the number of jobs that have been added since the startAt date and before the payload._cron.ts date
563+
const rawAddedResults = await this.#prisma.$queryRawUnsafe(
564+
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs WHERE created_at > $1 AND created_at < $2`,
565+
startAt,
566+
payload._cron.ts
567+
);
568+
569+
const addedCountResults = schema.parse(rawAddedResults)[0];
570+
571+
// Count the total number of jobs in the jobs table
572+
const rawTotalResults = await this.#prisma.$queryRawUnsafe(
573+
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs`
574+
);
575+
576+
const totalCountResults = schema.parse(rawTotalResults)[0];
577+
578+
logger.debug("Calculated metrics about the jobs table", {
579+
rawAddedResults,
580+
rawTotalResults,
581+
payload,
582+
});
583+
584+
await this.#reporter(
585+
"Worker Queue Metrics",
586+
`Added ${addedCountResults.count} jobs in the last hour, total jobs: ${totalCountResults.count}`
587+
);
507588
}
508589

509590
#logDebug(message: string, args?: any) {

apps/webapp/app/services/email.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { DeliverEmail } from "emails";
1+
import type { DeliverEmail, SendPlainTextOptions } from "emails";
22
import { EmailClient } from "emails";
33
import type { SendEmailOptions } from "remix-auth-email-link";
44
import { redirect } from "remix-typedjson";
@@ -27,6 +27,10 @@ export async function sendMagicLinkEmail(options: SendEmailOptions<AuthUser>): P
2727
});
2828
}
2929

30+
export async function sendPlainTextEmail(options: SendPlainTextOptions) {
31+
return client.sendPlainText(options);
32+
}
33+
3034
export async function scheduleWelcomeEmail(user: User) {
3135
//delay for one minute in development, longer in production
3236
const delay = process.env.NODE_ENV === "development" ? 1000 * 60 : 1000 * 60 * 22;

apps/webapp/app/services/worker.server.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { env } from "~/env.server";
66
import { ZodWorker } from "~/platform/zodWorker.server";
7-
import { sendEmail } from "./email.server";
7+
import { sendEmail, sendPlainTextEmail } from "./email.server";
88
import { IndexEndpointService } from "./endpoints/indexEndpoint.server";
99
import { RecurringEndpointIndexService } from "./endpoints/recurringEndpointIndex.server";
1010
import { DeliverEventService } from "./events/deliverEvent.server";
@@ -21,6 +21,7 @@ import { DeliverHttpSourceRequestService } from "./sources/deliverHttpSourceRequ
2121
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
2222
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout";
2323
import { addMissingVersionField } from "@trigger.dev/core";
24+
import { logger } from "./logger.server";
2425

2526
const workerCatalog = {
2627
indexEndpoint: z.object({
@@ -129,10 +130,20 @@ function getWorkerQueue() {
129130
name: "workerQueue",
130131
prisma,
131132
cleanup: {
132-
// cleanup once per hour
133-
frequencyExpression: "0 * * * *",
134-
// delete jobs that have been completed for more than 7 days
135-
ttl: 7 * 24 * 60 * 60 * 1000,
133+
frequencyExpression: "13,27,43 * * * *",
134+
ttl: 7 * 24 * 60 * 60 * 1000, // 7 days
135+
maxCount: 1000,
136+
},
137+
reporter: async (subject, message) => {
138+
logger.info("workerQueue reporter", { workerMessage: message, subject });
139+
140+
if (env.WORKER_REPORTER_EMAIL) {
141+
await sendPlainTextEmail({
142+
to: env.WORKER_REPORTER_EMAIL,
143+
subject: `workerQueue Report: ${subject}`,
144+
text: message,
145+
});
146+
}
136147
},
137148
runnerOptions: {
138149
connectionString: env.DATABASE_URL,

packages/emails/src/index.tsx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ export const DeliverEmailSchema = z
4242

4343
export type DeliverEmail = z.infer<typeof DeliverEmailSchema>;
4444

45+
export type SendPlainTextOptions = { to: string; subject: string; text: string };
46+
4547
export class EmailClient {
4648
#client?: Resend;
4749
#imagesBaseUrl: string;
@@ -66,6 +68,20 @@ export class EmailClient {
6668
});
6769
}
6870

71+
async sendPlainText(options: SendPlainTextOptions) {
72+
if (this.#client) {
73+
await this.#client.sendEmail({
74+
from: this.#from,
75+
to: options.to,
76+
replyTo: this.#replyTo,
77+
subject: options.subject,
78+
text: options.text,
79+
});
80+
81+
return;
82+
}
83+
}
84+
6985
#getTemplate(data: DeliverEmail): {
7086
subject: string;
7187
component: ReactElement;

0 commit comments

Comments
 (0)