Skip to content

Commit b9dc7ce

Browse files
committed
Merge branch 'feat/cancelEvent-function' of https://github.com/Chigala/trigger.dev into Chigala-feat/cancelEvent-function
# Conflicts: # apps/webapp/app/platform/zodWorker.server.ts
2 parents 8cf8544 + 18a0282 commit b9dc7ce

File tree

8 files changed

+168
-5
lines changed

8 files changed

+168
-5
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import type { ActionArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { authenticateApiRequest } from "~/services/apiAuth.server";
6+
import { workerQueue } from "~/services/worker.server";
7+
import { logger } from "~/services/logger.server";
8+
import { CancelEventService } from "~/services/events/cancelEvent.server";
9+
10+
const ParamsSchema = z.object({
11+
eventId: z.string(),
12+
});
13+
14+
export async function action({ request, params }: ActionArgs) {
15+
// Ensure this is a POST request
16+
if (request.method.toUpperCase() !== "POST") {
17+
return { status: 405, body: "Method Not Allowed" };
18+
}
19+
20+
// Next authenticate the request
21+
const authenticationResult = await authenticateApiRequest(request);
22+
23+
if (!authenticationResult) {
24+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
25+
}
26+
27+
const authenticatedEnv = authenticationResult.environment;
28+
29+
const parsed = ParamsSchema.safeParse(params);
30+
31+
if (!parsed.success) {
32+
return json({ error: "Invalid or Missing eventId" }, { status: 400 });
33+
}
34+
35+
const { eventId } = parsed.data;
36+
37+
const service = new CancelEventService();
38+
try {
39+
const updatedEvent = await service.call(authenticatedEnv, eventId);
40+
41+
if (!updatedEvent) {
42+
return json({ error: "Event not found" }, { status: 404 });
43+
}
44+
45+
return json(updatedEvent);
46+
} catch (err) {
47+
logger.error("CancelEventService.call() error", {
48+
error: err,
49+
});
50+
51+
return json({ error: "Internal Server Error" }, { status: 500 });
52+
}
53+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import type { EventRecord } from "@trigger.dev/database";
2+
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
3+
import { workerQueue } from "../worker.server";
4+
import { AuthenticatedEnvironment } from "../apiAuth.server";
5+
6+
export class CancelEventService {
7+
#prismaClient: PrismaClientOrTransaction;
8+
9+
constructor(prismaClient: PrismaClientOrTransaction = prisma) {
10+
this.#prismaClient = prismaClient;
11+
}
12+
13+
public async call(
14+
environment: AuthenticatedEnvironment,
15+
eventId: string
16+
): Promise<EventRecord | undefined> {
17+
return await $transaction(
18+
this.#prismaClient,
19+
async (tx) => {
20+
const event = await tx.eventRecord.findUnique({
21+
select: {
22+
id: true,
23+
name: true,
24+
createdAt: true,
25+
updatedAt: true,
26+
environmentId: true,
27+
cancelledAt: true,
28+
runs: {
29+
select: {
30+
id: true,
31+
status: true,
32+
startedAt: true,
33+
completedAt: true,
34+
},
35+
},
36+
},
37+
where: {
38+
eventId_environmentId: {
39+
eventId: eventId,
40+
environmentId: environment.id,
41+
},
42+
},
43+
});
44+
45+
if (!event) {
46+
return;
47+
}
48+
49+
if (event.cancelledAt) {
50+
return event;
51+
}
52+
53+
//update the cancelledAt column in the eventRecord table
54+
const updatedEvent = await prisma.eventRecord.update({
55+
where: { id: event.id },
56+
data: { cancelledAt: new Date() },
57+
});
58+
59+
// Dequeue the event after the db has been updated
60+
await workerQueue.dequeue(`event:${event.id}`, { tx: prisma });
61+
62+
return updatedEvent;
63+
},
64+
{ timeout: 10000 }
65+
);
66+
}
67+
}

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ import { workerQueue } from "~/services/worker.server";
66
export class IngestSendEvent {
77
#prismaClient: PrismaClientOrTransaction;
88

9-
constructor(
10-
prismaClient: PrismaClientOrTransaction = prisma,
11-
private deliverEvents = true
12-
) {
9+
constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
1310
this.#prismaClient = prismaClient;
1411
}
1512

@@ -91,7 +88,7 @@ export class IngestSendEvent {
9188
{
9289
id: eventLog.id,
9390
},
94-
{ runAt: eventLog.deliverAt, tx }
91+
{ runAt: eventLog.deliverAt, tx, jobKey: `event:${eventLog.id}` }
9592
);
9693
}
9794

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "EventRecord" ADD COLUMN "cancelledAt" TIMESTAMP(3);

packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ model EventRecord {
642642
643643
createdAt DateTime @default(now())
644644
updatedAt DateTime @updatedAt
645+
cancelledAt DateTime?
645646
646647
isTest Boolean @default(false)
647648
runs JobRun[]

packages/trigger-sdk/src/apiClient.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,22 @@ export class ApiClient {
201201
});
202202
}
203203

204+
async cancelEvent(eventId: string) {
205+
const apiKey = await this.#apiKey();
206+
207+
this.#logger.debug("Cancelling event", {
208+
eventId,
209+
});
210+
211+
return await zodfetch(ApiEventLogSchema, `${this.#apiUrl}/api/v1/events/${eventId}/cancel`, {
212+
method: "POST",
213+
headers: {
214+
"Content-Type": "application/json",
215+
Authorization: `Bearer ${apiKey}`,
216+
},
217+
});
218+
}
219+
204220
async updateSource(
205221
client: string,
206222
key: string,

packages/trigger-sdk/src/io.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,29 @@ export class IO {
221221
);
222222
}
223223

224+
async cancelEvent(key: string | any[], eventId: string) {
225+
return await this.runTask(
226+
key,
227+
{
228+
name: "cancelEvent",
229+
params: {
230+
eventId,
231+
},
232+
properties: [
233+
{
234+
label: "id",
235+
text: eventId,
236+
},
237+
],
238+
noop: true,
239+
},
240+
async (task) => {
241+
return await this._triggerClient.cancelEvent(eventId);
242+
}
243+
);
244+
}
245+
246+
224247
async updateSource(key: string | any[], options: { key: string } & UpdateTriggerSourceBody) {
225248
return this.runTask(
226249
key,

packages/trigger-sdk/src/triggerClient.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,10 @@ export class TriggerClient {
551551
return this.#client.sendEvent(event, options);
552552
}
553553

554+
async cancelEvent(eventId: string) {
555+
return this.#client.cancelEvent(eventId);
556+
}
557+
554558
async registerSchedule(id: string, key: string, schedule: ScheduleMetadata) {
555559
return this.#client.registerSchedule(this.id, id, key, schedule);
556560
}

0 commit comments

Comments
 (0)