Skip to content

Commit 0f5503b

Browse files
committed
move over the remaining v3 places that used the postgresql event repository
1 parent f197c58 commit 0f5503b

File tree

6 files changed

+21
-16
lines changed

6 files changed

+21
-16
lines changed

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import { $transaction, type PrismaClientOrTransaction, prisma } from "~/db.server";
22
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
33
import { logger } from "~/services/logger.server";
4-
import { eventRepository } from "../eventRepository/eventRepository.server";
54
import { isCancellableRunStatus } from "../taskStatus";
65
import { BaseService } from "./baseService.server";
76
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
8-
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
97

108
export class CancelAttemptService extends BaseService {
119
public async call(

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import { RunEngineVersion, type TaskRun } from "@trigger.dev/database";
2-
import { logger } from "~/services/logger.server";
3-
import { eventRepository } from "../eventRepository/eventRepository.server";
42
import { engine } from "../runEngine.server";
5-
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
63
import { BaseService } from "./baseService.server";
74
import { CancelTaskRunServiceV1 } from "./cancelTaskRunV1.server";
85

apps/webapp/app/v3/services/cancelTaskRunV1.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { type Prisma } from "@trigger.dev/database";
22
import assertNever from "assert-never";
33
import { logger } from "~/services/logger.server";
4-
import { eventRepository } from "../eventRepository/eventRepository.server";
54
import { socketIo } from "../handleSocketIo.server";
65
import { devPubSub } from "../marqs/devPubSub.server";
76
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
@@ -11,6 +10,7 @@ import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDepende
1110
import { CancelableTaskRun } from "./cancelTaskRun.server";
1211
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
1312
import { tryCatch } from "@trigger.dev/core/utils";
13+
import { resolveEventRepositoryForStore } from "../eventRepository/index.server";
1414

1515
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1616
include: {
@@ -101,6 +101,8 @@ export class CancelTaskRunServiceV1 extends BaseService {
101101
},
102102
});
103103

104+
const eventRepository = resolveEventRepositoryForStore(cancelledTaskRun.taskEventStore);
105+
104106
const [cancelRunEventError] = await tryCatch(
105107
eventRepository.cancelRunEvent({
106108
reason: opts.reason,

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { env } from "~/env.server";
2222
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2323
import { logger } from "~/services/logger.server";
2424
import { marqs } from "~/v3/marqs/index.server";
25-
import { eventRepository } from "../eventRepository/eventRepository.server";
2625
import { FailedTaskRunRetryHelper } from "../failedTaskRun.server";
2726
import { socketIo } from "../handleSocketIo.server";
2827
import { createExceptionPropertiesFromError } from "../eventRepository/common.server";
@@ -32,6 +31,7 @@ import { CancelAttemptService } from "./cancelAttempt.server";
3231
import { CreateCheckpointService } from "./createCheckpoint.server";
3332
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
3433
import { RetryAttemptService } from "./retryAttempt.server";
34+
import { resolveEventRepositoryForStore } from "../eventRepository/index.server";
3535

3636
type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;
3737

@@ -163,6 +163,8 @@ export class CompleteAttemptService extends BaseService {
163163
env,
164164
});
165165

166+
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
167+
166168
const [completeSuccessfulRunEventError] = await tryCatch(
167169
eventRepository.completeSuccessfulRunEvent({
168170
run: taskRunAttempt.taskRun,
@@ -314,6 +316,8 @@ export class CompleteAttemptService extends BaseService {
314316
exitRun(taskRunAttempt.taskRunId);
315317
}
316318

319+
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
320+
317321
const [completeFailedRunEventError] = await tryCatch(
318322
eventRepository.completeFailedRunEvent({
319323
run: taskRunAttempt.taskRun,
@@ -534,6 +538,8 @@ export class CompleteAttemptService extends BaseService {
534538
}) {
535539
const retryAt = new Date(executionRetry.timestamp);
536540

541+
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
542+
537543
// Retry the task run
538544
await eventRepository.recordEvent(
539545
`Retry #${execution.attempt.number} delay${oomMachine ? " after OOM" : ""}`,

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1+
import { tryCatch } from "@trigger.dev/core/utils";
2+
import { sanitizeError, TaskRunErrorCodes, TaskRunInternalError } from "@trigger.dev/core/v3";
13
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
2-
import { eventRepository } from "../eventRepository/eventRepository.server";
3-
import { BaseService } from "./baseService.server";
4-
import { logger } from "~/services/logger.server";
54
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
5+
import { logger } from "~/services/logger.server";
6+
import { FailedTaskRunRetryHelper } from "../failedTaskRun.server";
67
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
7-
import { sanitizeError, TaskRunErrorCodes, TaskRunInternalError } from "@trigger.dev/core/v3";
8+
import { BaseService } from "./baseService.server";
89
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
9-
import { FailedTaskRunRetryHelper } from "../failedTaskRun.server";
10-
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
11-
import { tryCatch } from "@trigger.dev/core/utils";
10+
import { resolveEventRepositoryForStore } from "../eventRepository/index.server";
1211

1312
export type CrashTaskRunServiceOptions = {
1413
reason?: string;
@@ -121,6 +120,8 @@ export class CrashTaskRunService extends BaseService {
121120
},
122121
});
123122

123+
const eventRepository = resolveEventRepositoryForStore(crashedTaskRun.taskEventStore);
124+
124125
const [createAttemptFailedEventError] = await tryCatch(
125126
eventRepository.completeFailedRunEvent({
126127
run: crashedTaskRun,

apps/webapp/app/v3/services/expireEnqueuedRun.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import { PrismaClientOrTransaction } from "~/db.server";
22
import { logger } from "~/services/logger.server";
33
import { commonWorker } from "../commonWorker.server";
4-
import { eventRepository } from "../eventRepository/eventRepository.server";
54
import { BaseService } from "./baseService.server";
65
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
7-
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
86
import { tryCatch } from "@trigger.dev/core/utils";
7+
import { resolveEventRepositoryForStore } from "../eventRepository/index.server";
98

109
export class ExpireEnqueuedRunService extends BaseService {
1110
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
@@ -79,6 +78,8 @@ export class ExpireEnqueuedRunService extends BaseService {
7978
},
8079
});
8180

81+
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
82+
8283
if (run.ttl) {
8384
const [completeExpiredRunEventError] = await tryCatch(
8485
eventRepository.completeExpiredRunEvent({

0 commit comments

Comments
 (0)