Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,45 @@ const EnvironmentSchema = z.object({
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
ADMIN_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
ADMIN_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
ADMIN_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

ADMIN_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
ADMIN_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
ADMIN_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
ADMIN_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
ADMIN_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
ADMIN_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
ADMIN_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
ADMIN_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
Expand Down
12 changes: 1 addition & 11 deletions apps/webapp/app/routes/admin.api.v1.feature-flags.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import {
makeSetFlags,
setFlags,
FeatureFlagCatalogSchema,
validateAllFeatureFlags,
validatePartialFeatureFlags,
makeSetMultipleFlags,
} from "~/v3/featureFlags.server";
import { z } from "zod";
import { makeSetMultipleFlags, validatePartialFeatureFlags } from "~/v3/featureFlags.server";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { adminWorker } from "~/v3/services/adminWorker.server";

const Body = z.object({
from: z.coerce.date(),
to: z.coerce.date(),
batchSize: z.number().optional(),
});

const Params = z.object({
batchId: z.string(),
});

const DEFAULT_BATCH_SIZE = 500;

export async function action({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

const { batchId } = Params.parse(params);

try {
const body = await request.json();

const { from, to, batchSize } = Body.parse(body);

await adminWorker.enqueue({
job: "admin.backfillRunsToReplication",
payload: {
from,
to,
batchSize: batchSize ?? DEFAULT_BATCH_SIZE,
},
id: batchId,
});

return json({
success: true,
id: batchId,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { adminWorker } from "~/v3/services/adminWorker.server";

const Params = z.object({
batchId: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

const { batchId } = Params.parse(params);

try {
await adminWorker.cancel(batchId);

return json({
success: true,
id: batchId,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
92 changes: 92 additions & 0 deletions apps/webapp/app/services/runsBackfiller.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Tracer } from "@opentelemetry/api";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import { startSpan } from "~/v3/tracing.server";
import { FINAL_RUN_STATUSES } from "../v3/taskStatus";
import { Logger } from "@trigger.dev/core/logger";

export class RunsBackfillerService {
private readonly prisma: PrismaClientOrTransaction;
private readonly runsReplicationInstance: RunsReplicationService;
private readonly tracer: Tracer;
private readonly logger: Logger;

constructor(opts: {
prisma: PrismaClientOrTransaction;
runsReplicationInstance: RunsReplicationService;
tracer: Tracer;
logLevel?: "log" | "error" | "warn" | "info" | "debug";
}) {
this.prisma = opts.prisma;
this.runsReplicationInstance = opts.runsReplicationInstance;
this.tracer = opts.tracer;
this.logger = new Logger("RunsBackfillerService", opts.logLevel ?? "debug");
}

public async call({
from,
to,
cursor,
batchSize,
}: {
from: Date;
to: Date;
cursor?: string;
batchSize?: number;
}): Promise<string | undefined> {
return await startSpan(this.tracer, "RunsBackfillerService.call()", async (span) => {
span.setAttribute("from", from.toISOString());
span.setAttribute("to", to.toISOString());
span.setAttribute("cursor", cursor ?? "");
span.setAttribute("batchSize", batchSize ?? 0);

const runs = await this.prisma.taskRun.findMany({
where: {
createdAt: {
gte: from,
lte: to,
},
status: {
in: FINAL_RUN_STATUSES,
},
...(cursor ? { id: { gt: cursor } } : {}),
},
orderBy: {
id: "asc",
},
take: batchSize,
});

if (runs.length === 0) {
this.logger.info("No runs to backfill", { from, to, cursor });

return;
}

this.logger.info("Backfilling runs", {
from,
to,
cursor,
batchSize,
runCount: runs.length,
firstCreatedAt: runs[0].createdAt,
lastCreatedAt: runs[runs.length - 1].createdAt,
});

await this.runsReplicationInstance.backfill(runs);

const lastRun = runs[runs.length - 1];

this.logger.info("Backfilled runs", {
from,
to,
cursor,
batchSize,
lastRunId: lastRun.id,
});

// Return the last run ID to continue from
return lastRun.id;
});
}
}
100 changes: 100 additions & 0 deletions apps/webapp/app/v3/services/adminWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { Logger } from "@trigger.dev/core/logger";
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import { singleton } from "~/utils/singleton";
import { tracer } from "../tracer.server";
import { $replica } from "~/db.server";
import { RunsBackfillerService } from "../../services/runsBackfiller.server";

function initializeWorker() {
const redisOptions = {
keyPrefix: "admin:worker:",
host: env.ADMIN_WORKER_REDIS_HOST,
port: env.ADMIN_WORKER_REDIS_PORT,
username: env.ADMIN_WORKER_REDIS_USERNAME,
password: env.ADMIN_WORKER_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.ADMIN_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
};

logger.debug(`👨‍🏭 Initializing admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}`);

const worker = new RedisWorker({
name: "admin-worker",
redisOptions,
catalog: {
"admin.backfillRunsToReplication": {
schema: z.object({
from: z.coerce.date(),
to: z.coerce.date(),
cursor: z.string().optional(),
batchSize: z.coerce.number().int().default(500),
}),
visibilityTimeoutMs: 60_000 * 15, // 15 minutes
retry: {
maxAttempts: 5,
},
},
},
concurrency: {
workers: env.ADMIN_WORKER_CONCURRENCY_WORKERS,
tasksPerWorker: env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER,
limit: env.ADMIN_WORKER_CONCURRENCY_LIMIT,
},
pollIntervalMs: env.ADMIN_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL,
shutdownTimeoutMs: env.ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS,
logger: new Logger("AdminWorker", env.ADMIN_WORKER_LOG_LEVEL),
jobs: {
"admin.backfillRunsToReplication": async ({ payload, id }) => {
if (!runsReplicationInstance) {
logger.error("Runs replication instance not found");
return;
}

const service = new RunsBackfillerService({
prisma: $replica,
runsReplicationInstance: runsReplicationInstance,
tracer: tracer,
});

const cursor = await service.call({
from: payload.from,
to: payload.to,
cursor: payload.cursor,
batchSize: payload.batchSize,
});

if (cursor) {
await worker.enqueue({
job: "admin.backfillRunsToReplication",
payload: {
from: payload.from,
to: payload.to,
cursor,
batchSize: payload.batchSize,
},
id,
availableAt: new Date(Date.now() + 1000),
cancellationKey: id,
});
}
},
},
});

if (env.ADMIN_WORKER_ENABLED === "true") {
logger.debug(
`👨‍🏭 Starting admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}, pollInterval = ${env.ADMIN_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ADMIN_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ADMIN_WORKER_CONCURRENCY_LIMIT}`
);

worker.start();
}

return worker;
}

export const adminWorker = singleton("adminWorker", initializeWorker);
Loading
Loading