Skip to content

Commit 92f80fd

Browse files
author
Rishi Raj Jain
authored
Merge branch 'triggerdotdev:main' into main
2 parents 1bf8454 + 0493ceb commit 92f80fd

File tree

11 files changed

+256
-59
lines changed

11 files changed

+256
-59
lines changed

.changeset/curly-balloons-matter.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Adding the ability to cancel events that were sent with a delayed delivery

apps/webapp/app/db.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export type PrismaTransactionOptions = {
3030

3131
/** Sets the transaction isolation level. By default this is set to the value currently configured in your database. */
3232
isolationLevel?: Prisma.TransactionIsolationLevel;
33+
34+
rethrowPrismaErrors?: boolean;
3335
};
3436

3537
export async function $transaction<R>(
@@ -53,6 +55,10 @@ export async function $transaction<R>(
5355
name: error.name,
5456
});
5557

58+
if (options?.rethrowPrismaErrors) {
59+
throw error;
60+
}
61+
5662
return;
5763
}
5864

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import type { ActionArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { z } from "zod";
4+
import { authenticateApiRequest } from "~/services/apiAuth.server";
5+
import { CancelEventService } from "~/services/events/cancelEvent.server";
6+
import { logger } from "~/services/logger.server";
7+
8+
const ParamsSchema = z.object({
9+
eventId: z.string(),
10+
});
11+
12+
export async function action({ request, params }: ActionArgs) {
13+
// Ensure this is a POST request
14+
if (request.method.toUpperCase() !== "POST") {
15+
return { status: 405, body: "Method Not Allowed" };
16+
}
17+
18+
// Next authenticate the request
19+
const authenticationResult = await authenticateApiRequest(request);
20+
21+
if (!authenticationResult) {
22+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
23+
}
24+
25+
const authenticatedEnv = authenticationResult.environment;
26+
27+
const parsed = ParamsSchema.safeParse(params);
28+
29+
if (!parsed.success) {
30+
return json({ error: "Invalid or Missing eventId" }, { status: 400 });
31+
}
32+
33+
const { eventId } = parsed.data;
34+
35+
const service = new CancelEventService();
36+
try {
37+
const updatedEvent = await service.call(authenticatedEnv, eventId);
38+
39+
if (!updatedEvent) {
40+
return json({ error: "Event not found" }, { status: 404 });
41+
}
42+
43+
return json(updatedEvent);
44+
} catch (err) {
45+
logger.error("CancelEventService.call() error", {
46+
error: err,
47+
});
48+
49+
return json({ error: "Internal Server Error" }, { status: 500 });
50+
}
51+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import type { EventRecord } from "@trigger.dev/database";
2+
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
3+
import { workerQueue } from "../worker.server";
4+
import { AuthenticatedEnvironment } from "../apiAuth.server";
5+
6+
export class CancelEventService {
7+
#prismaClient: PrismaClientOrTransaction;
8+
9+
constructor(prismaClient: PrismaClientOrTransaction = prisma) {
10+
this.#prismaClient = prismaClient;
11+
}
12+
13+
public async call(
14+
environment: AuthenticatedEnvironment,
15+
eventId: string
16+
): Promise<EventRecord | undefined> {
17+
return await $transaction(this.#prismaClient, async (tx) => {
18+
const event = await tx.eventRecord.findUnique({
19+
where: {
20+
eventId_environmentId: {
21+
eventId: eventId,
22+
environmentId: environment.id,
23+
},
24+
},
25+
});
26+
27+
if (!event) {
28+
return;
29+
}
30+
31+
if (event.cancelledAt) {
32+
return event;
33+
}
34+
35+
//update the cancelledAt column in the eventRecord table
36+
const updatedEvent = await tx.eventRecord.update({
37+
where: { id: event.id },
38+
data: { cancelledAt: new Date() },
39+
});
40+
41+
// Dequeue the event after the db has been updated
42+
await workerQueue.dequeue(`event:${event.id}`, { tx });
43+
44+
return updatedEvent;
45+
});
46+
}
47+
}

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ import type { RawEvent, SendEventOptions } from "@trigger.dev/core";
22
import { $transaction, PrismaClientOrTransaction, PrismaErrorSchema, prisma } from "~/db.server";
33
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { workerQueue } from "~/services/worker.server";
5+
import { logger } from "../logger.server";
56

67
export class IngestSendEvent {
78
#prismaClient: PrismaClientOrTransaction;
89

9-
constructor(
10-
prismaClient: PrismaClientOrTransaction = prisma,
11-
private deliverEvents = true
12-
) {
10+
constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
1311
this.#prismaClient = prismaClient;
1412
}
1513

@@ -36,71 +34,87 @@ export class IngestSendEvent {
3634
try {
3735
const deliverAt = this.#calculateDeliverAt(options);
3836

39-
return await $transaction(this.#prismaClient, async (tx) => {
40-
const externalAccount = options?.accountId
41-
? await tx.externalAccount.findUniqueOrThrow({
42-
where: {
43-
environmentId_identifier: {
44-
environmentId: environment.id,
45-
identifier: options.accountId,
37+
return await $transaction(
38+
this.#prismaClient,
39+
async (tx) => {
40+
const externalAccount = options?.accountId
41+
? await tx.externalAccount.findUniqueOrThrow({
42+
where: {
43+
environmentId_identifier: {
44+
environmentId: environment.id,
45+
identifier: options.accountId,
46+
},
4647
},
47-
},
48-
})
49-
: undefined;
48+
})
49+
: undefined;
5050

51-
// Create a new event in the database
52-
const eventLog = await tx.eventRecord.create({
53-
data: {
54-
organization: {
55-
connect: {
56-
id: environment.organizationId,
51+
// Create a new event in the database
52+
const eventLog = await tx.eventRecord.create({
53+
data: {
54+
organization: {
55+
connect: {
56+
id: environment.organizationId,
57+
},
5758
},
58-
},
59-
project: {
60-
connect: {
61-
id: environment.projectId,
59+
project: {
60+
connect: {
61+
id: environment.projectId,
62+
},
6263
},
63-
},
64-
environment: {
65-
connect: {
66-
id: environment.id,
64+
environment: {
65+
connect: {
66+
id: environment.id,
67+
},
6768
},
69+
eventId: event.id,
70+
name: event.name,
71+
timestamp: event.timestamp ?? new Date(),
72+
payload: event.payload ?? {},
73+
context: event.context ?? {},
74+
source: event.source ?? "trigger.dev",
75+
sourceContext,
76+
deliverAt: deliverAt,
77+
externalAccount: externalAccount
78+
? {
79+
connect: {
80+
id: externalAccount.id,
81+
},
82+
}
83+
: {},
6884
},
69-
eventId: event.id,
70-
name: event.name,
71-
timestamp: event.timestamp ?? new Date(),
72-
payload: event.payload ?? {},
73-
context: event.context ?? {},
74-
source: event.source ?? "trigger.dev",
75-
sourceContext,
76-
deliverAt: deliverAt,
77-
externalAccount: externalAccount
78-
? {
79-
connect: {
80-
id: externalAccount.id,
81-
},
82-
}
83-
: {},
84-
},
85-
});
85+
});
8686

87-
if (this.deliverEvents) {
88-
// Produce a message to the event bus
89-
await workerQueue.enqueue(
90-
"deliverEvent",
91-
{
92-
id: eventLog.id,
93-
},
94-
{ runAt: eventLog.deliverAt, tx }
95-
);
96-
}
87+
if (this.deliverEvents) {
88+
// Produce a message to the event bus
89+
await workerQueue.enqueue(
90+
"deliverEvent",
91+
{
92+
id: eventLog.id,
93+
},
94+
{ runAt: eventLog.deliverAt, tx, jobKey: `event:${eventLog.id}` }
95+
);
96+
}
9797

98-
return eventLog;
99-
});
98+
return eventLog;
99+
},
100+
{ rethrowPrismaErrors: true }
101+
);
100102
} catch (error) {
101103
const prismaError = PrismaErrorSchema.safeParse(error);
104+
105+
if (!prismaError.success) {
106+
logger.debug("Error parsing prisma error", {
107+
error,
108+
parseError: prismaError.error.format(),
109+
});
110+
111+
throw error;
112+
}
113+
102114
// If the error is a Prisma unique constraint error, it means that the event already exists
103115
if (prismaError.success && prismaError.data.code === "P2002") {
116+
logger.debug("Event already exists, finding and returning", { event, environment });
117+
104118
return this.#prismaClient.eventRecord.findUniqueOrThrow({
105119
where: {
106120
eventId_environmentId: {

examples/job-catalog/src/events.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,26 @@ client.defineJob({
2929
},
3030
});
3131

32+
client.defineJob({
33+
id: "cancel-event-example",
34+
name: "Cancel Event Example",
35+
version: "1.0.0",
36+
trigger: eventTrigger({
37+
name: "cancel.event.example",
38+
}),
39+
run: async (payload, io, ctx) => {
40+
await io.sendEvent(
41+
"send-event",
42+
{ name: "Cancellable Event", id: payload.id },
43+
{
44+
deliverAt: new Date(Date.now() + 1000 * 60 * 60 * 24), // 24 hours from now
45+
}
46+
);
47+
48+
await io.wait("wait-1", 60); // 1 minute
49+
50+
await io.cancelEvent("cancel-event", payload.id);
51+
},
52+
});
53+
3254
createExpressServer(client);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "EventRecord" ADD COLUMN "cancelledAt" TIMESTAMP(3);

packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ model EventRecord {
642642
643643
createdAt DateTime @default(now())
644644
updatedAt DateTime @updatedAt
645+
cancelledAt DateTime?
645646
646647
isTest Boolean @default(false)
647648
runs JobRun[]

packages/trigger-sdk/src/apiClient.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,22 @@ export class ApiClient {
201201
});
202202
}
203203

204+
async cancelEvent(eventId: string) {
205+
const apiKey = await this.#apiKey();
206+
207+
this.#logger.debug("Cancelling event", {
208+
eventId,
209+
});
210+
211+
return await zodfetch(ApiEventLogSchema, `${this.#apiUrl}/api/v1/events/${eventId}/cancel`, {
212+
method: "POST",
213+
headers: {
214+
"Content-Type": "application/json",
215+
Authorization: `Bearer ${apiKey}`,
216+
},
217+
});
218+
}
219+
204220
async updateSource(
205221
client: string,
206222
key: string,

packages/trigger-sdk/src/io.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,32 @@ export class IO {
221221
);
222222
}
223223

224+
/** `io.cancelEvent()` allows you to cancel an event that was previously sent with `io.sendEvent()`. This will prevent any Jobs from running that are listening for that event if the event was sent with a delay
225+
* @param key
226+
* @param eventId
227+
* @returns
228+
*/
229+
async cancelEvent(key: string | any[], eventId: string) {
230+
return await this.runTask(
231+
key,
232+
{
233+
name: "cancelEvent",
234+
params: {
235+
eventId,
236+
},
237+
properties: [
238+
{
239+
label: "id",
240+
text: eventId,
241+
},
242+
],
243+
},
244+
async (task) => {
245+
return await this._triggerClient.cancelEvent(eventId);
246+
}
247+
);
248+
}
249+
224250
async updateSource(key: string | any[], options: { key: string } & UpdateTriggerSourceBody) {
225251
return this.runTask(
226252
key,
@@ -432,7 +458,10 @@ export class IO {
432458
= * @param onError The callback that will be called when the Task fails. The callback receives the error, the Task and the IO as parameters. If you wish to retry then return an object with a `retryAt` property.
433459
* @returns A Promise that resolves with the returned value of the callback.
434460
*/
435-
async runTask<TResult extends SerializableJson | void = void, TCallbackResult extends unknown = TResult>(
461+
async runTask<
462+
TResult extends SerializableJson | void = void,
463+
TCallbackResult extends unknown = TResult,
464+
>(
436465
key: string | any[],
437466
options: RunTaskOptions,
438467
callback: (task: IOTask, io: IO) => Promise<TCallbackResult | TResult>,
@@ -525,7 +554,7 @@ export class IO {
525554
const result = await callback(task, this);
526555

527556
const output = SerializableJsonSchema.parse(result) as TResult;
528-
557+
529558
this._logger.debug("Completing using output", {
530559
idempotencyKey,
531560
task,

0 commit comments

Comments
 (0)