Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_ENABLED: z.string().default("0"),
RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"),
RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2),
RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/RunFilters.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
TaskRunListSearchFilters,
} from "~/components/runs/v3/RunFilters";
import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server";
import { type ParsedRunFilters } from "~/services/runsRepository.server";
import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server";

type FiltersFromRequest = ParsedRunFilters & Required<Pick<ParsedRunFilters, "rootOnly">>;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getUsername } from "~/utils/username";
import { BasePresenter } from "./basePresenter.server";
import { type BulkActionMode } from "~/components/BulkActionFilterSummary";
import { parseRunListInputOptions } from "~/services/runsRepository.server";
import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server";
import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters";

type BulkActionOptions = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getAllTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down
71 changes: 71 additions & 0 deletions apps/webapp/app/routes/admin.api.v1.feature-flags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import {
makeSetFlags,
setFlags,
FeatureFlagCatalogSchema,
validateAllFeatureFlags,
validatePartialFeatureFlags,
makeSetMultipleFlags,
} from "~/v3/featureFlags.server";
import { z } from "zod";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
// Parse the request body
const body = await request.json();

// Validate the input using the partial schema
const validationResult = validatePartialFeatureFlags(body as Record<string, unknown>);
if (!validationResult.success) {
return json(
{
error: "Invalid feature flags data",
details: validationResult.error.issues,
},
{ status: 400 }
);
}

const featureFlags = validationResult.data;
const setMultipleFlags = makeSetMultipleFlags(prisma);
const updatedFlags = await setMultipleFlags(featureFlags);

return json({
success: true,
updatedFlags,
message: `Updated ${updatedFlags.length} feature flag(s)`,
});
} catch (error) {
return json(
{
error: error instanceof Error ? error.message : String(error),
},
{ status: 400 }
);
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,26 @@
import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { type Tracer } from "@internal/tracing";
import { type Logger, type LogLevel } from "@trigger.dev/core/logger";
import { MachinePresetName } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { TaskRunStatus } from "@trigger.dev/database";
import parseDuration from "parse-duration";
import { z } from "zod";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { type PrismaClient } from "~/db.server";

export type RunsRepositoryOptions = {
clickhouse: ClickHouse;
prisma: PrismaClient;
logger?: Logger;
logLevel?: LogLevel;
tracer?: Tracer;
};

const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]);

const RunListInputOptionsSchema = z.object({
organizationId: z.string(),
projectId: z.string(),
environmentId: z.string(),
//filters
tasks: z.array(z.string()).optional(),
versions: z.array(z.string()).optional(),
statuses: z.array(RunStatus).optional(),
tags: z.array(z.string()).optional(),
scheduleId: z.string().optional(),
period: z.string().optional(),
from: z.number().optional(),
to: z.number().optional(),
isTest: z.boolean().optional(),
rootOnly: z.boolean().optional(),
batchId: z.string().optional(),
runId: z.array(z.string()).optional(),
bulkId: z.string().optional(),
queues: z.array(z.string()).optional(),
machines: MachinePresetName.array().optional(),
});

export type RunListInputOptions = z.infer<typeof RunListInputOptionsSchema>;
export type RunListInputFilters = Omit<
RunListInputOptions,
"organizationId" | "projectId" | "environmentId"
>;

export type ParsedRunFilters = RunListInputFilters & {
cursor?: string;
direction?: "forward" | "backward";
};

type FilterRunsOptions = Omit<RunListInputOptions, "period"> & {
period: number | undefined;
};

type Pagination = {
page: {
size: number;
cursor?: string;
direction?: "forward" | "backward";
};
};

export type ListRunsOptions = RunListInputOptions & Pagination;

export class RunsRepository {
import { type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import {
type FilterRunsOptions,
type IRunsRepository,
type ListRunsOptions,
type RunListInputOptions,
type RunsRepositoryOptions,
convertRunListInputOptionsToFilterRunsOptions,
} from "./runsRepository.server";

export class ClickHouseRunsRepository implements IRunsRepository {
constructor(private readonly options: RunsRepositoryOptions) {}

get name() {
return "clickhouse";
}

async listRunIds(options: ListRunsOptions) {
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

if (options.page.cursor) {
Expand Down Expand Up @@ -200,7 +147,7 @@ export class RunsRepository {
const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

const [queryError, result] = await queryBuilder.execute();
Expand All @@ -215,73 +162,6 @@ export class RunsRepository {

return result[0].count;
}

async #convertRunListInputOptionsToFilterRunsOptions(
options: RunListInputOptions
): Promise<FilterRunsOptions> {
const convertedOptions: FilterRunsOptions = {
...options,
period: undefined,
};

// Convert time period to ms
const time = timeFilters({
period: options.period,
from: options.from,
to: options.to,
});
convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined;

// batch friendlyId to id
if (options.batchId && options.batchId.startsWith("batch_")) {
const batch = await this.options.prisma.batchTaskRun.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.batchId,
runtimeEnvironmentId: options.environmentId,
},
});

if (batch) {
convertedOptions.batchId = batch.id;
}
}

// scheduleId can be a friendlyId
if (options.scheduleId && options.scheduleId.startsWith("sched_")) {
const schedule = await this.options.prisma.taskSchedule.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.scheduleId,
projectId: options.projectId,
},
});

if (schedule) {
convertedOptions.scheduleId = schedule?.id;
}
}

if (options.bulkId && options.bulkId.startsWith("bulk_")) {
convertedOptions.bulkId = BulkActionId.toId(options.bulkId);
}

if (options.runId) {
//convert to friendlyId
convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r));
}

// Show all runs if we are filtering by batchId or runId
if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) {
convertedOptions.rootOnly = false;
}

return convertedOptions;
}
}

function applyRunFiltersToQueryBuilder<T>(
Expand Down Expand Up @@ -373,7 +253,3 @@ function applyRunFiltersToQueryBuilder<T>(
});
}
}

export function parseRunListInputOptions(data: any): RunListInputOptions {
return RunListInputOptionsSchema.parse(data);
}
Loading
Loading