Skip to content

Commit c8ddc19

Browse files
committed
Fixed an issue with IngestSendEvent not returning an existing event
1 parent c6df574 commit c8ddc19

File tree

4 files changed

+100
-99
lines changed

4 files changed

+100
-99
lines changed

apps/webapp/app/db.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export type PrismaTransactionOptions = {
3030

3131
/** Sets the transaction isolation level. By default this is set to the value currently configured in your database. */
3232
isolationLevel?: Prisma.TransactionIsolationLevel;
33+
34+
rethrowPrismaErrors?: boolean;
3335
};
3436

3537
export async function $transaction<R>(
@@ -53,6 +55,10 @@ export async function $transaction<R>(
5355
name: error.name,
5456
});
5557

58+
if (options?.rethrowPrismaErrors) {
59+
throw error;
60+
}
61+
5662
return;
5763
}
5864

apps/webapp/app/routes/api.v1.events.$eventId.cancel.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import type { ActionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
33
import { z } from "zod";
4-
import { prisma } from "~/db.server";
54
import { authenticateApiRequest } from "~/services/apiAuth.server";
6-
import { workerQueue } from "~/services/worker.server";
7-
import { logger } from "~/services/logger.server";
85
import { CancelEventService } from "~/services/events/cancelEvent.server";
6+
import { logger } from "~/services/logger.server";
97

108
const ParamsSchema = z.object({
119
eventId: z.string(),

apps/webapp/app/services/events/cancelEvent.server.ts

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,54 +14,34 @@ export class CancelEventService {
1414
environment: AuthenticatedEnvironment,
1515
eventId: string
1616
): Promise<EventRecord | undefined> {
17-
return await $transaction(
18-
this.#prismaClient,
19-
async (tx) => {
20-
const event = await tx.eventRecord.findUnique({
21-
select: {
22-
id: true,
23-
name: true,
24-
createdAt: true,
25-
updatedAt: true,
26-
environmentId: true,
27-
cancelledAt: true,
28-
runs: {
29-
select: {
30-
id: true,
31-
status: true,
32-
startedAt: true,
33-
completedAt: true,
34-
},
35-
},
17+
return await $transaction(this.#prismaClient, async (tx) => {
18+
const event = await tx.eventRecord.findUnique({
19+
where: {
20+
eventId_environmentId: {
21+
eventId: eventId,
22+
environmentId: environment.id,
3623
},
37-
where: {
38-
eventId_environmentId: {
39-
eventId: eventId,
40-
environmentId: environment.id,
41-
},
42-
},
43-
});
24+
},
25+
});
4426

45-
if (!event) {
46-
return;
47-
}
27+
if (!event) {
28+
return;
29+
}
4830

49-
if (event.cancelledAt) {
50-
return event;
51-
}
31+
if (event.cancelledAt) {
32+
return event;
33+
}
5234

53-
//update the cancelledAt column in the eventRecord table
54-
const updatedEvent = await prisma.eventRecord.update({
55-
where: { id: event.id },
56-
data: { cancelledAt: new Date() },
57-
});
35+
//update the cancelledAt column in the eventRecord table
36+
const updatedEvent = await tx.eventRecord.update({
37+
where: { id: event.id },
38+
data: { cancelledAt: new Date() },
39+
});
5840

59-
// Dequeue the event after the db has been updated
60-
await workerQueue.dequeue(`event:${event.id}`, { tx: prisma });
41+
// Dequeue the event after the db has been updated
42+
await workerQueue.dequeue(`event:${event.id}`, { tx });
6143

62-
return updatedEvent;
63-
},
64-
{ timeout: 10000 }
65-
);
44+
return updatedEvent;
45+
});
6646
}
6747
}

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { RawEvent, SendEventOptions } from "@trigger.dev/core";
22
import { $transaction, PrismaClientOrTransaction, PrismaErrorSchema, prisma } from "~/db.server";
33
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { workerQueue } from "~/services/worker.server";
5+
import { logger } from "../logger.server";
56

67
export class IngestSendEvent {
78
#prismaClient: PrismaClientOrTransaction;
@@ -33,71 +34,87 @@ export class IngestSendEvent {
3334
try {
3435
const deliverAt = this.#calculateDeliverAt(options);
3536

36-
return await $transaction(this.#prismaClient, async (tx) => {
37-
const externalAccount = options?.accountId
38-
? await tx.externalAccount.findUniqueOrThrow({
39-
where: {
40-
environmentId_identifier: {
41-
environmentId: environment.id,
42-
identifier: options.accountId,
37+
return await $transaction(
38+
this.#prismaClient,
39+
async (tx) => {
40+
const externalAccount = options?.accountId
41+
? await tx.externalAccount.findUniqueOrThrow({
42+
where: {
43+
environmentId_identifier: {
44+
environmentId: environment.id,
45+
identifier: options.accountId,
46+
},
4347
},
44-
},
45-
})
46-
: undefined;
48+
})
49+
: undefined;
4750

48-
// Create a new event in the database
49-
const eventLog = await tx.eventRecord.create({
50-
data: {
51-
organization: {
52-
connect: {
53-
id: environment.organizationId,
51+
// Create a new event in the database
52+
const eventLog = await tx.eventRecord.create({
53+
data: {
54+
organization: {
55+
connect: {
56+
id: environment.organizationId,
57+
},
5458
},
55-
},
56-
project: {
57-
connect: {
58-
id: environment.projectId,
59+
project: {
60+
connect: {
61+
id: environment.projectId,
62+
},
5963
},
60-
},
61-
environment: {
62-
connect: {
63-
id: environment.id,
64+
environment: {
65+
connect: {
66+
id: environment.id,
67+
},
6468
},
69+
eventId: event.id,
70+
name: event.name,
71+
timestamp: event.timestamp ?? new Date(),
72+
payload: event.payload ?? {},
73+
context: event.context ?? {},
74+
source: event.source ?? "trigger.dev",
75+
sourceContext,
76+
deliverAt: deliverAt,
77+
externalAccount: externalAccount
78+
? {
79+
connect: {
80+
id: externalAccount.id,
81+
},
82+
}
83+
: {},
6584
},
66-
eventId: event.id,
67-
name: event.name,
68-
timestamp: event.timestamp ?? new Date(),
69-
payload: event.payload ?? {},
70-
context: event.context ?? {},
71-
source: event.source ?? "trigger.dev",
72-
sourceContext,
73-
deliverAt: deliverAt,
74-
externalAccount: externalAccount
75-
? {
76-
connect: {
77-
id: externalAccount.id,
78-
},
79-
}
80-
: {},
81-
},
82-
});
85+
});
8386

84-
if (this.deliverEvents) {
85-
// Produce a message to the event bus
86-
await workerQueue.enqueue(
87-
"deliverEvent",
88-
{
89-
id: eventLog.id,
90-
},
91-
{ runAt: eventLog.deliverAt, tx, jobKey: `event:${eventLog.id}` }
92-
);
93-
}
87+
if (this.deliverEvents) {
88+
// Produce a message to the event bus
89+
await workerQueue.enqueue(
90+
"deliverEvent",
91+
{
92+
id: eventLog.id,
93+
},
94+
{ runAt: eventLog.deliverAt, tx, jobKey: `event:${eventLog.id}` }
95+
);
96+
}
9497

95-
return eventLog;
96-
});
98+
return eventLog;
99+
},
100+
{ rethrowPrismaErrors: true }
101+
);
97102
} catch (error) {
98103
const prismaError = PrismaErrorSchema.safeParse(error);
104+
105+
if (!prismaError.success) {
106+
logger.debug("Error parsing prisma error", {
107+
error,
108+
parseError: prismaError.error.format(),
109+
});
110+
111+
throw error;
112+
}
113+
99114
// If the error is a Prisma unique constraint error, it means that the event already exists
100115
if (prismaError.success && prismaError.data.code === "P2002") {
116+
logger.debug("Event already exists, finding and returning", { event, environment });
117+
101118
return this.#prismaClient.eventRecord.findUniqueOrThrow({
102119
where: {
103120
eventId_environmentId: {

0 commit comments

Comments
 (0)