Skip to content

Commit e74b750

Browse files
committed
Backfill run replication admin API endpoint
1 parent 3a4a622 commit e74b750

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { type TaskRun } from "@trigger.dev/database";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { logger } from "~/services/logger.server";
6+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
7+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
8+
import { FINAL_RUN_STATUSES } from "~/v3/taskStatus";
9+
10+
const Body = z.object({
11+
runIds: z.array(z.string()),
12+
});
13+
14+
const MAX_BATCH_SIZE = 50;
15+
16+
export async function action({ request }: ActionFunctionArgs) {
17+
// Next authenticate the request
18+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
19+
20+
if (!authenticationResult) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
const user = await prisma.user.findUnique({
25+
where: {
26+
id: authenticationResult.userId,
27+
},
28+
});
29+
30+
if (!user) {
31+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
32+
}
33+
34+
if (!user.admin) {
35+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
36+
}
37+
38+
try {
39+
const body = await request.json();
40+
const { runIds } = Body.parse(body);
41+
42+
logger.info("Backfilling runs", { runIds });
43+
44+
const runs: TaskRun[] = [];
45+
for (let i = 0; i < runIds.length; i += MAX_BATCH_SIZE) {
46+
const batch = runIds.slice(i, i + MAX_BATCH_SIZE);
47+
const batchRuns = await prisma.taskRun.findMany({
48+
where: {
49+
id: { in: batch },
50+
status: {
51+
in: FINAL_RUN_STATUSES,
52+
},
53+
},
54+
});
55+
runs.push(...batchRuns);
56+
}
57+
58+
if (!runsReplicationInstance) {
59+
throw new Error("Runs replication instance not found");
60+
}
61+
62+
await runsReplicationInstance.backfill(runs);
63+
64+
logger.info("Backfilled runs", { runs });
65+
66+
return json({
67+
success: true,
68+
runCount: runs.length,
69+
});
70+
} catch (error) {
71+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
72+
}
73+
}

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,24 @@ export class RunsReplicationService {
214214
}
215215
}
216216

217+
async backfill(runs: TaskRun[]) {
218+
// divide into batches of 50 to get data from Postgres
219+
const flushId = nanoid();
220+
// Use current timestamp as LSN (high enough to be above existing data)
221+
const now = Date.now();
222+
const syntheticLsn = `${now.toString(16).padStart(8, "0").toUpperCase()}/00000000`;
223+
const baseVersion = lsnToUInt64(syntheticLsn);
224+
225+
await this.#flushBatch(
226+
flushId,
227+
runs.map((run, index) => ({
228+
_version: baseVersion + BigInt(index),
229+
run,
230+
event: "insert",
231+
}))
232+
);
233+
}
234+
217235
#handleData(lsn: string, message: PgoutputMessage, parseDuration: bigint) {
218236
this.logger.debug("Handling data", {
219237
lsn,

0 commit comments

Comments
 (0)