Skip to content

Commit 9b0eb64

Browse files
authored
Fallback from ClickHouse to Postgres (#2300)
* Set the default replication concurrency to 2 for self-hosters 100 was a bit crazy * Made the run repository an interface, deferring just to CH for now * Added run repository feature flag, allowing passing a default when getting a flag * Switch run repository using a feature flag * Added spans * Pass the default repository in, so we can try Postgres in the tests * Fallback to Postgres if ClickHouse errors * Update feature flags API endpoint
1 parent c927fbc commit 9b0eb64

12 files changed

+814
-157
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({
891891
RUN_REPLICATION_ENABLED: z.string().default("0"),
892892
RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"),
893893
RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"),
894-
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100),
894+
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2),
895895
RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
896896
RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
897897
RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),

apps/webapp/app/presenters/RunFilters.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
TaskRunListSearchFilters,
44
} from "~/components/runs/v3/RunFilters";
55
import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server";
6-
import { type ParsedRunFilters } from "~/services/runsRepository.server";
6+
import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server";
77

88
type FiltersFromRequest = ParsedRunFilters & Required<Pick<ParsedRunFilters, "rootOnly">>;
99

apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getUsername } from "~/utils/username";
22
import { BasePresenter } from "./basePresenter.server";
33
import { type BulkActionMode } from "~/components/BulkActionFilterSummary";
4-
import { parseRunListInputOptions } from "~/services/runsRepository.server";
4+
import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server";
55
import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters";
66

77
type BulkActionOptions = {

apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { type PrismaClient } from "@trigger.dev/database";
22
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
33
import { clickhouseClient } from "~/services/clickhouseInstance.server";
4-
import { RunsRepository } from "~/services/runsRepository.server";
4+
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
55
import { getRunFiltersFromRequest } from "../RunFilters.server";
66
import { BasePresenter } from "./basePresenter.server";
77

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination";
99
import { timeFilters } from "~/components/runs/v3/SharedFilters";
1010
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1111
import { getAllTaskIdentifiers } from "~/models/task.server";
12-
import { RunsRepository } from "~/services/runsRepository.server";
12+
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
1313
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1414
import { ServiceValidationError } from "~/v3/services/baseService.server";
1515
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
5+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
6+
import {
7+
makeSetFlags,
8+
setFlags,
9+
FeatureFlagCatalogSchema,
10+
validateAllFeatureFlags,
11+
validatePartialFeatureFlags,
12+
makeSetMultipleFlags,
13+
} from "~/v3/featureFlags.server";
14+
import { z } from "zod";
15+
16+
export async function action({ request }: ActionFunctionArgs) {
17+
// Next authenticate the request
18+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
19+
20+
if (!authenticationResult) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
const user = await prisma.user.findUnique({
25+
where: {
26+
id: authenticationResult.userId,
27+
},
28+
});
29+
30+
if (!user) {
31+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
32+
}
33+
34+
if (!user.admin) {
35+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
36+
}
37+
38+
try {
39+
// Parse the request body
40+
const body = await request.json();
41+
42+
// Validate the input using the partial schema
43+
const validationResult = validatePartialFeatureFlags(body as Record<string, unknown>);
44+
if (!validationResult.success) {
45+
return json(
46+
{
47+
error: "Invalid feature flags data",
48+
details: validationResult.error.issues,
49+
},
50+
{ status: 400 }
51+
);
52+
}
53+
54+
const featureFlags = validationResult.data;
55+
const setMultipleFlags = makeSetMultipleFlags(prisma);
56+
const updatedFlags = await setMultipleFlags(featureFlags);
57+
58+
return json({
59+
success: true,
60+
updatedFlags,
61+
message: `Updated ${updatedFlags.length} feature flag(s)`,
62+
});
63+
} catch (error) {
64+
return json(
65+
{
66+
error: error instanceof Error ? error.message : String(error),
67+
},
68+
{ status: 400 }
69+
);
70+
}
71+
}

apps/webapp/app/services/runsRepository.server.ts renamed to apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 18 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,26 @@
1-
import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse";
2-
import { type Tracer } from "@internal/tracing";
3-
import { type Logger, type LogLevel } from "@trigger.dev/core/logger";
4-
import { MachinePresetName } from "@trigger.dev/core/v3";
5-
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
6-
import { TaskRunStatus } from "@trigger.dev/database";
7-
import parseDuration from "parse-duration";
8-
import { z } from "zod";
9-
import { timeFilters } from "~/components/runs/v3/SharedFilters";
10-
import { type PrismaClient } from "~/db.server";
11-
12-
export type RunsRepositoryOptions = {
13-
clickhouse: ClickHouse;
14-
prisma: PrismaClient;
15-
logger?: Logger;
16-
logLevel?: LogLevel;
17-
tracer?: Tracer;
18-
};
19-
20-
const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]);
21-
22-
const RunListInputOptionsSchema = z.object({
23-
organizationId: z.string(),
24-
projectId: z.string(),
25-
environmentId: z.string(),
26-
//filters
27-
tasks: z.array(z.string()).optional(),
28-
versions: z.array(z.string()).optional(),
29-
statuses: z.array(RunStatus).optional(),
30-
tags: z.array(z.string()).optional(),
31-
scheduleId: z.string().optional(),
32-
period: z.string().optional(),
33-
from: z.number().optional(),
34-
to: z.number().optional(),
35-
isTest: z.boolean().optional(),
36-
rootOnly: z.boolean().optional(),
37-
batchId: z.string().optional(),
38-
runId: z.array(z.string()).optional(),
39-
bulkId: z.string().optional(),
40-
queues: z.array(z.string()).optional(),
41-
machines: MachinePresetName.array().optional(),
42-
});
43-
44-
export type RunListInputOptions = z.infer<typeof RunListInputOptionsSchema>;
45-
export type RunListInputFilters = Omit<
46-
RunListInputOptions,
47-
"organizationId" | "projectId" | "environmentId"
48-
>;
49-
50-
export type ParsedRunFilters = RunListInputFilters & {
51-
cursor?: string;
52-
direction?: "forward" | "backward";
53-
};
54-
55-
type FilterRunsOptions = Omit<RunListInputOptions, "period"> & {
56-
period: number | undefined;
57-
};
58-
59-
type Pagination = {
60-
page: {
61-
size: number;
62-
cursor?: string;
63-
direction?: "forward" | "backward";
64-
};
65-
};
66-
67-
export type ListRunsOptions = RunListInputOptions & Pagination;
68-
69-
export class RunsRepository {
1+
import { type ClickhouseQueryBuilder } from "@internal/clickhouse";
2+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
3+
import {
4+
type FilterRunsOptions,
5+
type IRunsRepository,
6+
type ListRunsOptions,
7+
type RunListInputOptions,
8+
type RunsRepositoryOptions,
9+
convertRunListInputOptionsToFilterRunsOptions,
10+
} from "./runsRepository.server";
11+
12+
export class ClickHouseRunsRepository implements IRunsRepository {
7013
constructor(private readonly options: RunsRepositoryOptions) {}
7114

15+
get name() {
16+
return "clickhouse";
17+
}
18+
7219
async listRunIds(options: ListRunsOptions) {
7320
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
7421
applyRunFiltersToQueryBuilder(
7522
queryBuilder,
76-
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
23+
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
7724
);
7825

7926
if (options.page.cursor) {
@@ -200,7 +147,7 @@ export class RunsRepository {
200147
const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder();
201148
applyRunFiltersToQueryBuilder(
202149
queryBuilder,
203-
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
150+
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
204151
);
205152

206153
const [queryError, result] = await queryBuilder.execute();
@@ -215,73 +162,6 @@ export class RunsRepository {
215162

216163
return result[0].count;
217164
}
218-
219-
async #convertRunListInputOptionsToFilterRunsOptions(
220-
options: RunListInputOptions
221-
): Promise<FilterRunsOptions> {
222-
const convertedOptions: FilterRunsOptions = {
223-
...options,
224-
period: undefined,
225-
};
226-
227-
// Convert time period to ms
228-
const time = timeFilters({
229-
period: options.period,
230-
from: options.from,
231-
to: options.to,
232-
});
233-
convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined;
234-
235-
// batch friendlyId to id
236-
if (options.batchId && options.batchId.startsWith("batch_")) {
237-
const batch = await this.options.prisma.batchTaskRun.findFirst({
238-
select: {
239-
id: true,
240-
},
241-
where: {
242-
friendlyId: options.batchId,
243-
runtimeEnvironmentId: options.environmentId,
244-
},
245-
});
246-
247-
if (batch) {
248-
convertedOptions.batchId = batch.id;
249-
}
250-
}
251-
252-
// scheduleId can be a friendlyId
253-
if (options.scheduleId && options.scheduleId.startsWith("sched_")) {
254-
const schedule = await this.options.prisma.taskSchedule.findFirst({
255-
select: {
256-
id: true,
257-
},
258-
where: {
259-
friendlyId: options.scheduleId,
260-
projectId: options.projectId,
261-
},
262-
});
263-
264-
if (schedule) {
265-
convertedOptions.scheduleId = schedule?.id;
266-
}
267-
}
268-
269-
if (options.bulkId && options.bulkId.startsWith("bulk_")) {
270-
convertedOptions.bulkId = BulkActionId.toId(options.bulkId);
271-
}
272-
273-
if (options.runId) {
274-
//convert to friendlyId
275-
convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r));
276-
}
277-
278-
// Show all runs if we are filtering by batchId or runId
279-
if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) {
280-
convertedOptions.rootOnly = false;
281-
}
282-
283-
return convertedOptions;
284-
}
285165
}
286166

287167
function applyRunFiltersToQueryBuilder<T>(
@@ -373,7 +253,3 @@ function applyRunFiltersToQueryBuilder<T>(
373253
});
374254
}
375255
}
376-
377-
export function parseRunListInputOptions(data: any): RunListInputOptions {
378-
return RunListInputOptionsSchema.parse(data);
379-
}

0 commit comments

Comments
 (0)