Skip to content

Commit 07967bd

Browse files
committed
Implement next runs list powered by clickhouse
1 parent f3e2643 commit 07967bd

File tree

5 files changed

+363
-34
lines changed

5 files changed

+363
-34
lines changed
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { PrismaClient, PrismaClientOrTransaction, type TaskRunStatus } from "@trigger.dev/database";
3+
import { type Direction } from "~/components/ListPagination";
4+
import { timeFilters } from "~/components/runs/v3/SharedFilters";
5+
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
6+
import { getAllTaskIdentifiers } from "~/models/task.server";
7+
import { RunsRepository } from "~/services/runsRepository.server";
8+
import { ServiceValidationError } from "~/v3/services/baseService.server";
9+
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
10+
import parseDuration from "parse-duration";
11+
12+
export type RunListOptions = {
13+
userId?: string;
14+
projectId: string;
15+
//filters
16+
tasks?: string[];
17+
versions?: string[];
18+
statuses?: TaskRunStatus[];
19+
tags?: string[];
20+
scheduleId?: string;
21+
period?: string;
22+
bulkId?: string;
23+
from?: number;
24+
to?: number;
25+
isTest?: boolean;
26+
rootOnly?: boolean;
27+
batchId?: string;
28+
runIds?: string[];
29+
//pagination
30+
direction?: Direction;
31+
cursor?: string;
32+
pageSize?: number;
33+
};
34+
35+
const DEFAULT_PAGE_SIZE = 25;
36+
37+
export type NextRunList = Awaited<ReturnType<NextRunListPresenter["call"]>>;
38+
export type NextRunListItem = NextRunList["runs"][0];
39+
export type NextRunListAppliedFilters = NextRunList["filters"];
40+
41+
export class NextRunListPresenter {
42+
constructor(
43+
private readonly replica: PrismaClientOrTransaction,
44+
private readonly clickhouse: ClickHouse
45+
) {}
46+
47+
public async call(
48+
environmentId: string,
49+
{
50+
userId,
51+
projectId,
52+
tasks,
53+
versions,
54+
statuses,
55+
tags,
56+
scheduleId,
57+
period,
58+
bulkId,
59+
isTest,
60+
rootOnly,
61+
batchId,
62+
runIds,
63+
from,
64+
to,
65+
direction = "forward",
66+
cursor,
67+
pageSize = DEFAULT_PAGE_SIZE,
68+
}: RunListOptions
69+
) {
70+
//get the time values from the raw values (including a default period)
71+
const time = timeFilters({
72+
period,
73+
from,
74+
to,
75+
});
76+
77+
const periodMs = time.period ? parseDuration(time.period) : undefined;
78+
79+
const hasStatusFilters = statuses && statuses.length > 0;
80+
81+
const hasFilters =
82+
(tasks !== undefined && tasks.length > 0) ||
83+
(versions !== undefined && versions.length > 0) ||
84+
hasStatusFilters ||
85+
(bulkId !== undefined && bulkId !== "") ||
86+
(scheduleId !== undefined && scheduleId !== "") ||
87+
(tags !== undefined && tags.length > 0) ||
88+
batchId !== undefined ||
89+
(runIds !== undefined && runIds.length > 0) ||
90+
typeof isTest === "boolean" ||
91+
rootOnly === true ||
92+
!time.isDefault;
93+
94+
//get all possible tasks
95+
const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId);
96+
97+
//get possible bulk actions
98+
// TODO: we should replace this with the new bulk stuff and make it environment scoped
99+
const bulkActionsAsync = this.replica.bulkActionGroup.findMany({
100+
select: {
101+
friendlyId: true,
102+
type: true,
103+
createdAt: true,
104+
},
105+
where: {
106+
projectId: projectId,
107+
},
108+
orderBy: {
109+
createdAt: "desc",
110+
},
111+
take: 20,
112+
});
113+
114+
const [possibleTasks, bulkActions, displayableEnvironment] = await Promise.all([
115+
possibleTasksAsync,
116+
bulkActionsAsync,
117+
findDisplayableEnvironment(environmentId, userId),
118+
]);
119+
120+
if (!displayableEnvironment) {
121+
throw new ServiceValidationError("No environment found");
122+
}
123+
124+
//we can restrict to specific runs using bulkId, or batchId
125+
let restrictToRunIds: undefined | string[] = undefined;
126+
127+
//bulk id
128+
if (bulkId) {
129+
const bulkAction = await this.replica.bulkActionGroup.findFirst({
130+
select: {
131+
items: {
132+
select: {
133+
destinationRunId: true,
134+
},
135+
},
136+
},
137+
where: {
138+
friendlyId: bulkId,
139+
},
140+
});
141+
142+
if (bulkAction) {
143+
const runIds = bulkAction.items.map((item) => item.destinationRunId).filter(Boolean);
144+
restrictToRunIds = runIds;
145+
}
146+
}
147+
148+
//batch id is a friendly id
149+
if (batchId) {
150+
const batch = await this.replica.batchTaskRun.findFirst({
151+
select: {
152+
id: true,
153+
},
154+
where: {
155+
friendlyId: batchId,
156+
runtimeEnvironmentId: environmentId,
157+
},
158+
});
159+
160+
if (batch) {
161+
batchId = batch.id;
162+
}
163+
}
164+
165+
//scheduleId can be a friendlyId
166+
if (scheduleId && scheduleId.startsWith("sched_")) {
167+
const schedule = await this.replica.taskSchedule.findFirst({
168+
select: {
169+
id: true,
170+
},
171+
where: {
172+
friendlyId: scheduleId,
173+
projectId: projectId,
174+
},
175+
});
176+
177+
if (schedule) {
178+
scheduleId = schedule?.id;
179+
}
180+
}
181+
182+
//show all runs if we are filtering by batchId or runId
183+
if (batchId || runIds?.length || scheduleId || tasks?.length) {
184+
rootOnly = false;
185+
}
186+
187+
const runsRepository = new RunsRepository({
188+
clickhouse: this.clickhouse,
189+
prisma: this.replica as PrismaClient,
190+
});
191+
192+
const { runs, pagination } = await runsRepository.listRuns({
193+
environmentId,
194+
projectId,
195+
tasks,
196+
versions,
197+
statuses,
198+
tags,
199+
scheduleId,
200+
period: periodMs ?? undefined,
201+
from,
202+
to,
203+
isTest,
204+
rootOnly,
205+
batchId,
206+
runFriendlyIds: runIds,
207+
runIds: restrictToRunIds,
208+
page: {
209+
size: pageSize,
210+
cursor,
211+
direction,
212+
},
213+
});
214+
215+
let hasAnyRuns = runs.length > 0;
216+
217+
if (!hasAnyRuns) {
218+
const firstRun = await this.replica.taskRun.findFirst({
219+
where: {
220+
runtimeEnvironmentId: environmentId,
221+
},
222+
});
223+
224+
if (firstRun) {
225+
hasAnyRuns = true;
226+
}
227+
}
228+
229+
return {
230+
runs: runs.map((run) => {
231+
const hasFinished = isFinalRunStatus(run.status);
232+
233+
const startedAt = run.startedAt ?? run.lockedAt;
234+
235+
return {
236+
id: run.id,
237+
number: 1,
238+
friendlyId: run.friendlyId,
239+
createdAt: run.createdAt.toISOString(),
240+
updatedAt: run.updatedAt.toISOString(),
241+
startedAt: startedAt ? startedAt.toISOString() : undefined,
242+
delayUntil: run.delayUntil ? run.delayUntil.toISOString() : undefined,
243+
hasFinished,
244+
finishedAt: hasFinished
245+
? run.completedAt?.toISOString() ?? run.updatedAt.toISOString()
246+
: undefined,
247+
isTest: run.isTest,
248+
status: run.status,
249+
version: run.taskVersion,
250+
taskIdentifier: run.taskIdentifier,
251+
spanId: run.spanId,
252+
isReplayable: true,
253+
isCancellable: isCancellableRunStatus(run.status),
254+
isPending: isPendingRunStatus(run.status),
255+
environment: displayableEnvironment,
256+
idempotencyKey: run.idempotencyKey ? run.idempotencyKey : undefined,
257+
ttl: run.ttl ? run.ttl : undefined,
258+
expiredAt: run.expiredAt ? run.expiredAt.toISOString() : undefined,
259+
costInCents: run.costInCents,
260+
baseCostInCents: run.baseCostInCents,
261+
usageDurationMs: Number(run.usageDurationMs),
262+
tags: run.runTags ? run.runTags.sort((a, b) => a.localeCompare(b)) : [],
263+
depth: run.depth,
264+
rootTaskRunId: run.rootTaskRunId,
265+
metadata: run.metadata,
266+
metadataType: run.metadataType,
267+
};
268+
}),
269+
pagination: {
270+
next: pagination.nextCursor ?? undefined,
271+
previous: pagination.previousCursor ?? undefined,
272+
},
273+
possibleTasks: possibleTasks
274+
.map((task) => ({ slug: task.slug, triggerSource: task.triggerSource }))
275+
.sort((a, b) => {
276+
return a.slug.localeCompare(b.slug);
277+
}),
278+
bulkActions: bulkActions.map((bulkAction) => ({
279+
id: bulkAction.friendlyId,
280+
type: bulkAction.type,
281+
createdAt: bulkAction.createdAt,
282+
})),
283+
filters: {
284+
tasks: tasks || [],
285+
versions: versions || [],
286+
statuses: statuses || [],
287+
from: time.from,
288+
to: time.to,
289+
},
290+
hasFilters,
291+
hasAnyRuns,
292+
};
293+
}
294+
}

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs._index/route.tsx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ import { TextLink } from "~/components/primitives/TextLink";
3434
import { RunsFilters, TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters";
3535
import { TaskRunsTable } from "~/components/runs/v3/TaskRunsTable";
3636
import { BULK_ACTION_RUN_LIMIT } from "~/consts";
37+
import { $replica } from "~/db.server";
3738
import { useEnvironment } from "~/hooks/useEnvironment";
3839
import { useOrganization } from "~/hooks/useOrganizations";
3940
import { useProject } from "~/hooks/useProject";
4041
import { findProjectBySlug } from "~/models/project.server";
4142
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
42-
import { RunListPresenter } from "~/presenters/v3/RunListPresenter.server";
43+
import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server";
44+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
4345
import {
4446
getRootOnlyFilterPreference,
4547
setRootOnlyFilterPreference,
@@ -121,7 +123,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
121123
scheduleId,
122124
} = TaskRunListSearchFilters.parse(s);
123125

124-
const presenter = new RunListPresenter();
126+
if (!clickhouseClient) {
127+
throw new Error("Clickhouse is not supported yet");
128+
}
129+
130+
const presenter = new NextRunListPresenter($replica, clickhouseClient);
125131
const list = presenter.call(environment.id, {
126132
userId,
127133
projectId: project.id,

0 commit comments

Comments
 (0)