-
+
diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx
index 7700b7e20d..5a32667a26 100644
--- a/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx
+++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx
@@ -11,6 +11,8 @@ import { getCachedUsage, getCurrentPlan } from "~/services/platform.v3.server";
import { requireUser } from "~/services/session.server";
import { telemetry } from "~/services/telemetry.server";
import { organizationPath } from "~/utils/pathBuilder";
+import { isEnvironmentPauseResumeFormSubmission } from "../_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route";
+import { logger } from "~/services/logger.server";
const ParamsSchema = z.object({
organizationSlug: z.string(),
@@ -45,6 +47,11 @@ export const shouldRevalidate: ShouldRevalidateFunction = (params) => {
}
}
+ // Invalidate if the environment has been paused or resumed
+ if (isEnvironmentPauseResumeFormSubmission(params.formMethod, params.formData)) {
+ return true;
+ }
+
// This prevents revalidation when there are search params changes
// IMPORTANT: If the loader function depends on search params, this should be updated
return params.currentUrl.pathname !== params.nextUrl.pathname;
diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.pause.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.pause.ts
new file mode 100644
index 0000000000..452bd81746
--- /dev/null
+++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.pause.ts
@@ -0,0 +1,46 @@
+import { json } from "@remix-run/server-runtime";
+import { type QueueItem, type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3";
+import { z } from "zod";
+import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+import { PauseQueueService } from "~/v3/services/pauseQueue.server";
+
+const BodySchema = z.object({
+ type: RetrieveQueueType.default("id"),
+ action: z.enum(["pause", "resume"]),
+});
+
+export const { action } = createActionApiRoute(
+ {
+ body: BodySchema,
+ params: z.object({
+ queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")),
+ }),
+ },
+ async ({ params, body, authentication }) => {
+ const input: RetrieveQueueParam =
+ body.type === "id"
+ ? params.queueParam
+ : {
+ type: body.type,
+ name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"),
+ };
+
+ const service = new PauseQueueService();
+ const result = await service.call(
+ authentication.environment,
+ input,
+ body.action === "pause" ? "paused" : "resumed"
+ );
+
+ if (!result.success) {
+ if (result.code === "queue-not-found") {
+ return json({ error: result.code }, { status: 404 });
+ }
+
+ return json({ error: result.code }, { status: 400 });
+ }
+
+ const q: QueueItem = result.queue;
+ return json(q);
+ }
+);
diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.ts
new file mode 100644
index 0000000000..a9bcd2342e
--- /dev/null
+++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.ts
@@ -0,0 +1,45 @@
+import { json } from "@remix-run/server-runtime";
+import { type QueueItem, type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3";
+import { z } from "zod";
+import { QueueRetrievePresenter } from "~/presenters/v3/QueueRetrievePresenter.server";
+import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+
+const SearchParamsSchema = z.object({
+ type: RetrieveQueueType.default("id"),
+});
+
+export const loader = createLoaderApiRoute(
+ {
+ params: z.object({
+ queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")),
+ }),
+ searchParams: SearchParamsSchema,
+ findResource: async () => 1, // This is a dummy function, we don't need to find a resource
+ },
+ async ({ params, searchParams, authentication }) => {
+ const input: RetrieveQueueParam =
+ searchParams.type === "id"
+ ? params.queueParam
+ : {
+ type: searchParams.type,
+ name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"),
+ };
+
+ const presenter = new QueueRetrievePresenter();
+ const result = await presenter.call({
+ environment: authentication.environment,
+ queueInput: input,
+ });
+
+ if (!result.success) {
+ if (result.code === "queue-not-found") {
+ return json({ error: result.code }, { status: 404 });
+ }
+
+ return json({ error: result.code }, { status: 400 });
+ }
+
+ const q: QueueItem = result.queue;
+ return json(q);
+ }
+);
diff --git a/apps/webapp/app/routes/api.v1.queues.ts b/apps/webapp/app/routes/api.v1.queues.ts
new file mode 100644
index 0000000000..551b3c2f34
--- /dev/null
+++ b/apps/webapp/app/routes/api.v1.queues.ts
@@ -0,0 +1,44 @@
+import { json } from "@remix-run/server-runtime";
+import { type QueueItem } from "@trigger.dev/core/v3";
+import { z } from "zod";
+import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server";
+import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+import { ServiceValidationError } from "~/v3/services/baseService.server";
+
+const SearchParamsSchema = z.object({
+ page: z.coerce.number().int().positive().optional(),
+ perPage: z.coerce.number().int().positive().optional(),
+});
+
+export const loader = createLoaderApiRoute(
+ {
+ searchParams: SearchParamsSchema,
+ findResource: async () => 1, // This is a dummy function, we don't need to find a resource
+ },
+ async ({ searchParams, authentication }) => {
+ const service = new QueueListPresenter(searchParams.perPage);
+
+ try {
+ const result = await service.call({
+ environment: authentication.environment,
+ page: searchParams.page ?? 1,
+ });
+
+ if (!result.success) {
+ return json({ error: result.code }, { status: 400 });
+ }
+
+ const queues: QueueItem[] = result.queues;
+ return json({ data: queues, pagination: result.pagination }, { status: 200 });
+ } catch (error) {
+ if (error instanceof ServiceValidationError) {
+ return json({ error: error.message }, { status: 422 });
+ }
+
+ return json(
+ { error: error instanceof Error ? error.message : "Internal Server Error" },
+ { status: 500 }
+ );
+ }
+ }
+);
diff --git a/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.concurrency.ts b/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.concurrency.ts
index 5a977cd5cc..caf714fd30 100644
--- a/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.concurrency.ts
+++ b/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.concurrency.ts
@@ -2,7 +2,7 @@ import { redirect, type LoaderFunctionArgs } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { SelectBestEnvironmentPresenter } from "~/presenters/SelectBestEnvironmentPresenter.server";
import { requireUser } from "~/services/session.server";
-import { ProjectParamSchema, v3ApiKeysPath, v3ConcurrencyPath } from "~/utils/pathBuilder";
+import { ProjectParamSchema, v3QueuesPath } from "~/utils/pathBuilder";
export const loader = async ({ request, params }: LoaderFunctionArgs) => {
const user = await requireUser(request);
@@ -39,5 +39,5 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
const selector = new SelectBestEnvironmentPresenter();
const environment = await selector.selectBestEnvironment(project.id, user, project.environments);
- return redirect(v3ConcurrencyPath({ slug: organizationSlug }, project, environment));
+ return redirect(v3QueuesPath({ slug: organizationSlug }, project, environment));
};
diff --git a/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.env.$envParam.concurrency.ts b/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.env.$envParam.concurrency.ts
new file mode 100644
index 0000000000..e5af3479c6
--- /dev/null
+++ b/apps/webapp/app/routes/orgs.$organizationSlug.projects.$projectParam.env.$envParam.concurrency.ts
@@ -0,0 +1,9 @@
+import { redirect, type LoaderFunctionArgs } from "@remix-run/server-runtime";
+import { EnvironmentParamSchema, v3QueuesPath } from "~/utils/pathBuilder";
+
+export const loader = async ({ request, params }: LoaderFunctionArgs) => {
+ const { organizationSlug, projectParam, envParam } = EnvironmentParamSchema.parse(params);
+ return redirect(
+ v3QueuesPath({ slug: organizationSlug }, { slug: projectParam }, { slug: envParam })
+ );
+};
diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues.stream.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues.stream.tsx
new file mode 100644
index 0000000000..007e2c4f7e
--- /dev/null
+++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues.stream.tsx
@@ -0,0 +1,55 @@
+import { $replica } from "~/db.server";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import { requireUserId } from "~/services/session.server";
+import { EnvironmentParamSchema } from "~/utils/pathBuilder";
+import { createSSELoader } from "~/utils/sse";
+
+export const loader = createSSELoader({
+ timeout: env.QUEUE_SSE_AUTORELOAD_TIMEOUT_MS,
+ interval: env.QUEUE_SSE_AUTORELOAD_INTERVAL_MS,
+ debug: true,
+ handler: async ({ request, params }) => {
+ const userId = await requireUserId(request);
+ const { projectParam, envParam } = EnvironmentParamSchema.parse(params);
+
+ const environment = await $replica.runtimeEnvironment.findFirst({
+ where: {
+ slug: envParam,
+ type: "DEVELOPMENT",
+ orgMember: {
+ userId,
+ },
+ project: {
+ slug: projectParam,
+ },
+ },
+ });
+
+ if (!environment) {
+ throw new Response("Not Found", { status: 404 });
+ }
+
+ return {
+ beforeStream: async () => {
+ logger.debug("Start queue page SSE session", {
+ environmentId: environment.id,
+ });
+ },
+ initStream: async ({ send }) => {
+ send({ event: "time", data: new Date().toISOString() });
+ },
+ iterator: async ({ send }) => {
+ send({
+ event: "update",
+ data: new Date().toISOString(),
+ });
+ },
+ cleanup: async () => {
+ logger.debug("End queue page SSE session", {
+ environmentId: environment.id,
+ });
+ },
+ };
+ },
+});
diff --git a/apps/webapp/app/routes/storybook.info-panel/route.tsx b/apps/webapp/app/routes/storybook.info-panel/route.tsx
new file mode 100644
index 0000000000..0a5b4e3e77
--- /dev/null
+++ b/apps/webapp/app/routes/storybook.info-panel/route.tsx
@@ -0,0 +1,118 @@
+import {
+ BeakerIcon,
+ BellAlertIcon,
+ BookOpenIcon,
+ ClockIcon,
+ InformationCircleIcon,
+ PlusIcon,
+ RocketLaunchIcon,
+ ServerStackIcon,
+ Squares2X2Icon,
+} from "@heroicons/react/20/solid";
+import { InfoPanel } from "~/components/primitives/InfoPanel";
+import { TaskIcon } from "~/assets/icons/TaskIcon";
+
+export default function Story() {
+ return (
+
+
+ {/* Basic Info Panel */}
+
+ This is a basic info panel with title and default variant
+
+
+ {/* Info Panel with Button */}
+
+ This panel includes a button in the top-right corner
+
+
+ {/* Upgrade Variant with Button */}
+
+ This panel uses the upgrade variant with a call-to-action button
+
+
+ {/* Minimal Variant */}
+
+ A minimal variant without a title
+
+
+ {/* Task Panel with Action */}
+
+ A panel showing task information with a view action
+
+
+ {/* Getting Started Panel */}
+
+ Begin your journey with our quick start guide
+
+
+ {/* Deployment Panel with Button */}
+
+ Ready to deploy your changes to production
+
+
+ {/* Create New Panel */}
+
+ Start a new project with our guided setup
+
+
+ {/* Batches Panel */}
+
+ Information about batch processing
+
+
+ {/* Documentation Panel with Link */}
+
+ Access our comprehensive documentation
+
+
+
+ );
+}
diff --git a/apps/webapp/app/routes/storybook/route.tsx b/apps/webapp/app/routes/storybook/route.tsx
index bd451f6147..995bfdf50e 100644
--- a/apps/webapp/app/routes/storybook/route.tsx
+++ b/apps/webapp/app/routes/storybook/route.tsx
@@ -52,6 +52,10 @@ const stories: Story[] = [
name: "Free plan usage",
slug: "free-plan-usage",
},
+ {
+ name: "Info panel",
+ slug: "info-panel",
+ },
{
name: "Inline code",
slug: "inline-code",
diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
index d28945218f..fae78713db 100644
--- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
+++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
@@ -39,7 +39,12 @@ type ApiKeyRouteBuilderOptions<
params: TParamsSchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion
? z.infer
: undefined,
- authentication: ApiAuthenticationResultSuccess
+ authentication: ApiAuthenticationResultSuccess,
+ searchParams: TSearchParamsSchema extends
+ | z.ZodFirstPartySchemaTypes
+ | z.ZodDiscriminatedUnion
+ ? z.infer
+ : undefined
) => Promise;
shouldRetryNotFound?: boolean;
authorization?: {
@@ -179,7 +184,7 @@ export function createLoaderApiRoute<
}
// Find the resource
- const resource = await findResource(parsedParams, authenticationResult);
+ const resource = await findResource(parsedParams, authenticationResult, parsedSearchParams);
if (!resource) {
return await wrapResponse(
diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts
index 332ed0aa0f..b46c3cac13 100644
--- a/apps/webapp/app/utils/pathBuilder.ts
+++ b/apps/webapp/app/utils/pathBuilder.ts
@@ -171,14 +171,6 @@ export function v3EnvironmentVariablesPath(
return `${v3EnvironmentPath(organization, project, environment)}/environment-variables`;
}
-export function v3ConcurrencyPath(
- organization: OrgForPath,
- project: ProjectForPath,
- environment: EnvironmentForPath
-) {
- return `${v3EnvironmentPath(organization, project, environment)}/concurrency`;
-}
-
export function v3NewEnvironmentVariablesPath(
organization: OrgForPath,
project: ProjectForPath,
@@ -311,6 +303,14 @@ export function v3NewSchedulePath(
return `${v3EnvironmentPath(organization, project, environment)}/schedules/new`;
}
+export function v3QueuesPath(
+ organization: OrgForPath,
+ project: ProjectForPath,
+ environment: EnvironmentForPath
+) {
+ return `${v3EnvironmentPath(organization, project, environment)}/queues`;
+}
+
export function v3BatchesPath(
organization: OrgForPath,
project: ProjectForPath,
diff --git a/apps/webapp/app/utils/sse.ts b/apps/webapp/app/utils/sse.ts
index 9f3452cf93..8f396c092e 100644
--- a/apps/webapp/app/utils/sse.ts
+++ b/apps/webapp/app/utils/sse.ts
@@ -53,6 +53,26 @@ export function createSSELoader(options: SSEOptions) {
);
};
+ const createSafeSend = (originalSend: SendFunction): SendFunction => {
+ return (event) => {
+ try {
+ if (!internalController.signal.aborted) {
+ originalSend(event);
+ }
+ // If controller is aborted, silently ignore the send attempt
+ } catch (error) {
+ if (error instanceof Error) {
+ if (error.message?.includes("Controller is already closed")) {
+ // Silently handle controller closed errors
+ return;
+ }
+ log(`Error sending event: ${error.message}`);
+ }
+ throw error; // Re-throw other errors
+ }
+ };
+ };
+
const context: SSEContext = {
id,
request,
@@ -115,12 +135,13 @@ export function createSSELoader(options: SSEOptions) {
return eventStream(combinedSignal, function setup(send) {
connections.add(id);
+ const safeSend = createSafeSend(send);
async function run() {
try {
log("Initializing");
if (handlers.initStream) {
- const shouldContinue = await handlers.initStream({ send });
+ const shouldContinue = await handlers.initStream({ send: safeSend });
if (shouldContinue === false) {
log("initStream returned false, so we'll stop the stream");
internalController.abort("Init requested stop");
@@ -138,7 +159,7 @@ export function createSSELoader(options: SSEOptions) {
if (handlers.iterator) {
try {
- const shouldContinue = await handlers.iterator({ date, send });
+ const shouldContinue = await handlers.iterator({ date, send: safeSend });
if (shouldContinue === false) {
log("iterator return false, so we'll stop the stream");
internalController.abort("Iterator requested stop");
@@ -173,7 +194,7 @@ export function createSSELoader(options: SSEOptions) {
log("Cleanup called");
if (handlers.cleanup) {
try {
- handlers.cleanup({ send });
+ handlers.cleanup({ send: safeSend });
} catch (error) {
log(
`Error in cleanup handler: ${
diff --git a/apps/webapp/app/v3/engineVersion.server.ts b/apps/webapp/app/v3/engineVersion.server.ts
index 1b514fc398..c1a32052ff 100644
--- a/apps/webapp/app/v3/engineVersion.server.ts
+++ b/apps/webapp/app/v3/engineVersion.server.ts
@@ -1,17 +1,25 @@
-import { RunEngineVersion, RuntimeEnvironmentType } from "@trigger.dev/database";
-import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { RunEngineVersion, type RuntimeEnvironmentType } from "@trigger.dev/database";
import {
findCurrentWorkerDeploymentWithoutTasks,
findCurrentWorkerFromEnvironment,
} from "./models/workerDeployment.server";
import { $replica } from "~/db.server";
+type Environment = {
+ id: string;
+ type: RuntimeEnvironmentType;
+ project: {
+ id: string;
+ engine: RunEngineVersion;
+ };
+};
+
export async function determineEngineVersion({
environment,
workerVersion,
engineVersion: version,
}: {
- environment: AuthenticatedEnvironment;
+ environment: Environment;
workerVersion?: string;
engineVersion?: RunEngineVersion;
}): Promise {
@@ -36,7 +44,7 @@ export async function determineEngineVersion({
},
where: {
projectId_runtimeEnvironmentId_version: {
- projectId: environment.projectId,
+ projectId: environment.project.id,
runtimeEnvironmentId: environment.id,
version: workerVersion,
},
diff --git a/apps/webapp/app/v3/runQueue.server.ts b/apps/webapp/app/v3/runQueue.server.ts
index 7198456d39..e7aa13c5c5 100644
--- a/apps/webapp/app/v3/runQueue.server.ts
+++ b/apps/webapp/app/v3/runQueue.server.ts
@@ -1,14 +1,22 @@
-import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "./marqs/index.server";
import { engine } from "./runEngine.server";
//This allows us to update MARQS and the RunQueue
/** Updates MARQS and the RunQueue limits */
-export async function updateEnvConcurrencyLimits(environment: AuthenticatedEnvironment) {
+export async function updateEnvConcurrencyLimits(
+ environment: AuthenticatedEnvironment,
+ maximumConcurrencyLimit?: number
+) {
+ let updatedEnvironment = environment;
+ if (maximumConcurrencyLimit !== undefined) {
+ updatedEnvironment.maximumConcurrencyLimit = maximumConcurrencyLimit;
+ }
+
await Promise.allSettled([
- marqs?.updateEnvConcurrencyLimits(environment),
- engine.runQueue.updateEnvConcurrencyLimits(environment),
+ marqs?.updateEnvConcurrencyLimits(updatedEnvironment),
+ engine.runQueue.updateEnvConcurrencyLimits(updatedEnvironment),
]);
}
diff --git a/apps/webapp/app/v3/services/pauseEnvironment.server.ts b/apps/webapp/app/v3/services/pauseEnvironment.server.ts
new file mode 100644
index 0000000000..a3e029e565
--- /dev/null
+++ b/apps/webapp/app/v3/services/pauseEnvironment.server.ts
@@ -0,0 +1,68 @@
+import { type AuthenticatedEnvironment } from "@internal/testcontainers";
+import { type PrismaClientOrTransaction } from "@trigger.dev/database";
+import { prisma } from "~/db.server";
+import { logger } from "~/services/logger.server";
+import { updateEnvConcurrencyLimits } from "../runQueue.server";
+import { WithRunEngine } from "./baseService.server";
+
+export type PauseStatus = "paused" | "resumed";
+
+export type PauseEnvironmentResult =
+ | {
+ success: true;
+ state: PauseStatus;
+ }
+ | {
+ success: false;
+ error: string;
+ };
+
+export class PauseEnvironmentService extends WithRunEngine {
+ constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
+ super({ prisma });
+ }
+
+ public async call(
+ environment: AuthenticatedEnvironment,
+ action: PauseStatus
+ ): Promise {
+ try {
+ await this._prisma.runtimeEnvironment.update({
+ where: {
+ id: environment.id,
+ },
+ data: {
+ paused: action === "paused",
+ },
+ });
+
+ if (action === "paused") {
+ logger.debug("PauseEnvironmentService: pausing environment", {
+ environmentId: environment.id,
+ });
+ await updateEnvConcurrencyLimits(environment, 0);
+ } else {
+ logger.debug("PauseEnvironmentService: resuming environment", {
+ environmentId: environment.id,
+ });
+ await updateEnvConcurrencyLimits(environment);
+ }
+
+ return {
+ success: true,
+ state: action,
+ };
+ } catch (error) {
+ logger.error("PauseEnvironmentService: error pausing environment", {
+ action,
+ environmentId: environment.id,
+ error,
+ });
+
+ return {
+ success: false,
+ error: error instanceof Error ? error.message : "Unknown error",
+ };
+ }
+ }
+}
diff --git a/apps/webapp/app/v3/services/pauseQueue.server.ts b/apps/webapp/app/v3/services/pauseQueue.server.ts
new file mode 100644
index 0000000000..f4e18eab4b
--- /dev/null
+++ b/apps/webapp/app/v3/services/pauseQueue.server.ts
@@ -0,0 +1,107 @@
+import { QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3";
+import { getQueue, toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server";
+import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { logger } from "~/services/logger.server";
+import { BaseService } from "./baseService.server";
+import { determineEngineVersion } from "../engineVersion.server";
+import { removeQueueConcurrencyLimits, updateQueueConcurrencyLimits } from "../runQueue.server";
+import { engine } from "../runEngine.server";
+
+export type PauseStatus = "paused" | "resumed";
+
+export type PauseQueueResult =
+ | {
+ success: true;
+ state: PauseStatus;
+ queue: QueueItem;
+ }
+ | {
+ success: false;
+ code: "queue-not-found" | "unknown-error" | "engine-version";
+ error?: string;
+ };
+
+export class PauseQueueService extends BaseService {
+ public async call(
+ environment: AuthenticatedEnvironment,
+ queueInput: RetrieveQueueParam,
+ action: PauseStatus
+ ): Promise {
+ try {
+ //check the engine is the correct version
+ const engineVersion = await determineEngineVersion({ environment });
+
+ if (engineVersion === "V1") {
+ return {
+ success: false as const,
+ code: "engine-version",
+ error: "Upgrade to v4+ to pause/resume queues",
+ };
+ }
+
+ const queue = await getQueue(this._prisma, environment, queueInput);
+
+ if (!queue) {
+ return {
+ success: false,
+ code: "queue-not-found",
+ };
+ }
+
+ const updatedQueue = await this._prisma.taskQueue.update({
+ where: {
+ id: queue.id,
+ },
+ data: {
+ paused: action === "paused",
+ },
+ });
+
+ if (action === "paused") {
+ await updateQueueConcurrencyLimits(environment, queue.name, 0);
+ } else {
+ if (queue.concurrencyLimit) {
+ await updateQueueConcurrencyLimits(environment, queue.name, queue.concurrencyLimit);
+ } else {
+ await removeQueueConcurrencyLimits(environment, queue.name);
+ }
+ }
+
+ logger.debug("PauseQueueService: queue state updated", {
+ queueId: queue.id,
+ action,
+ environmentId: environment.id,
+ });
+
+ const results = await Promise.all([
+ engine.lengthOfQueues(environment, [queue.name]),
+ engine.currentConcurrencyOfQueues(environment, [queue.name]),
+ ]);
+
+ return {
+ success: true,
+ state: action,
+ queue: toQueueItem({
+ friendlyId: updatedQueue.friendlyId,
+ name: updatedQueue.name,
+ type: updatedQueue.type,
+ running: results[1]?.[updatedQueue.name] ?? 0,
+ queued: results[0]?.[updatedQueue.name] ?? 0,
+ concurrencyLimit: updatedQueue.concurrencyLimit ?? null,
+ paused: updatedQueue.paused,
+ }),
+ };
+ } catch (error) {
+ logger.error("PauseQueueService: error updating queue state", {
+ error,
+ environmentId: environment.id,
+ });
+
+ return {
+ success: false,
+ code: "unknown-error",
+ error: error instanceof Error ? error.message : "Unknown error",
+ };
+ }
+ }
+}
diff --git a/internal-packages/database/prisma/migrations/20250318090847_pause_queues_and_environments/migration.sql b/internal-packages/database/prisma/migrations/20250318090847_pause_queues_and_environments/migration.sql
new file mode 100644
index 0000000000..4daf97b608
--- /dev/null
+++ b/internal-packages/database/prisma/migrations/20250318090847_pause_queues_and_environments/migration.sql
@@ -0,0 +1,7 @@
+-- AlterTable
+ALTER TABLE "RuntimeEnvironment"
+ADD COLUMN "paused" BOOLEAN NOT NULL DEFAULT false;
+
+-- AlterTable
+ALTER TABLE "TaskQueue"
+ADD COLUMN "paused" BOOLEAN NOT NULL DEFAULT false;
\ No newline at end of file
diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma
index 0398103546..84d8c55702 100644
--- a/internal-packages/database/prisma/schema.prisma
+++ b/internal-packages/database/prisma/schema.prisma
@@ -382,7 +382,8 @@ model RuntimeEnvironment {
///A memorable code for the environment
shortcode String
- maximumConcurrencyLimit Int @default(5)
+ maximumConcurrencyLimit Int @default(5)
+ paused Boolean @default(false)
autoEnableInternalSources Boolean @default(true)
@@ -2523,6 +2524,8 @@ model TaskQueue {
concurrencyLimit Int?
rateLimit Json?
+ paused Boolean @default(false)
+
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 834652fad6..46556b9588 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -1652,12 +1652,24 @@ export class RunEngine {
return this.runQueue.lengthOfEnvQueue(environment);
}
- async currentConcurrencyOfEnvQueue(
- environment: MinimalAuthenticatedEnvironment
- ): Promise {
+ async concurrencyOfEnvQueue(environment: MinimalAuthenticatedEnvironment): Promise {
return this.runQueue.currentConcurrencyOfEnvironment(environment);
}
+ async lengthOfQueues(
+ environment: MinimalAuthenticatedEnvironment,
+ queues: string[]
+ ): Promise> {
+ return this.runQueue.lengthOfQueues(environment, queues);
+ }
+
+ async currentConcurrencyOfQueues(
+ environment: MinimalAuthenticatedEnvironment,
+ queues: string[]
+ ): Promise> {
+ return this.runQueue.currentConcurrencyOfQueues(environment, queues);
+ }
+
/**
* This creates a DATETIME waitpoint, that will be completed automatically when the specified date is reached.
* If you pass an `idempotencyKey`, the waitpoint will be created only if it doesn't already exist.
diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts
index 374d224aff..dcedf121ec 100644
--- a/internal-packages/run-engine/src/run-queue/index.ts
+++ b/internal-packages/run-engine/src/run-queue/index.ts
@@ -184,6 +184,76 @@ export class RunQueue {
return this.redis.scard(this.keys.currentConcurrencyKey(env, queue, concurrencyKey));
}
+ public async currentConcurrencyOfQueues(
+ env: MinimalAuthenticatedEnvironment,
+ queues: string[]
+ ): Promise> {
+ const pipeline = this.redis.pipeline();
+
+ // Queue up all SCARD commands in the pipeline
+ queues.forEach((queue) => {
+ pipeline.scard(this.keys.currentConcurrencyKey(env, queue));
+ });
+
+ // Execute pipeline and get results
+ const results = await pipeline.exec();
+
+ // If results is null, return all queues with 0 concurrency
+ if (!results) {
+ return queues.reduce(
+ (acc, queue) => {
+ acc[queue] = 0;
+ return acc;
+ },
+ {} as Record
+ );
+ }
+
+ // Map results back to queue names, handling potential errors
+ return queues.reduce(
+ (acc, queue, index) => {
+ const [err, value] = results[index];
+ // If there was an error or value is null/undefined, use 0
+ acc[queue] = err || value == null ? 0 : (value as number);
+ return acc;
+ },
+ {} as Record
+ );
+ }
+
+ public async lengthOfQueues(
+ env: MinimalAuthenticatedEnvironment,
+ queues: string[]
+ ): Promise> {
+ const pipeline = this.redis.pipeline();
+
+ // Queue up all ZCARD commands in the pipeline
+ queues.forEach((queue) => {
+ pipeline.zcard(this.keys.queueKey(env, queue));
+ });
+
+ const results = await pipeline.exec();
+
+ if (!results) {
+ return queues.reduce(
+ (acc, queue) => {
+ acc[queue] = 0;
+ return acc;
+ },
+ {} as Record
+ );
+ }
+
+ return queues.reduce(
+ (acc, queue, index) => {
+ const [err, value] = results![index];
+ acc[queue] = err || value == null ? 0 : (value as number);
+ return acc;
+ },
+ {} as Record
+ );
+ }
+
public async currentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) {
return this.redis.scard(this.keys.envCurrentConcurrencyKey(env));
}
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index e92a92047b..845b6b4310 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -1,3 +1,4 @@
export * from "./types.js";
export * from "./utils.js";
export * from "./schemas/json.js";
+export * from "./version.js";
diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts
index a10171d0ed..668a5c34a6 100644
--- a/packages/core/src/v3/apiClient/index.ts
+++ b/packages/core/src/v3/apiClient/index.ts
@@ -18,11 +18,14 @@ import {
EnvironmentVariableResponseBody,
EnvironmentVariableValue,
EnvironmentVariables,
+ ListQueueOptions,
ListRunResponseItem,
ListScheduleOptions,
+ QueueItem,
ReplayRunResponse,
RescheduleRunRequestBody,
RetrieveBatchV2Response,
+ RetrieveQueueParam,
RetrieveRunResponse,
ScheduleObject,
TaskRunExecutionResult,
@@ -716,6 +719,76 @@ export class ApiClient {
);
}
+ listQueues(options?: ListQueueOptions, requestOptions?: ZodFetchOptions) {
+ const searchParams = new URLSearchParams();
+
+ if (options?.page) {
+ searchParams.append("page", options.page.toString());
+ }
+
+ if (options?.perPage) {
+ searchParams.append("perPage", options.perPage.toString());
+ }
+
+ return zodfetchOffsetLimitPage(
+ QueueItem,
+ `${this.baseUrl}/api/v1/queues`,
+ {
+ page: options?.page,
+ limit: options?.perPage,
+ },
+ {
+ method: "GET",
+ headers: this.#getHeaders(false),
+ },
+ mergeRequestOptions(this.defaultRequestOptions, requestOptions)
+ );
+ }
+
+ retrieveQueue(queue: RetrieveQueueParam, requestOptions?: ZodFetchOptions) {
+ const type = typeof queue === "string" ? "id" : queue.type;
+ const value = typeof queue === "string" ? queue : queue.name;
+
+ // Explicitly encode slashes before encoding the rest of the string
+ const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F"));
+
+ return zodfetch(
+ QueueItem,
+ `${this.baseUrl}/api/v1/queues/${encodedValue}?type=${type}`,
+ {
+ method: "GET",
+ headers: this.#getHeaders(false),
+ },
+ mergeRequestOptions(this.defaultRequestOptions, requestOptions)
+ );
+ }
+
+ pauseQueue(
+ queue: RetrieveQueueParam,
+ action: "pause" | "resume",
+ requestOptions?: ZodFetchOptions
+ ) {
+ const type = typeof queue === "string" ? "id" : queue.type;
+ const value = typeof queue === "string" ? queue : queue.name;
+
+ // Explicitly encode slashes before encoding the rest of the string
+ const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F"));
+
+ return zodfetch(
+ QueueItem,
+ `${this.baseUrl}/api/v1/queues/${encodedValue}/pause`,
+ {
+ method: "POST",
+ headers: this.#getHeaders(false),
+ body: JSON.stringify({
+ type,
+ action,
+ }),
+ },
+ mergeRequestOptions(this.defaultRequestOptions, requestOptions)
+ );
+ }
+
subscribeToRun(
runId: string,
options?: {
diff --git a/packages/core/src/v3/schemas/index.ts b/packages/core/src/v3/schemas/index.ts
index 0f0c753122..c2b17a72b6 100644
--- a/packages/core/src/v3/schemas/index.ts
+++ b/packages/core/src/v3/schemas/index.ts
@@ -14,3 +14,4 @@ export * from "./runEngine.js";
export * from "./webhooks.js";
export * from "./checkpoints.js";
export * from "./warmStart.js";
+export * from "./queues.js";
diff --git a/packages/core/src/v3/schemas/queues.ts b/packages/core/src/v3/schemas/queues.ts
new file mode 100644
index 0000000000..2b511eb44c
--- /dev/null
+++ b/packages/core/src/v3/schemas/queues.ts
@@ -0,0 +1,78 @@
+import { z } from "zod";
+
+const queueTypes = ["task", "custom"] as const;
+
+/**
+ * The type of queue, either "task" or "custom"
+ * "task" are created automatically for each task.
+ * "custom" are created by you explicitly in your code.
+ * */
+export const QueueType = z.enum(queueTypes);
+export type QueueType = z.infer;
+
+export const RetrieveQueueType = z.enum([...queueTypes, "id"]);
+export type RetrieveQueueType = z.infer;
+
+export const QueueItem = z.object({
+ /** The queue id, e.g. queue_12345 */
+ id: z.string(),
+ /** The queue name */
+ name: z.string(),
+ /**
+ * The queue type, either "task" or "custom"
+ * "task" are created automatically for each task.
+ * "custom" are created by you explicitly in your code.
+ * */
+ type: QueueType,
+ /** The number of runs currently running */
+ running: z.number(),
+ /** The number of runs currently queued */
+ queued: z.number(),
+ /** The concurrency limit of the queue */
+ concurrencyLimit: z.number().nullable(),
+ /** Whether the queue is paused. If it's paused, no new runs will be started. */
+ paused: z.boolean(),
+});
+
+export type QueueItem = z.infer;
+
+export const ListQueueOptions = z.object({
+ /** The page number */
+ page: z.number().optional(),
+ /** The number of queues per page */
+ perPage: z.number().optional(),
+});
+
+export type ListQueueOptions = z.infer;
+
+/**
+ * When retrieving a queue you can either use the queue id,
+ * or the type and name.
+ *
+ * @example
+ *
+ * ```ts
+ * // Use a queue id (they start with queue_
+ * const q1 = await queues.retrieve("queue_12345");
+ *
+ * // Or use the type and name
+ * // The default queue for your "my-task-id"
+ * const q2 = await queues.retrieve({ type: "task", name: "my-task-id"});
+ *
+ * // The custom queue you defined in your code
+ * const q3 = await queues.retrieve({ type: "custom", name: "my-custom-queue" });
+ * ```
+ */
+export const RetrieveQueueParam = z.union([
+ z.string(),
+ z.object({
+ /** "task" or "custom" */
+ type: QueueType,
+ /** The name of your queue.
+ * For "task" type it will be the task id, for "custom" it will be the name you specified.
+ * */
+ name: z.string(),
+ }),
+]);
+
+export type RetrieveQueueParam = z.infer;
diff --git a/packages/react-hooks/src/package.json b/packages/react-hooks/src/package.json
deleted file mode 100644
index 5bbefffbab..0000000000
--- a/packages/react-hooks/src/package.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
- "type": "commonjs"
-}
diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts
index f83254b8c9..5f00a4a3e1 100644
--- a/packages/trigger-sdk/src/v3/index.ts
+++ b/packages/trigger-sdk/src/v3/index.ts
@@ -48,6 +48,7 @@ export {
} from "./runs.js";
export * as schedules from "./schedules/index.js";
export * as envvars from "./envvars.js";
+export * as queues from "./queues.js";
export type { ImportEnvironmentVariablesParams } from "./envvars.js";
export { configure, auth } from "./auth.js";
diff --git a/packages/trigger-sdk/src/v3/queues.ts b/packages/trigger-sdk/src/v3/queues.ts
new file mode 100644
index 0000000000..6788186f9d
--- /dev/null
+++ b/packages/trigger-sdk/src/v3/queues.ts
@@ -0,0 +1,177 @@
+import {
+ accessoryAttributes,
+ apiClientManager,
+ ApiPromise,
+ ApiRequestOptions,
+ flattenAttributes,
+ ListQueueOptions,
+ mergeRequestOptions,
+ OffsetLimitPagePromise,
+ QueueItem,
+ RetrieveQueueParam,
+} from "@trigger.dev/core/v3";
+import { tracer } from "./tracer.js";
+
+/**
+ * Lists queues
+ * @param options - The list options
+ * @param options.page - The page number
+ * @param options.perPage - The number of queues per page
+ * @returns The list of queues
+ */
+export function list(
+ options?: ListQueueOptions,
+ requestOptions?: ApiRequestOptions
+): OffsetLimitPagePromise {
+ const apiClient = apiClientManager.clientOrThrow();
+
+ const $requestOptions = mergeRequestOptions(
+ {
+ tracer,
+ name: "queues.list()",
+ icon: "queue",
+ },
+ requestOptions
+ );
+
+ return apiClient.listQueues(options, $requestOptions);
+}
+
+/**
+ * When retrieving a queue you can either use the queue id,
+ * or the type and name.
+ *
+ * @example
+ *
+ * ```ts
+ * // Use a queue id (they start with queue_
+ * const q1 = await queues.retrieve("queue_12345");
+ *
+ * // Or use the type and name
+ * // The default queue for your "my-task-id"
+ * const q2 = await queues.retrieve({ type: "task", name: "my-task-id"});
+ *
+ * // The custom queue you defined in your code
+ * const q3 = await queues.retrieve({ type: "custom", name: "my-custom-queue" });
+ * ```
+ * @param queue - The ID of the queue to retrieve, or the type and name
+ * @returns The retrieved queue
+ */
+export function retrieve(
+ queue: RetrieveQueueParam,
+ requestOptions?: ApiRequestOptions
+): ApiPromise {
+ const apiClient = apiClientManager.clientOrThrow();
+
+ const $requestOptions = mergeRequestOptions(
+ {
+ tracer,
+ name: "queues.retrieve()",
+ icon: "queue",
+ attributes: {
+ ...flattenAttributes({ queue }),
+ ...accessoryAttributes({
+ items: [
+ {
+ text: typeof queue === "string" ? queue : queue.name,
+ variant: "normal",
+ },
+ ],
+ style: "codepath",
+ }),
+ },
+ },
+ requestOptions
+ );
+
+ return apiClient.retrieveQueue(queue, $requestOptions);
+}
+
+/**
+ * Pauses a queue, preventing any new runs from being started.
+ * Runs that are currently running will continue to completion.
+ *
+ * @example
+ * ```ts
+ * // Pause using a queue id
+ * await queues.pause("queue_12345");
+ *
+ * // Or pause using type and name
+ * await queues.pause({ type: "task", name: "my-task-id"});
+ * ```
+ * @param queue - The ID of the queue to pause, or the type and name
+ * @returns The updated queue state
+ */
+export function pause(
+ queue: RetrieveQueueParam,
+ requestOptions?: ApiRequestOptions
+): ApiPromise {
+ const apiClient = apiClientManager.clientOrThrow();
+
+ const $requestOptions = mergeRequestOptions(
+ {
+ tracer,
+ name: "queues.pause()",
+ icon: "queue",
+ attributes: {
+ ...flattenAttributes({ queue }),
+ ...accessoryAttributes({
+ items: [
+ {
+ text: typeof queue === "string" ? queue : queue.name,
+ variant: "normal",
+ },
+ ],
+ style: "codepath",
+ }),
+ },
+ },
+ requestOptions
+ );
+
+ return apiClient.pauseQueue(queue, "pause", $requestOptions);
+}
+
+/**
+ * Resumes a paused queue, allowing new runs to be started.
+ *
+ * @example
+ * ```ts
+ * // Resume using a queue id
+ * await queues.resume("queue_12345");
+ *
+ * // Or resume using type and name
+ * await queues.resume({ type: "task", name: "my-task-id"});
+ * ```
+ * @param queue - The ID of the queue to resume, or the type and name
+ * @returns The updated queue state
+ */
+export function resume(
+ queue: RetrieveQueueParam,
+ requestOptions?: ApiRequestOptions
+): ApiPromise {
+ const apiClient = apiClientManager.clientOrThrow();
+
+ const $requestOptions = mergeRequestOptions(
+ {
+ tracer,
+ name: "queues.resume()",
+ icon: "queue",
+ attributes: {
+ ...flattenAttributes({ queue }),
+ ...accessoryAttributes({
+ items: [
+ {
+ text: typeof queue === "string" ? queue : queue.name,
+ variant: "normal",
+ },
+ ],
+ style: "codepath",
+ }),
+ },
+ },
+ requestOptions
+ );
+
+ return apiClient.pauseQueue(queue, "resume", $requestOptions);
+}
diff --git a/references/hello-world/src/trigger/queues.ts b/references/hello-world/src/trigger/queues.ts
new file mode 100644
index 0000000000..efca8fe0e5
--- /dev/null
+++ b/references/hello-world/src/trigger/queues.ts
@@ -0,0 +1,52 @@
+import { logger, queues, task } from "@trigger.dev/sdk/v3";
+
+export const queuesTester = task({
+ id: "queues-tester",
+ run: async (payload: any, { ctx }) => {
+ const q = await queues.list();
+
+ for await (const queue of q) {
+ logger.log("Queue", { queue });
+ }
+
+ const retrievedFromId = await queues.retrieve(ctx.queue.id);
+ logger.log("Retrieved from ID", { retrievedFromId });
+
+ const retrievedFromCtxName = await queues.retrieve({
+ type: "task",
+ name: ctx.queue.name,
+ });
+ logger.log("Retrieved from name", { retrievedFromCtxName });
+
+ //pause the queue
+ const pausedQueue = await queues.pause({
+ type: "task",
+ name: "queues-tester",
+ });
+ logger.log("Paused queue", { pausedQueue });
+
+ const retrievedFromName = await queues.retrieve({
+ type: "task",
+ name: "queues-tester",
+ });
+ logger.log("Retrieved from name", { retrievedFromName });
+
+ //resume the queue
+ const resumedQueue = await queues.resume({
+ type: "task",
+ name: "queues-tester",
+ });
+ logger.log("Resumed queue", { resumedQueue });
+ },
+});
+
+export const otherQueueTask = task({
+ id: "other-queue-task",
+ queue: {
+ name: "my-custom-queue",
+ concurrencyLimit: 1,
+ },
+ run: async (payload: any, { ctx }) => {
+ logger.log("Other queue task", { payload });
+ },
+});