|
1 | | -import type { CheckpointRestoreEvent, CheckpointRestoreEventType } from "@trigger.dev/database"; |
| 1 | +import type { |
| 2 | + Checkpoint, |
| 3 | + CheckpointRestoreEvent, |
| 4 | + CheckpointRestoreEventType, |
| 5 | +} from "@trigger.dev/database"; |
2 | 6 | import { logger } from "~/services/logger.server"; |
3 | 7 | import { BaseService } from "./baseService.server"; |
| 8 | +import { ManualCheckpointMetadata } from "@trigger.dev/core/v3"; |
| 9 | +import { isTaskRunAttemptStatus, isTaskRunStatus, TaskRunAttemptStatus } from "~/database-types"; |
| 10 | +import { safeJsonParse } from "~/utils/json"; |
4 | 11 |
|
5 | 12 | interface CheckpointRestoreEventCallParams { |
6 | 13 | checkpointId: string; |
@@ -39,6 +46,13 @@ export class CreateCheckpointRestoreEventService extends BaseService { |
39 | 46 | return; |
40 | 47 | } |
41 | 48 |
|
| 49 | + if (params.type === "RESTORE" && checkpoint.reason === "MANUAL") { |
| 50 | + const manualRestoreSuccess = await this.#handleManualCheckpointRestore(checkpoint); |
| 51 | + if (!manualRestoreSuccess) { |
| 52 | + return; |
| 53 | + } |
| 54 | + } |
| 55 | + |
42 | 56 | logger.debug(`Creating checkpoint/restore event`, { params }); |
43 | 57 |
|
44 | 58 | let taskRunDependencyId: string | undefined; |
@@ -99,4 +113,81 @@ export class CreateCheckpointRestoreEventService extends BaseService { |
99 | 113 |
|
100 | 114 | return checkpointEvent; |
101 | 115 | } |
| 116 | + |
| 117 | + async #handleManualCheckpointRestore(checkpoint: Checkpoint): Promise<boolean> { |
| 118 | + const json = checkpoint.metadata ? safeJsonParse(checkpoint.metadata) : undefined; |
| 119 | + |
| 120 | + // We need to restore the previous run and attempt status as saved in the metadata |
| 121 | + const metadata = ManualCheckpointMetadata.safeParse(json); |
| 122 | + |
| 123 | + if (!metadata.success) { |
| 124 | + logger.error("Invalid metadata", { metadata }); |
| 125 | + return false; |
| 126 | + } |
| 127 | + |
| 128 | + const { attemptId, previousAttemptStatus, previousRunStatus } = metadata.data; |
| 129 | + |
| 130 | + if (!isTaskRunAttemptStatus(previousAttemptStatus)) { |
| 131 | + logger.error("Invalid previous attempt status", { previousAttemptStatus }); |
| 132 | + return false; |
| 133 | + } |
| 134 | + |
| 135 | + if (!isTaskRunStatus(previousRunStatus)) { |
| 136 | + logger.error("Invalid previous run status", { previousRunStatus }); |
| 137 | + return false; |
| 138 | + } |
| 139 | + |
| 140 | + try { |
| 141 | + const updatedAttempt = await this._prisma.taskRunAttempt.update({ |
| 142 | + where: { |
| 143 | + id: attemptId, |
| 144 | + }, |
| 145 | + data: { |
| 146 | + status: previousAttemptStatus, |
| 147 | + taskRun: { |
| 148 | + update: { |
| 149 | + data: { |
| 150 | + status: previousRunStatus, |
| 151 | + }, |
| 152 | + }, |
| 153 | + }, |
| 154 | + }, |
| 155 | + select: { |
| 156 | + id: true, |
| 157 | + status: true, |
| 158 | + taskRun: { |
| 159 | + select: { |
| 160 | + id: true, |
| 161 | + status: true, |
| 162 | + }, |
| 163 | + }, |
| 164 | + }, |
| 165 | + }); |
| 166 | + |
| 167 | + logger.debug("Set post resume statuses after manual checkpoint", { |
| 168 | + run: { |
| 169 | + id: updatedAttempt.taskRun.id, |
| 170 | + status: updatedAttempt.taskRun.status, |
| 171 | + }, |
| 172 | + attempt: { |
| 173 | + id: updatedAttempt.id, |
| 174 | + status: updatedAttempt.status, |
| 175 | + }, |
| 176 | + }); |
| 177 | + |
| 178 | + return true; |
| 179 | + } catch (error) { |
| 180 | + logger.error("Failed to set post resume statuses", { |
| 181 | + error: |
| 182 | + error instanceof Error |
| 183 | + ? { |
| 184 | + name: error.name, |
| 185 | + message: error.message, |
| 186 | + stack: error.stack, |
| 187 | + } |
| 188 | + : error, |
| 189 | + }); |
| 190 | + return false; |
| 191 | + } |
| 192 | + } |
102 | 193 | } |
0 commit comments