Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,46 @@ const EnvironmentSchema = z.object({
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(30_000),
SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS: z.coerce.number().int().default(30),

SCHEDULE_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
SCHEDULE_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
SCHEDULE_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
SCHEDULE_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
SCHEDULE_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
SCHEDULE_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
SCHEDULE_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
SCHEDULE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"),
TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute

Expand Down
9 changes: 7 additions & 2 deletions apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ export class NextRunListPresenter {
prisma: this.replica as PrismaClient,
});

function clampToNow(date: Date): Date {
const now = new Date();
return date > now ? now : date;
}

const { runs, pagination } = await runsRepository.listRuns({
organizationId,
environmentId,
Expand All @@ -200,8 +205,8 @@ export class NextRunListPresenter {
tags,
scheduleId,
period: periodMs ?? undefined,
from,
to,
from: time.from ? time.from.getTime() : undefined,
to: time.to ? clampToNow(time.to).getTime() : undefined,
isTest,
rootOnly,
batchId,
Expand Down
10 changes: 9 additions & 1 deletion apps/webapp/app/presenters/v3/RunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ export class RunListPresenter extends BasePresenter {

const periodMs = time.period ? parse(time.period) : undefined;

function clampToNow(date: Date): Date {
const now = new Date();

return date > now ? now : date;
}

//get the runs
const runs = await this._replica.$queryRaw<
{
Expand Down Expand Up @@ -282,7 +288,9 @@ WHERE
: Prisma.empty
}
${
time.to ? Prisma.sql`AND tr."createdAt" <= ${time.to.toISOString()}::timestamp` : Prisma.empty
time.to
? Prisma.sql`AND tr."createdAt" <= ${clampToNow(time.to).toISOString()}::timestamp`
: Prisma.sql`AND tr."createdAt" <= CURRENT_TIMESTAMP`
}
${
tags && tags.length > 0
Expand Down
124 changes: 58 additions & 66 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { env } from "~/env.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEntitlement } from "~/services/platform.v3.server";
import { workerQueue } from "~/services/worker.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
Expand Down Expand Up @@ -244,88 +244,80 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
}
}
} else {
return await $transaction(this._prisma, async (tx) => {
const batch = await tx.batchTaskRun.create({
data: {
id: BatchId.fromFriendlyId(batchId),
friendlyId: batchId,
runtimeEnvironmentId: environment.id,
runCount: body.items.length,
runIds: [],
payload: payloadPacket.data,
payloadType: payloadPacket.dataType,
options,
batchVersion: "runengine:v1",
oneTimeUseToken: options.oneTimeUseToken,
},
const batch = await this._prisma.batchTaskRun.create({
data: {
id: BatchId.fromFriendlyId(batchId),
friendlyId: batchId,
runtimeEnvironmentId: environment.id,
runCount: body.items.length,
runIds: [],
payload: payloadPacket.data,
payloadType: payloadPacket.dataType,
options,
batchVersion: "runengine:v1",
oneTimeUseToken: options.oneTimeUseToken,
},
});

if (body.parentRunId && body.resumeParentOnCompletion) {
await this._engine.blockRunWithCreatedBatch({
runId: RunId.fromFriendlyId(body.parentRunId),
batchId: batch.id,
environmentId: environment.id,
projectId: environment.projectId,
organizationId: environment.organizationId,
});
}

if (body.parentRunId && body.resumeParentOnCompletion) {
await this._engine.blockRunWithCreatedBatch({
runId: RunId.fromFriendlyId(body.parentRunId),
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
environmentId: environment.id,
projectId: environment.projectId,
organizationId: environment.organizationId,
tx,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: this._batchProcessingStrategy,
parentRunId: body.parentRunId,
resumeParentOnCompletion: body.resumeParentOnCompletion,
});
}

switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun(
{
break;
}
case "parallel": {
const ranges = Array.from({
length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE),
}).map((_, index) => ({
start: index * PROCESSING_BATCH_SIZE,
count: PROCESSING_BATCH_SIZE,
}));

await Promise.all(
ranges.map((range, index) =>
this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
processingId: `${index}`,
range,
attemptCount: 0,
strategy: this._batchProcessingStrategy,
parentRunId: body.parentRunId,
resumeParentOnCompletion: body.resumeParentOnCompletion,
},
tx
);

break;
}
case "parallel": {
const ranges = Array.from({
length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE),
}).map((_, index) => ({
start: index * PROCESSING_BATCH_SIZE,
count: PROCESSING_BATCH_SIZE,
}));

await Promise.all(
ranges.map((range, index) =>
this.#enqueueBatchTaskRun(
{
batchId: batch.id,
processingId: `${index}`,
range,
attemptCount: 0,
strategy: this._batchProcessingStrategy,
parentRunId: body.parentRunId,
resumeParentOnCompletion: body.resumeParentOnCompletion,
},
tx
)
)
);
})
)
);

break;
}
break;
}
}

return batch;
});
return batch;
}
}

async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
tx,
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
await commonWorker.enqueue({
id: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
job: "runengine.processBatchTaskRun",
payload: options,
});
}

Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,14 @@ export class RunEngineTriggerTaskService {
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
releaseConcurrency: body.options?.releaseConcurrency,
queueTimestamp:
parentRun && body.options?.resumeParentOnCompletion
options.queueTimestamp ??
(parentRun && body.options?.resumeParentOnCompletion
? parentRun.queueTimestamp ?? undefined
: undefined,
: undefined),
runChainState,
scheduleId: options.scheduleId,
scheduleInstanceId: options.scheduleInstanceId,
createdAt: options.overrideCreatedAt,
},
this.prisma
);
Expand Down
10 changes: 7 additions & 3 deletions apps/webapp/app/services/email.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { SendEmailOptions } from "remix-auth-email-link";
import { redirect } from "remix-typedjson";
import { env } from "~/env.server";
import type { AuthUser } from "./authUser";
import { workerQueue } from "./worker.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { logger } from "./logger.server";
import { singleton } from "~/utils/singleton";
import { assertEmailAllowed } from "~/utils/email";
Expand Down Expand Up @@ -93,8 +93,12 @@ export async function sendPlainTextEmail(options: SendPlainTextOptions) {
}

export async function scheduleEmail(data: DeliverEmail, delay?: { seconds: number }) {
const runAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined;
await workerQueue.enqueue("scheduleEmail", data, { runAt });
const availableAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined;
await commonWorker.enqueue({
job: "scheduleEmail",
payload: data,
availableAt,
});
}

export async function sendEmail(data: DeliverEmail) {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/services/runsRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ export class RunsRepository {

if (options.to) {
queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { to: options.to });
} else {
queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", {
to: Date.now(),
});
}

if (typeof options.isTest === "boolean") {
Expand Down
Loading