Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_ENABLED: z.string().default("0"),
RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"),
RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2),
RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/RunFilters.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
TaskRunListSearchFilters,
} from "~/components/runs/v3/RunFilters";
import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server";
import { type ParsedRunFilters } from "~/services/runsRepository.server";
import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server";

type FiltersFromRequest = ParsedRunFilters & Required<Pick<ParsedRunFilters, "rootOnly">>;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getUsername } from "~/utils/username";
import { BasePresenter } from "./basePresenter.server";
import { type BulkActionMode } from "~/components/BulkActionFilterSummary";
import { parseRunListInputOptions } from "~/services/runsRepository.server";
import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server";
import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters";

type BulkActionOptions = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getAllTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,79 +1,26 @@
import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { type Tracer } from "@internal/tracing";
import { type Logger, type LogLevel } from "@trigger.dev/core/logger";
import { MachinePresetName } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { TaskRunStatus } from "@trigger.dev/database";
import parseDuration from "parse-duration";
import { z } from "zod";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { type PrismaClient } from "~/db.server";

export type RunsRepositoryOptions = {
clickhouse: ClickHouse;
prisma: PrismaClient;
logger?: Logger;
logLevel?: LogLevel;
tracer?: Tracer;
};

const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]);

const RunListInputOptionsSchema = z.object({
organizationId: z.string(),
projectId: z.string(),
environmentId: z.string(),
//filters
tasks: z.array(z.string()).optional(),
versions: z.array(z.string()).optional(),
statuses: z.array(RunStatus).optional(),
tags: z.array(z.string()).optional(),
scheduleId: z.string().optional(),
period: z.string().optional(),
from: z.number().optional(),
to: z.number().optional(),
isTest: z.boolean().optional(),
rootOnly: z.boolean().optional(),
batchId: z.string().optional(),
runId: z.array(z.string()).optional(),
bulkId: z.string().optional(),
queues: z.array(z.string()).optional(),
machines: MachinePresetName.array().optional(),
});

export type RunListInputOptions = z.infer<typeof RunListInputOptionsSchema>;
export type RunListInputFilters = Omit<
RunListInputOptions,
"organizationId" | "projectId" | "environmentId"
>;

export type ParsedRunFilters = RunListInputFilters & {
cursor?: string;
direction?: "forward" | "backward";
};

type FilterRunsOptions = Omit<RunListInputOptions, "period"> & {
period: number | undefined;
};

type Pagination = {
page: {
size: number;
cursor?: string;
direction?: "forward" | "backward";
};
};

export type ListRunsOptions = RunListInputOptions & Pagination;

export class RunsRepository {
import { type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import {
type FilterRunsOptions,
type IRunsRepository,
type ListRunsOptions,
type RunListInputOptions,
type RunsRepositoryOptions,
convertRunListInputOptionsToFilterRunsOptions,
} from "./runsRepository.server";

export class ClickHouseRunsRepository implements IRunsRepository {
constructor(private readonly options: RunsRepositoryOptions) {}

get name() {
return "clickhouse";
}

async listRunIds(options: ListRunsOptions) {
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

if (options.page.cursor) {
Expand Down Expand Up @@ -200,7 +147,7 @@ export class RunsRepository {
const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

const [queryError, result] = await queryBuilder.execute();
Expand All @@ -215,73 +162,6 @@ export class RunsRepository {

return result[0].count;
}

async #convertRunListInputOptionsToFilterRunsOptions(
options: RunListInputOptions
): Promise<FilterRunsOptions> {
const convertedOptions: FilterRunsOptions = {
...options,
period: undefined,
};

// Convert time period to ms
const time = timeFilters({
period: options.period,
from: options.from,
to: options.to,
});
convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined;

// batch friendlyId to id
if (options.batchId && options.batchId.startsWith("batch_")) {
const batch = await this.options.prisma.batchTaskRun.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.batchId,
runtimeEnvironmentId: options.environmentId,
},
});

if (batch) {
convertedOptions.batchId = batch.id;
}
}

// scheduleId can be a friendlyId
if (options.scheduleId && options.scheduleId.startsWith("sched_")) {
const schedule = await this.options.prisma.taskSchedule.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.scheduleId,
projectId: options.projectId,
},
});

if (schedule) {
convertedOptions.scheduleId = schedule?.id;
}
}

if (options.bulkId && options.bulkId.startsWith("bulk_")) {
convertedOptions.bulkId = BulkActionId.toId(options.bulkId);
}

if (options.runId) {
//convert to friendlyId
convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r));
}

// Show all runs if we are filtering by batchId or runId
if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) {
convertedOptions.rootOnly = false;
}

return convertedOptions;
}
}

function applyRunFiltersToQueryBuilder<T>(
Expand Down Expand Up @@ -373,7 +253,3 @@ function applyRunFiltersToQueryBuilder<T>(
});
}
}

export function parseRunListInputOptions(data: any): RunListInputOptions {
return RunListInputOptionsSchema.parse(data);
}
Loading
Loading