Skip to content

Commit 0d1311c

Browse files
committed
move v3 over to new methods
1 parent 3cb3a99 commit 0d1311c

File tree

4 files changed

+84
-271
lines changed

4 files changed

+84
-271
lines changed

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"
44
import {
55
AttemptFailedSpanEvent,
66
correctErrorStackTrace,
7-
createPacketAttributesAsJson,
87
ExceptionEventProperties,
98
ExceptionSpanEvent,
109
flattenAttributes,
11-
IOPacket,
1210
isExceptionSpanEvent,
1311
NULL_SENTINEL,
1412
omit,
@@ -23,14 +21,7 @@ import {
2321
unflattenAttributes,
2422
} from "@trigger.dev/core/v3";
2523
import { parseTraceparent, serializeTraceparent } from "@trigger.dev/core/v3/isomorphic";
26-
import {
27-
Prisma,
28-
RuntimeEnvironmentType,
29-
TaskEvent,
30-
TaskEventKind,
31-
TaskEventStatus,
32-
TaskRun,
33-
} from "@trigger.dev/database";
24+
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus, TaskRun } from "@trigger.dev/database";
3425
import { nanoid } from "nanoid";
3526
import { createHash } from "node:crypto";
3627
import { EventEmitter } from "node:stream";
@@ -316,66 +307,6 @@ export class EventRepository {
316307
return await this.#flushBatch(nanoid(), events);
317308
}
318309

319-
async completeEvent(
320-
storeTable: TaskEventStoreTable,
321-
spanId: string,
322-
startCreatedAt: Date,
323-
endCreatedAt?: Date,
324-
options?: UpdateEventOptions
325-
) {
326-
const events = await this.queryIncompleteEvents(
327-
storeTable,
328-
{ spanId },
329-
startCreatedAt,
330-
endCreatedAt
331-
);
332-
333-
if (events.length === 0) {
334-
logger.warn("No incomplete events found for spanId", { spanId, options });
335-
return;
336-
}
337-
338-
const event = events[0];
339-
340-
const output = options?.attributes.output
341-
? await createPacketAttributesAsJson(
342-
options?.attributes.output,
343-
options?.attributes.outputType ?? "application/json"
344-
)
345-
: undefined;
346-
347-
logger.debug("Completing event", {
348-
spanId,
349-
eventId: event.id,
350-
});
351-
352-
const completedEvent = {
353-
...omit(event, "id"),
354-
isPartial: false,
355-
isError: options?.attributes.isError ?? false,
356-
isCancelled: false,
357-
status: options?.attributes.isError ? "ERROR" : "OK",
358-
links: event.links ?? [],
359-
events: event.events ?? (options?.events as any) ?? [],
360-
duration: calculateDurationFromStart(event.startTime, options?.endTime),
361-
properties: event.properties as Attributes,
362-
metadata: event.metadata as Attributes,
363-
style: event.style as Attributes,
364-
output: output,
365-
outputType:
366-
options?.attributes.outputType === "application/store" ||
367-
options?.attributes.outputType === "text/plain"
368-
? options?.attributes.outputType
369-
: "application/json",
370-
payload: event.payload as Attributes,
371-
payloadType: event.payloadType,
372-
} satisfies CreatableEvent;
373-
374-
await this.insert(completedEvent);
375-
376-
return completedEvent;
377-
}
378-
379310
async completeSuccessfulRunEvent({ run, endTime }: { run: CompleteableTaskRun; endTime?: Date }) {
380311
const startTime = convertDateToNanoseconds(run.createdAt);
381312

@@ -717,75 +648,6 @@ export class EventRepository {
717648
});
718649
}
719650

720-
async #queryEvents(
721-
storeTable: TaskEventStoreTable,
722-
queryOptions: QueryOptions,
723-
startCreatedAt: Date,
724-
endCreatedAt?: Date
725-
): Promise<TaskEventRecord[]> {
726-
return await this.taskEventStore.findMany(
727-
storeTable,
728-
queryOptions,
729-
startCreatedAt,
730-
endCreatedAt
731-
);
732-
}
733-
734-
async queryIncompleteEvents(
735-
storeTable: TaskEventStoreTable,
736-
queryOptions: QueryOptions,
737-
startCreatedAt: Date,
738-
endCreatedAt?: Date,
739-
allowCompleteDuplicate = false
740-
) {
741-
// First we will find all the events that match the query options (selecting minimal data).
742-
const taskEvents = await this.taskEventStore.findMany(
743-
storeTable,
744-
queryOptions,
745-
startCreatedAt,
746-
endCreatedAt,
747-
{ spanId: true, isPartial: true, isCancelled: true },
748-
undefined,
749-
{ limit: 500 }
750-
);
751-
752-
// Optimize the filtering by pre-processing the data
753-
const completeEventSpanIds = new Set<string>();
754-
const incompleteEvents: Array<{ spanId: string }> = [];
755-
756-
// Single pass to categorize events and build lookup structures
757-
for (const event of taskEvents) {
758-
if (!event.isPartial && !event.isCancelled) {
759-
// This is a complete event
760-
completeEventSpanIds.add(event.spanId);
761-
} else if (event.isPartial && !event.isCancelled) {
762-
// This is a potentially incomplete event
763-
incompleteEvents.push(event);
764-
}
765-
// Skip cancelled events as they are not incomplete
766-
}
767-
768-
// Filter incomplete events, excluding those with complete duplicates
769-
const filteredTaskEvents = allowCompleteDuplicate
770-
? incompleteEvents
771-
: incompleteEvents.filter((event) => !completeEventSpanIds.has(event.spanId));
772-
773-
if (filteredTaskEvents.length === 0) {
774-
return [];
775-
}
776-
777-
return this.#queryEvents(
778-
storeTable,
779-
{
780-
spanId: {
781-
in: filteredTaskEvents.map((event) => event.spanId),
782-
},
783-
},
784-
startCreatedAt,
785-
endCreatedAt
786-
);
787-
}
788-
789651
public async getTraceSummary(
790652
storeTable: TaskEventStoreTable,
791653
traceId: string,

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 50 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import { CancelAttemptService } from "./cancelAttempt.server";
3333
import { CreateCheckpointService } from "./createCheckpoint.server";
3434
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
3535
import { RetryAttemptService } from "./retryAttempt.server";
36+
import { tryCatch } from "@trigger.dev/core/utils";
3637

3738
type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;
3839

@@ -164,27 +165,20 @@ export class CompleteAttemptService extends BaseService {
164165
env,
165166
});
166167

167-
// Now we need to "complete" the task run event/span
168-
await eventRepository.completeEvent(
169-
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
170-
taskRunAttempt.taskRun.spanId,
171-
taskRunAttempt.taskRun.createdAt,
172-
taskRunAttempt.taskRun.completedAt ?? undefined,
173-
{
168+
const [completeSuccessfulRunEventError] = await tryCatch(
169+
eventRepository.completeSuccessfulRunEvent({
170+
run: taskRunAttempt.taskRun,
174171
endTime: new Date(),
175-
attributes: {
176-
isError: false,
177-
output:
178-
completion.outputType === "application/store" || completion.outputType === "text/plain"
179-
? completion.output
180-
: completion.output
181-
? (safeJsonParse(completion.output) as Attributes)
182-
: undefined,
183-
outputType: completion.outputType,
184-
},
185-
}
172+
})
186173
);
187174

175+
if (completeSuccessfulRunEventError) {
176+
logger.error("[CompleteAttemptService] Failed to complete successful run event", {
177+
error: completeSuccessfulRunEventError,
178+
runId: taskRunAttempt.taskRunId,
179+
});
180+
}
181+
188182
return "COMPLETED";
189183
}
190184

@@ -322,29 +316,21 @@ export class CompleteAttemptService extends BaseService {
322316
exitRun(taskRunAttempt.taskRunId);
323317
}
324318

325-
// Now we need to "complete" the task run event/span
326-
await eventRepository.completeEvent(
327-
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
328-
taskRunAttempt.taskRun.spanId,
329-
taskRunAttempt.taskRun.createdAt,
330-
taskRunAttempt.taskRun.completedAt ?? undefined,
331-
{
319+
const [completeFailedRunEventError] = await tryCatch(
320+
eventRepository.completeFailedRunEvent({
321+
run: taskRunAttempt.taskRun,
332322
endTime: failedAt,
333-
attributes: {
334-
isError: true,
335-
},
336-
events: [
337-
{
338-
name: "exception",
339-
time: failedAt,
340-
properties: {
341-
exception: createExceptionPropertiesFromError(sanitizedError),
342-
},
343-
},
344-
],
345-
}
323+
exception: createExceptionPropertiesFromError(sanitizedError),
324+
})
346325
);
347326

327+
if (completeFailedRunEventError) {
328+
logger.error("[CompleteAttemptService] Failed to complete failed run event", {
329+
error: completeFailedRunEventError,
330+
runId: taskRunAttempt.taskRunId,
331+
});
332+
}
333+
348334
await this._prisma.taskRun.update({
349335
where: {
350336
id: taskRunAttempt.taskRunId,
@@ -385,64 +371,43 @@ export class CompleteAttemptService extends BaseService {
385371
return "COMPLETED";
386372
}
387373

388-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
389-
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
390-
{
391-
runId: taskRunAttempt.taskRun.friendlyId,
392-
},
393-
taskRunAttempt.taskRun.createdAt,
394-
taskRunAttempt.taskRun.completedAt ?? undefined
395-
);
396-
397374
// Handle in-progress events
398375
switch (status) {
399376
case "CRASHED": {
400-
logger.debug("[CompleteAttemptService] Crashing in-progress events", {
401-
inProgressEvents: inProgressEvents.map((event) => event.id),
402-
});
403-
404-
await Promise.all(
405-
inProgressEvents.map((event) => {
406-
return eventRepository.crashEvent({
407-
event,
408-
crashedAt: failedAt,
409-
exception: createExceptionPropertiesFromError(sanitizedError),
410-
});
377+
const [createAttemptFailedEventError] = await tryCatch(
378+
eventRepository.createAttemptFailedRunEvent({
379+
run: taskRunAttempt.taskRun,
380+
endTime: failedAt,
381+
attemptNumber: taskRunAttempt.number,
382+
exception: createExceptionPropertiesFromError(sanitizedError),
411383
})
412384
);
413385

386+
if (createAttemptFailedEventError) {
387+
logger.error("[CompleteAttemptService] Failed to create attempt failed run event", {
388+
error: createAttemptFailedEventError,
389+
runId: taskRunAttempt.taskRunId,
390+
});
391+
}
392+
414393
break;
415394
}
416395
case "SYSTEM_FAILURE": {
417-
logger.debug("[CompleteAttemptService] Failing in-progress events", {
418-
inProgressEvents: inProgressEvents.map((event) => event.id),
419-
});
420-
421-
await Promise.all(
422-
inProgressEvents.map((event) => {
423-
return eventRepository.completeEvent(
424-
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
425-
event.spanId,
426-
taskRunAttempt.taskRun.createdAt,
427-
taskRunAttempt.taskRun.completedAt ?? undefined,
428-
{
429-
endTime: failedAt,
430-
attributes: {
431-
isError: true,
432-
},
433-
events: [
434-
{
435-
name: "exception",
436-
time: failedAt,
437-
properties: {
438-
exception: createExceptionPropertiesFromError(sanitizedError),
439-
},
440-
},
441-
],
442-
}
443-
);
396+
const [createAttemptFailedEventError] = await tryCatch(
397+
eventRepository.createAttemptFailedRunEvent({
398+
run: taskRunAttempt.taskRun,
399+
endTime: failedAt,
400+
attemptNumber: taskRunAttempt.number,
401+
exception: createExceptionPropertiesFromError(sanitizedError),
444402
})
445403
);
404+
405+
if (createAttemptFailedEventError) {
406+
logger.error("[CompleteAttemptService] Failed to create attempt failed run event", {
407+
error: createAttemptFailedEventError,
408+
runId: taskRunAttempt.taskRunId,
409+
});
410+
}
446411
}
447412
}
448413

0 commit comments

Comments
 (0)