Skip to content

Commit ef1a1ff

Browse files
committed
added io.cancelEvent support and fixed other requested changes
1 parent a3278cb commit ef1a1ff

File tree

4 files changed

+99
-33
lines changed

4 files changed

+99
-33
lines changed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
205205
spec.queueName || null,
206206
spec.runAt || null,
207207
spec.maxAttempts || null,
208-
spec.jobKey,
208+
spec.jobKey || null,
209209
spec.priority || null,
210210
spec.jobKeyMode || null,
211211
spec.flags || null

apps/webapp/app/routes/api.v1.events.$eventId.cancel.ts

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
55
import { authenticateApiRequest } from "~/services/apiAuth.server";
66
import { workerQueue } from "~/services/worker.server";
77
import { logger } from "~/services/logger.server";
8+
import { CancelEventService } from "~/services/events/cancelEvent.server";
89

910
const ParamsSchema = z.object({
1011
eventId: z.string(),
@@ -33,39 +34,21 @@ export async function action({ request, params }: ActionArgs) {
3334

3435
const { eventId } = parsed.data;
3536

36-
const event = await prisma.eventRecord.findFirst({
37-
select: {
38-
id: true,
39-
name: true,
40-
createdAt: true,
41-
updatedAt: true,
42-
environmentId: true,
43-
runs: {
44-
select: {
45-
id: true,
46-
status: true,
47-
startedAt: true,
48-
completedAt: true,
49-
},
50-
},
51-
},
52-
where: {
53-
id: eventId,
54-
environmentId: authenticatedEnv.id,
55-
},
56-
});
57-
if (!event) {
58-
return json({ error: "Event not found" }, { status: 404 });
59-
}
37+
const service = new CancelEventService();
38+
try {
39+
const updatedEvent = await service.call(authenticatedEnv, eventId);
6040

61-
//update the cancelledAt column in the eventRecord table
62-
const updatedEvent = await prisma.eventRecord.update({
63-
where: { id: event.id },
64-
data: { cancelledAt: new Date() },
65-
});
41+
if (!updatedEvent) {
42+
return json({ error: "Event not found" }, { status: 404 });
43+
}
6644

67-
// Dequeue the event after the db has been updated
68-
await workerQueue.dequeue(event.id, { tx: prisma });
45+
return json(updatedEvent);
46+
} catch (err) {
47+
//send the error
48+
logger.error("CancelEventService.call() error", {
49+
error: err,
50+
});
6951

70-
return json(updatedEvent);
52+
return json({ error: "Internal Server Error" }, { status: 500 });
53+
}
7154
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import type { EventDispatcher, EventRecord } from "@trigger.dev/database";
2+
import type { EventFilter } from "@trigger.dev/core";
3+
import { EventFilterSchema, eventFilterMatches } from "@trigger.dev/core";
4+
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
5+
import { logger } from "~/services/logger.server";
6+
import { workerQueue } from "../worker.server";
7+
import { AuthenticatedEnvironment } from "../apiAuth.server";
8+
9+
export class CancelEventService {
10+
#prismaClient: PrismaClientOrTransaction;
11+
12+
constructor(prismaClient: PrismaClientOrTransaction = prisma) {
13+
this.#prismaClient = prismaClient;
14+
}
15+
16+
public async call(environment: AuthenticatedEnvironment, eventId: string): Promise<EventRecord | undefined> {
17+
return await $transaction(
18+
this.#prismaClient,
19+
async (tx) => {
20+
const event = await tx.eventRecord.findFirst({
21+
select: {
22+
id: true,
23+
name: true,
24+
createdAt: true,
25+
updatedAt: true,
26+
environmentId: true,
27+
runs: {
28+
select: {
29+
id: true,
30+
status: true,
31+
startedAt: true,
32+
completedAt: true,
33+
},
34+
},
35+
},
36+
where: {
37+
id: eventId,
38+
environmentId: environment.id,
39+
},
40+
});
41+
42+
if (!event) {
43+
return;
44+
}
45+
46+
//update the cancelledAt column in the eventRecord table
47+
const updatedEvent = await prisma.eventRecord.update({
48+
where: { id: event.id },
49+
data: { cancelledAt: new Date() },
50+
});
51+
52+
// Dequeue the event after the db has been updated
53+
await workerQueue.dequeue(event.id, { tx: prisma });
54+
55+
return updatedEvent;
56+
},
57+
{ timeout: 10000 }
58+
);
59+
}
60+
}

packages/trigger-sdk/src/io.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,29 @@ export class IO {
220220
);
221221
}
222222

223+
async cancelEvent(key: string | any[], eventId: string) {
224+
return await this.runTask(
225+
key,
226+
{
227+
name: "cancelEvent",
228+
params: {
229+
eventId,
230+
},
231+
properties: [
232+
{
233+
label: "id",
234+
text: eventId,
235+
},
236+
],
237+
noop: true,
238+
},
239+
async (task) => {
240+
return await this._triggerClient.cancelEvent(eventId);
241+
}
242+
);
243+
}
244+
245+
223246
async updateSource(key: string | any[], options: { key: string } & UpdateTriggerSourceBody) {
224247
return this.runTask(
225248
key,

0 commit comments

Comments
 (0)