Skip to content

Commit a3278cb

Browse files
committed
bug fixes
1 parent 24c8f08 commit a3278cb

File tree

3 files changed

+29
-37
lines changed

3 files changed

+29
-37
lines changed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,9 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
175175
if (!this.#runner) {
176176
throw new Error("Worker not initialized");
177177
}
178+
178179
const job = await this.#removeJob(jobKey, option?.tx ?? this.#prisma);
179-
logger.debug("dequeued worker task", {
180-
job,
181-
});
180+
logger.debug("dequeued worker task", { job });
182181

183182
return job;
184183
}
@@ -206,7 +205,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
206205
spec.queueName || null,
207206
spec.runAt || null,
208207
spec.maxAttempts || null,
209-
identifier,
208+
spec.jobKey,
210209
spec.priority || null,
211210
spec.jobKeyMode || null,
212211
spec.flags || null
@@ -226,22 +225,25 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
226225
}
227226

228227
async #removeJob(jobKey: string, tx: PrismaClientOrTransaction) {
229-
const result = await tx.$queryRawUnsafe(
230-
`SELECT * FROM graphile_worker.remove_job(
228+
try {
229+
const result = await tx.$queryRawUnsafe(
230+
`SELECT * FROM graphile_worker.remove_job(
231231
job_key => $1::text
232232
)`,
233-
jobKey
234-
);
233+
jobKey
234+
);
235+
const job = AddJobResultsSchema.safeParse(result);
235236

236-
const job = GraphileJobSchema.safeParse(result);
237+
if (!job.success) {
238+
throw new Error(
239+
`Failed to remove job from queue, zod parsing error: ${JSON.stringify(job.error)}`
240+
);
241+
}
237242

238-
if (!job.success) {
239-
throw new Error(
240-
`Failed to remove job from queue, zod parsing error: ${JSON.stringify(job.error)}`
241-
);
243+
return job.data[0] as GraphileJob;
244+
} catch (e) {
245+
throw new Error(`Failed to remove job from queue, ${e}}`);
242246
}
243-
244-
return job.data as GraphileJob;
245247
}
246248

247249
#createTaskListFromTasks() {

apps/webapp/app/routes/api.v1.events.$eventId.cancel.ts

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ActionArgs, LoaderArgs } from "@remix-run/server-runtime";
1+
import type { ActionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
33
import { z } from "zod";
44
import { prisma } from "~/db.server";
@@ -10,7 +10,7 @@ const ParamsSchema = z.object({
1010
eventId: z.string(),
1111
});
1212

13-
export async function loader({ request, params }: LoaderArgs) {
13+
export async function action({ request, params }: ActionArgs) {
1414
// Ensure this is a POST request
1515
if (request.method.toUpperCase() !== "POST") {
1616
return { status: 405, body: "Method Not Allowed" };
@@ -57,22 +57,15 @@ export async function loader({ request, params }: LoaderArgs) {
5757
if (!event) {
5858
return json({ error: "Event not found" }, { status: 404 });
5959
}
60-
// Update the event with cancelledAt
61-
let updatedEvent;
6260

63-
try {
64-
updatedEvent = await prisma.eventRecord.update({
65-
where: { id: event.id },
66-
data: { cancelledAt: new Date() },
67-
});
68-
69-
// Dequeue the event only if the update is successful
70-
await workerQueue.dequeue(updatedEvent.id, { tx: prisma });
61+
//update the cancelledAt column in the eventRecord table
62+
const updatedEvent = await prisma.eventRecord.update({
63+
where: { id: event.id },
64+
data: { cancelledAt: new Date() },
65+
});
7166

72-
return json(updatedEvent);
73-
} catch (error) {
74-
logger.error("Failed to update event", { error, event });
67+
// Dequeue the event after the db has been updated
68+
await workerQueue.dequeue(event.id, { tx: prisma });
7569

76-
return json({ error: "Failed to update event" }, { status: 500 });
77-
}
70+
return json(updatedEvent);
7871
}

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ import { workerQueue } from "~/services/worker.server";
66
export class IngestSendEvent {
77
#prismaClient: PrismaClientOrTransaction;
88

9-
constructor(
10-
prismaClient: PrismaClientOrTransaction = prisma,
11-
private deliverEvents = true
12-
) {
9+
constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
1310
this.#prismaClient = prismaClient;
1411
}
1512

@@ -91,7 +88,7 @@ export class IngestSendEvent {
9188
{
9289
id: eventLog.id,
9390
},
94-
{ runAt: eventLog.deliverAt, tx }
91+
{ runAt: eventLog.deliverAt, tx, jobKey: eventLog.id }
9592
);
9693
}
9794

0 commit comments

Comments
 (0)