Skip to content

Commit 24c8f08

Browse files
committed
feat: added the ability to cancel Events based on the event ID
1 parent a998d6c commit 24c8f08

File tree

6 files changed

+137
-1
lines changed

6 files changed

+137
-1
lines changed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ export type ZodWorkerEnqueueOptions = TaskSpec & {
7676
tx?: PrismaClientOrTransaction;
7777
};
7878

79+
export type ZodWorkerDequeueOptions = {
80+
tx?: PrismaClientOrTransaction;
81+
};
82+
7983
export type ZodWorkerOptions<TMessageCatalog extends MessageCatalogSchema> = {
8084
runnerOptions: RunnerOptions;
8185
prisma: PrismaClient;
@@ -167,6 +171,18 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
167171
return job;
168172
}
169173

174+
public async dequeue(jobKey: string, option?: ZodWorkerDequeueOptions): Promise<GraphileJob> {
175+
if (!this.#runner) {
176+
throw new Error("Worker not initialized");
177+
}
178+
const job = await this.#removeJob(jobKey, option?.tx ?? this.#prisma);
179+
logger.debug("dequeued worker task", {
180+
job,
181+
});
182+
183+
return job;
184+
}
185+
170186
async #addJob(
171187
identifier: string,
172188
payload: unknown,
@@ -190,7 +206,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
190206
spec.queueName || null,
191207
spec.runAt || null,
192208
spec.maxAttempts || null,
193-
spec.jobKey || null,
209+
identifier,
194210
spec.priority || null,
195211
spec.jobKeyMode || null,
196212
spec.flags || null
@@ -209,6 +225,25 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
209225
return job as GraphileJob;
210226
}
211227

228+
async #removeJob(jobKey: string, tx: PrismaClientOrTransaction) {
229+
const result = await tx.$queryRawUnsafe(
230+
`SELECT * FROM graphile_worker.remove_job(
231+
job_key => $1::text
232+
)`,
233+
jobKey
234+
);
235+
236+
const job = GraphileJobSchema.safeParse(result);
237+
238+
if (!job.success) {
239+
throw new Error(
240+
`Failed to remove job from queue, zod parsing error: ${JSON.stringify(job.error)}`
241+
);
242+
}
243+
244+
return job.data as GraphileJob;
245+
}
246+
212247
#createTaskListFromTasks() {
213248
const taskList: TaskList = {};
214249

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import type { ActionArgs, LoaderArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { authenticateApiRequest } from "~/services/apiAuth.server";
6+
import { workerQueue } from "~/services/worker.server";
7+
import { logger } from "~/services/logger.server";
8+
9+
const ParamsSchema = z.object({
10+
eventId: z.string(),
11+
});
12+
13+
export async function loader({ request, params }: LoaderArgs) {
14+
// Ensure this is a POST request
15+
if (request.method.toUpperCase() !== "POST") {
16+
return { status: 405, body: "Method Not Allowed" };
17+
}
18+
19+
// Next authenticate the request
20+
const authenticationResult = await authenticateApiRequest(request);
21+
22+
if (!authenticationResult) {
23+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
24+
}
25+
26+
const authenticatedEnv = authenticationResult.environment;
27+
28+
const parsed = ParamsSchema.safeParse(params);
29+
30+
if (!parsed.success) {
31+
return json({ error: "Invalid or Missing eventId" }, { status: 400 });
32+
}
33+
34+
const { eventId } = parsed.data;
35+
36+
const event = await prisma.eventRecord.findFirst({
37+
select: {
38+
id: true,
39+
name: true,
40+
createdAt: true,
41+
updatedAt: true,
42+
environmentId: true,
43+
runs: {
44+
select: {
45+
id: true,
46+
status: true,
47+
startedAt: true,
48+
completedAt: true,
49+
},
50+
},
51+
},
52+
where: {
53+
id: eventId,
54+
environmentId: authenticatedEnv.id,
55+
},
56+
});
57+
if (!event) {
58+
return json({ error: "Event not found" }, { status: 404 });
59+
}
60+
// Update the event with cancelledAt
61+
let updatedEvent;
62+
63+
try {
64+
updatedEvent = await prisma.eventRecord.update({
65+
where: { id: event.id },
66+
data: { cancelledAt: new Date() },
67+
});
68+
69+
// Dequeue the event only if the update is successful
70+
await workerQueue.dequeue(updatedEvent.id, { tx: prisma });
71+
72+
return json(updatedEvent);
73+
} catch (error) {
74+
logger.error("Failed to update event", { error, event });
75+
76+
return json({ error: "Failed to update event" }, { status: 500 });
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "EventRecord" ADD COLUMN "cancelledAt" TIMESTAMP(3);

packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,7 @@ model EventRecord {
640640
641641
createdAt DateTime @default(now())
642642
updatedAt DateTime @updatedAt
643+
cancelledAt DateTime?
643644
644645
isTest Boolean @default(false)
645646
runs JobRun[]

packages/trigger-sdk/src/apiClient.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,22 @@ export class ApiClient {
201201
});
202202
}
203203

204+
async cancelEvent(eventId: string) {
205+
const apiKey = await this.#apiKey();
206+
207+
this.#logger.debug("Cancelling event", {
208+
eventId,
209+
});
210+
211+
return await zodfetch(ApiEventLogSchema, `${this.#apiUrl}/api/v1/events/${eventId}/cancel`, {
212+
method: "POST",
213+
headers: {
214+
"Content-Type": "application/json",
215+
Authorization: `Bearer ${apiKey}`,
216+
},
217+
});
218+
}
219+
204220
async updateSource(
205221
client: string,
206222
key: string,

packages/trigger-sdk/src/triggerClient.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ export class TriggerClient {
556556
return this.#client.sendEvent(event, options);
557557
}
558558

559+
async cancelEvent(eventId: string) {
560+
return this.#client.cancelEvent(eventId);
561+
}
562+
559563
async registerSchedule(id: string, key: string, schedule: ScheduleMetadata) {
560564
return this.#client.registerSchedule(this.id, id, key, schedule);
561565
}

0 commit comments

Comments
 (0)