diff --git a/apps/webapp/app/components/navigation/SideMenu.tsx b/apps/webapp/app/components/navigation/SideMenu.tsx
index 76c974d596..9bbf970858 100644
--- a/apps/webapp/app/components/navigation/SideMenu.tsx
+++ b/apps/webapp/app/components/navigation/SideMenu.tsx
@@ -1,4 +1,5 @@
 import {
+  AdjustmentsHorizontalIcon,
   ArrowPathRoundedSquareIcon,
   ArrowRightOnRectangleIcon,
   BeakerIcon,
@@ -50,6 +51,7 @@ import {
   adminPath,
   branchesPath,
   concurrencyPath,
+  limitsPath,
   logoutPath,
   newOrganizationPath,
   newProjectPath,
@@ -349,7 +351,7 @@ export function SideMenu({
@@ -357,11 +359,18 @@ export function SideMenu({
             }
           />
+
+  createRedisRateLimitClient({
+    port: env.RATE_LIMIT_REDIS_PORT,
+    host: env.RATE_LIMIT_REDIS_HOST,
+    username: env.RATE_LIMIT_REDIS_USERNAME,
+    password: env.RATE_LIMIT_REDIS_PASSWORD,
+    tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
+    clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
+  })
+);
+
+// Types for rate limit display
+export type RateLimitInfo = {
+  name: string;
+  description: string;
+  config: RateLimiterConfig;
+  currentTokens: number | null;
+};
+
+// Types for quota display
+export type QuotaInfo = {
+  name: string;
+  description: string;
+  limit: number | null;
+  currentUsage: number;
+  source: "default" | "plan" | "override";
+  canExceed?: boolean;
+  isUpgradable?: boolean;
+};
+
+// Types for feature flags
+export type FeatureInfo = {
+  name: string;
+  description: string;
+  enabled: boolean;
+  value?: string | number;
+};
+
+export type LimitsResult = {
+  rateLimits: {
+    api: RateLimitInfo;
+    batch: RateLimitInfo;
+  };
+  quotas: {
+    projects: QuotaInfo;
+    schedules: QuotaInfo | null;
+    teamMembers: QuotaInfo | null;
+    alerts: QuotaInfo | null;
+    branches: QuotaInfo | null;
+    logRetentionDays: QuotaInfo | null;
+    realtimeConnections: QuotaInfo | null;
+    batchProcessingConcurrency: QuotaInfo;
+    devQueueSize: QuotaInfo;
+    deployedQueueSize: QuotaInfo;
+  };
+  features: {
+    hasStagingEnvironment: FeatureInfo;
+    support: FeatureInfo;
+    includedUsage: FeatureInfo;
+  };
+  planName: string | null;
+  organizationId: string;
+  isOnTopPlan: boolean;
+};
+
+export class LimitsPresenter extends BasePresenter {
+  public async call({
+    organizationId,
+    projectId,
+    environmentApiKey,
+  }: {
+    organizationId: string;
+    projectId: string;
+    environmentApiKey: string;
+  }): Promise<LimitsResult> {
+    // Get organization with all limit-related fields
+    const organization = await this._replica.organization.findFirstOrThrow({
+      where: { id: organizationId },
+      select: {
+        id: true,
+        maximumConcurrencyLimit: true,
+        maximumProjectCount: true,
+        maximumDevQueueSize: true,
+        maximumDeployedQueueSize: true,
+        apiRateLimiterConfig: true,
+        batchRateLimitConfig: true,
+        batchQueueConcurrencyConfig: true,
+        _count: {
+          select: {
+            projects: {
+              where: { deletedAt: null },
+            },
+            members: true,
+          },
+        },
+      },
+    });
+
+    // Get current plan from billing service
+    const currentPlan = await getCurrentPlan(organizationId);
+    const limits = currentPlan?.v3Subscription?.plan?.limits;
+    const isOnTopPlan = currentPlan?.v3Subscription?.plan?.code === "v3_pro_1";
+
+    // Resolve rate limit configs (org override or default)
+    const apiRateLimitConfig = resolveApiRateLimitConfig(organization.apiRateLimiterConfig);
+    const batchRateLimitConfig = resolveBatchRateLimitConfig(organization.batchRateLimitConfig);
+
+    // Resolve batch concurrency config
+    const batchConcurrencyConfig = resolveBatchConcurrencyConfig(
+      organization.batchQueueConcurrencyConfig
+    );
+    const batchConcurrencySource = organization.batchQueueConcurrencyConfig
+      ? "override"
+      : "default";
+
+    // Get schedule count for this project
+    const scheduleCount = await CheckScheduleService.getUsedSchedulesCount({
+      prisma: this._replica,
+      projectId,
+    });
+
+    // Get alert channel count for this project
+    const alertChannelCount = await this._replica.projectAlertChannel.count({
+      where: {
+        projectId,
+      },
+    });
+
+    // Get active branch count for this project
+    const activeBranchCount = await this._replica.runtimeEnvironment.count({
+      where: {
+        projectId,
+        branchName: {
+          not: null,
+        },
+        archivedAt: null,
+      },
+    });
+
+    // Get current rate limit tokens for this environment's API key
+    const apiRateLimitTokens = await getRateLimitRemainingTokens(
+      "api",
+      environmentApiKey,
+      apiRateLimitConfig
+    );
+    const batchRateLimitTokens = await getRateLimitRemainingTokens(
+      "batch",
+      environmentApiKey,
+      batchRateLimitConfig
+    );
+
+    // Get plan-level limits
+    const schedulesLimit = limits?.schedules?.number ?? null;
+    const teamMembersLimit = limits?.teamMembers?.number ?? null;
+    const alertsLimit = limits?.alerts?.number ?? null;
+    const branchesLimit = limits?.branches?.number ?? null;
+    const logRetentionDaysLimit = limits?.logRetentionDays?.number ?? null;
+    const realtimeConnectionsLimit = limits?.realtimeConcurrentConnections?.number ?? null;
+    const includedUsage = limits?.includedUsage ?? null;
+    const hasStagingEnvironment = limits?.hasStagingEnvironment ?? false;
+    const supportLevel = limits?.support ?? "community";
+
+    return {
+      isOnTopPlan,
+      rateLimits: {
+        api: {
+          name: "API rate limit",
+          description: "Rate limit for API requests (trigger, batch, etc.)",
+          config: apiRateLimitConfig,
+          currentTokens: apiRateLimitTokens,
+        },
+        batch: {
+          name: "Batch rate limit",
+          description: "Rate limit for batch trigger operations",
+          config: batchRateLimitConfig,
+          currentTokens: batchRateLimitTokens,
+        },
+      },
+      quotas: {
+        projects: {
+          name: "Projects",
+          description: "Maximum number of projects in this organization",
+          limit: organization.maximumProjectCount,
+          currentUsage: organization._count.projects,
+          source: "default",
+          isUpgradable: true,
+        },
+        schedules:
+          schedulesLimit !== null
+            ? {
+                name: "Schedules",
+                description: "Maximum number of schedules across all projects",
+                limit: schedulesLimit,
+                currentUsage: scheduleCount,
+                source: "plan",
+                canExceed: limits?.schedules?.canExceed,
+                isUpgradable: true,
+              }
+            : null,
+        teamMembers:
+          teamMembersLimit !== null
+            ? {
+                name: "Team members",
+                description: "Maximum number of team members in this organization",
+                limit: teamMembersLimit,
+                currentUsage: organization._count.members,
+                source: "plan",
+                canExceed: limits?.teamMembers?.canExceed,
+                isUpgradable: true,
+              }
+            : null,
+        alerts:
+          alertsLimit !== null
+            ? {
+                name: "Alert channels",
+                description: "Maximum number of alert channels across all projects",
+                limit: alertsLimit,
+                currentUsage: alertChannelCount,
+                source: "plan",
+                canExceed: limits?.alerts?.canExceed,
+                isUpgradable: true,
+              }
+            : null,
+        branches:
+          branchesLimit !== null
+            ? {
+                name: "Preview branches",
+                description: "Maximum number of active preview branches",
+                limit: branchesLimit,
+                currentUsage: activeBranchCount,
+                source: "plan",
+                canExceed: limits?.branches?.canExceed,
+                isUpgradable: true,
+              }
+            : null,
+        logRetentionDays:
+          logRetentionDaysLimit !== null
+            ? {
+                name: "Log retention",
+                description: "Number of days logs are retained",
+                limit: logRetentionDaysLimit,
+                currentUsage: 0, // Not applicable - this is a duration, not a count
+                source: "plan",
+              }
+            : null,
+        realtimeConnections:
+          realtimeConnectionsLimit !== null
+            ? {
+                name: "Realtime connections",
+                description: "Maximum concurrent Realtime connections",
+                limit: realtimeConnectionsLimit,
+                currentUsage: 0, // Would need to query realtime service for this
+                source: "plan",
+                canExceed: limits?.realtimeConcurrentConnections?.canExceed,
+                isUpgradable: true,
+              }
+            : null,
+        batchProcessingConcurrency: {
+          name: "Batch processing concurrency",
+          description: "Controls how many batch items can be processed simultaneously",
+          limit: batchConcurrencyConfig.processingConcurrency,
+          currentUsage: 0,
+          source: batchConcurrencySource,
+          canExceed: true,
+          isUpgradable: true,
+        },
+        devQueueSize: {
+          name: "Dev queue size",
+          description: "Maximum pending runs in development environments",
+          limit: organization.maximumDevQueueSize ?? null,
+          currentUsage: 0, // Would need to query Redis for this
+          source: organization.maximumDevQueueSize ? "override" : "default",
+        },
+        deployedQueueSize: {
+          name: "Deployed queue size",
+          description: "Maximum pending runs in deployed environments",
+          limit: organization.maximumDeployedQueueSize ?? null,
+          currentUsage: 0, // Would need to query Redis for this
+          source: organization.maximumDeployedQueueSize ? "override" : "default",
+        },
+      },
+      features: {
+        hasStagingEnvironment: {
+          name: "Staging environment",
+          description: "Access to staging environment for testing before production",
+          enabled: hasStagingEnvironment,
+        },
+        support: {
+          name: "Support level",
+          description: "Type of support available for your plan",
+          enabled: true,
+          value: supportLevel === "slack" ? "Slack" : "Community",
+        },
+        includedUsage: {
+          name: "Included compute",
+          description: "Monthly included compute credits",
+          enabled: includedUsage !== null && includedUsage > 0,
+          value: includedUsage ?? 0,
+        },
+      },
+      planName: currentPlan?.v3Subscription?.plan?.title ?? null,
+      organizationId,
+    };
+  }
+}
+
+function resolveApiRateLimitConfig(apiRateLimiterConfig?: unknown): RateLimiterConfig {
+  const defaultConfig: RateLimitTokenBucketConfig = {
+    type: "tokenBucket",
+    refillRate: env.API_RATE_LIMIT_REFILL_RATE,
+    interval: env.API_RATE_LIMIT_REFILL_INTERVAL as Duration,
+    maxTokens: env.API_RATE_LIMIT_MAX,
+  };
+
+  if (!apiRateLimiterConfig) {
+    return defaultConfig;
+  }
+
+  const parsed = RateLimiterConfig.safeParse(apiRateLimiterConfig);
+  if (!parsed.success) {
+    return defaultConfig;
+  }
+
+  return parsed.data;
+}
+
+function resolveBatchRateLimitConfig(batchRateLimitConfig?: unknown): RateLimiterConfig {
+  const defaultConfig: RateLimitTokenBucketConfig = {
+    type: "tokenBucket",
+    refillRate: env.BATCH_RATE_LIMIT_REFILL_RATE,
+    interval: env.BATCH_RATE_LIMIT_REFILL_INTERVAL as Duration,
+    maxTokens: env.BATCH_RATE_LIMIT_MAX,
+  };
+
+  if (!batchRateLimitConfig) {
+    return defaultConfig;
+  }
+
+  const parsed = RateLimiterConfig.safeParse(batchRateLimitConfig);
+  if (!parsed.success) {
+    return defaultConfig;
+  }
+
+  return parsed.data;
+}
+
+function resolveBatchConcurrencyConfig(batchConcurrencyConfig?: unknown): {
+  processingConcurrency: number;
+} {
+  const defaultConfig = {
+    processingConcurrency: env.BATCH_CONCURRENCY_LIMIT_DEFAULT,
+  };
+
+  if (!batchConcurrencyConfig) {
+    return defaultConfig;
+  }
+
+  if (typeof batchConcurrencyConfig === "object" && batchConcurrencyConfig !== null) {
+    const config = batchConcurrencyConfig as Record<string, unknown>;
+    if (typeof config.processingConcurrency === "number") {
+      return { processingConcurrency: config.processingConcurrency };
+    }
+  }
+
+  return defaultConfig;
+}
+
+/**
+ * Query the current remaining tokens for a rate limiter using the Upstash getRemaining method.
+ * This uses the same configuration and hashing logic as the rate limit middleware.
+ */
+async function getRateLimitRemainingTokens(
+  keyPrefix: string,
+  apiKey: string,
+  config: RateLimiterConfig
+): Promise<number | null> {
+  try {
+    // Hash the authorization header the same way the rate limiter does
+    const authorizationValue = `Bearer ${apiKey}`;
+    const hash = createHash("sha256");
+    hash.update(authorizationValue);
+    const hashedKey = hash.digest("hex");
+
+    // Create a Ratelimit instance with the same configuration
+    const limiter = createLimiterFromConfig(config);
+    const ratelimit = new Ratelimit({
+      redis: rateLimitRedisClient,
+      limiter,
+      ephemeralCache: new Map(),
+      analytics: false,
+      prefix: `ratelimit:${keyPrefix}`,
+    });
+
+    // Use the getRemaining method to get the current remaining tokens
+    // getRemaining returns a Promise<number>
+    const remaining = await ratelimit.getRemaining(hashedKey);
+    return remaining;
+  } catch (error) {
+    logger.warn("Failed to get rate limit remaining tokens", {
+      keyPrefix,
+      error: error instanceof Error ?
error.message : String(error), + }); + return null; + } +} diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 929e1e40ba..053414dcfc 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -96,7 +96,7 @@ export class ScheduleListPresenter extends BasePresenter { const schedulesCount = await CheckScheduleService.getUsedSchedulesCount({ prisma: this._replica, - environments: project.environments, + projectId, }); const limit = await getLimit(project.organizationId, "schedules", 100_000_000); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx new file mode 100644 index 0000000000..1ca8778dba --- /dev/null +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx @@ -0,0 +1,842 @@ +import { CheckIcon, BookOpenIcon } from "@heroicons/react/20/solid"; +import { type MetaFunction } from "@remix-run/react"; +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { IconCardsFilled, IconDiamondFilled, IconTallymark4 } from "@tabler/icons-react"; +import { tryCatch } from "@trigger.dev/core"; +import { Gauge } from "lucide-react"; +import { typedjson, useTypedLoaderData } from "remix-typedjson"; +import { ConcurrencyIcon } from "~/assets/icons/ConcurrencyIcon"; +import { AdminDebugTooltip } from "~/components/admin/debugTooltip"; +import { Feedback } from "~/components/Feedback"; +import { PageBody, PageContainer } from "~/components/layout/AppLayout"; +import { EnvironmentSelector } from "~/components/navigation/EnvironmentSelector"; +import { AnimatedNumber } from "~/components/primitives/AnimatedNumber"; +import { Badge } from "~/components/primitives/Badge"; +import { Button, LinkButton } from "~/components/primitives/Buttons"; +import { Header2 } from "~/components/primitives/Headers"; +import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; +import * as Property from "~/components/primitives/PropertyTable"; +import { + Table, + TableBody, + TableCell, + TableHeader, + TableHeaderCell, + TableRow, +} from "~/components/primitives/Table"; +import { InfoIconTooltip } from "~/components/primitives/Tooltip"; +import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; +import { useEnvironment } from "~/hooks/useEnvironment"; +import { useOrganization } from "~/hooks/useOrganizations"; +import { useProject } from "~/hooks/useProject"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { + LimitsPresenter, + type FeatureInfo, + type LimitsResult, + type QuotaInfo, + type RateLimitInfo, +} from "~/presenters/v3/LimitsPresenter.server"; +import { requireUserId } from "~/services/session.server"; +import { cn } from "~/utils/cn"; +import { formatNumber } from "~/utils/numberFormatter"; +import { + concurrencyPath, + docsPath, + EnvironmentParamSchema, + organizationBillingPath, +} from "~/utils/pathBuilder"; + +export const meta: MetaFunction = () => { + return [ + { + title: `Limits | Trigger.dev`, + }, + ]; +}; + +export const loader = async ({ request, params }: LoaderFunctionArgs) => { + const userId = await requireUserId(request); + const { 
organizationSlug, projectParam, envParam } = EnvironmentParamSchema.parse(params); + + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) { + throw new Response(undefined, { + status: 404, + statusText: "Project not found", + }); + } + + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) { + throw new Response(undefined, { + status: 404, + statusText: "Environment not found", + }); + } + + const presenter = new LimitsPresenter(); + const [error, result] = await tryCatch( + presenter.call({ + organizationId: project.organizationId, + projectId: project.id, + environmentApiKey: environment.apiKey, + }) + ); + + if (error) { + throw new Response(error.message, { + status: 400, + }); + } + + // Match the queues page pattern: pass a poll interval from the loader + const autoReloadPollIntervalMs = 5000; + + return typedjson({ + ...result, + autoReloadPollIntervalMs, + }); +}; + +export default function Page() { + const data = useTypedLoaderData(); + const organization = useOrganization(); + const project = useProject(); + const environment = useEnvironment(); + + // Auto-revalidate using the loader-provided interval and refresh on focus + useAutoRevalidate({ interval: data.autoReloadPollIntervalMs, onFocus: true }); + + return ( + + + + + + + + Plan + {data.planName ?? "No plan"} + + + Organization ID + {data.organizationId} + + + + + Limits docs + + + + +
+
+ {/* Current Plan Section */} + {data.planName && ( + + )} + + {/* Concurrency Section */} + + + {/* Rate Limits Section */} + + + {/* Quotas Section */} + + + {/* Features Section */} + +
+
+
+
+ ); +} + +function CurrentPlanSection({ + planName, + isOnTopPlan, + billingPath, +}: { + planName: string; + isOnTopPlan: boolean; + billingPath: string; +}) { + return ( +
+ + + Current plan + + + + + {planName} + + {isOnTopPlan ? ( + Request Enterprise} + defaultValue="help" + /> + ) : ( + + View plans + + )} + + + +
+
+ ); +} + +function ConcurrencySection({ concurrencyPath }: { concurrencyPath: string }) { + return ( +
+ + + Concurrency limits + + + + + + Concurrency + + + Manage concurrency + + + + +
+
+ ); +} + +function RateLimitsSection({ + rateLimits, + isOnTopPlan, + billingPath, + organization, + project, + environment, +}: { + rateLimits: LimitsResult["rateLimits"]; + isOnTopPlan: boolean; + billingPath: string; + organization: ReturnType; + project: ReturnType; + environment: ReturnType; +}) { + return ( +
+
+ + + Rate limits + + + +
+ + + + Rate limit + + + Type + +
+ + + Requests consume tokens from a bucket that refills over time. When empty, + requests are rate limited. + +
+
+ + + Allows a set number of requests per time window. The window resets at + fixed intervals. + +
+
+ + + Allows a set number of requests per rolling time window. The limit is + continuously evaluated. + +
+ + } + disableHoverableContent + /> +
+
+ Configuration + + + Available + + + + Upgrade +
+
+ + + + +
+
+ ); +} + +function RateLimitRow({ + info, + isOnTopPlan, + billingPath, +}: { + info: RateLimitInfo; + isOnTopPlan: boolean; + billingPath: string; +}) { + const maxTokens = info.config.type === "tokenBucket" ? info.config.maxTokens : info.config.tokens; + const percentage = + info.currentTokens !== null && maxTokens > 0 ? info.currentTokens / maxTokens : null; + + return ( + + + + {info.name} + + + + +
+ +
+
+ + + + + {info.currentTokens !== null ? ( +
+ + + + + of {formatNumber(maxTokens)} + +
+ ) : ( + + )} +
+ +
+ {info.name === "Batch rate limit" ? ( + isOnTopPlan ? ( + Contact us} + defaultValue="help" + /> + ) : ( + + View plans + + ) + ) : ( + Contact us} + defaultValue="help" + /> + )} +
+
+
+ ); +} + +function RateLimitTypeBadge({ + config, + type, +}: { + config?: RateLimitInfo["config"]; + type?: "tokenBucket" | "fixedWindow" | "slidingWindow"; +}) { + const rateLimitType = type ?? config?.type; + switch (rateLimitType) { + case "tokenBucket": + return ( + + Token bucket + + ); + case "fixedWindow": + return ( + + Fixed window + + ); + case "slidingWindow": + return ( + + Sliding window + + ); + default: + return null; + } +} + +function RateLimitConfigDisplay({ config }: { config: RateLimitInfo["config"] }) { + if (config.type === "tokenBucket") { + return ( +
+ + Max tokens:{" "} + {formatNumber(config.maxTokens)} + + + Refill:{" "} + + {formatNumber(config.refillRate)}/{config.interval} + + +
+ ); + } + + if (config.type === "fixedWindow" || config.type === "slidingWindow") { + return ( +
+ + Tokens:{" "} + {formatNumber(config.tokens)} + + + Window:{" "} + {config.window} + +
+ ); + } + + return ; +} + +function QuotasSection({ + quotas, + isOnTopPlan, + billingPath, +}: { + quotas: LimitsResult["quotas"]; + isOnTopPlan: boolean; + billingPath: string; +}) { + // Collect all quotas that should be shown + const quotaRows: QuotaInfo[] = []; + + // Always show projects + quotaRows.push(quotas.projects); + + // Add plan-based quotas if they exist + if (quotas.teamMembers) quotaRows.push(quotas.teamMembers); + if (quotas.schedules) quotaRows.push(quotas.schedules); + if (quotas.alerts) quotaRows.push(quotas.alerts); + if (quotas.branches) quotaRows.push(quotas.branches); + if (quotas.realtimeConnections) quotaRows.push(quotas.realtimeConnections); + if (quotas.logRetentionDays) quotaRows.push(quotas.logRetentionDays); + + // Include batch processing concurrency + quotaRows.push(quotas.batchProcessingConcurrency); + + // Add queue size quotas if set + if (quotas.devQueueSize.limit !== null) quotaRows.push(quotas.devQueueSize); + if (quotas.deployedQueueSize.limit !== null) quotaRows.push(quotas.deployedQueueSize); + + return ( +
+ + + Quotas + + + + + + Quota + Limit + Current + Source + Upgrade + + + + {quotaRows.map((quota) => ( + + ))} + +
+
+ ); +} + +function QuotaRow({ + quota, + isOnTopPlan, + billingPath, +}: { + quota: QuotaInfo; + isOnTopPlan: boolean; + billingPath: string; +}) { + // For log retention, we don't show current usage as it's a duration, not a count + const isRetentionQuota = quota.name === "Log retention"; + const percentage = + !isRetentionQuota && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null; + + // Special handling for Log retention + if (quota.name === "Log retention") { + const canUpgrade = !isOnTopPlan; + return ( + + + {quota.name} + + + + {quota.limit !== null ? `${formatNumber(quota.limit)} days` : "Unlimited"} + + + – + + + + + +
+ {canUpgrade ? ( + + View plans + + ) : ( + Contact us} + defaultValue="help" + /> + )} +
+
+
+ ); + } + + const renderUpgrade = () => { + // Projects always show Contact us (regardless of upgrade flags) + if (quota.name === "Projects") { + return ( +
+ Contact us} + defaultValue="help" + /> +
+ ); + } + + if (!quota.isUpgradable) { + return null; + } + + // Not on top plan - show View plans + if (!isOnTopPlan) { + return ( +
+ + View plans + +
+ ); + } + + // On top plan - show Contact us if canExceed is true + if (quota.canExceed) { + return ( +
+ Contact us} + defaultValue="help" + /> +
+ ); + } + + // On top plan but cannot exceed - no upgrade option + return null; + }; + + return ( + + + {quota.name} + + + + {quota.limit !== null + ? isRetentionQuota + ? `${formatNumber(quota.limit)} days` + : formatNumber(quota.limit) + : "Unlimited"} + + + {isRetentionQuota ? "–" : formatNumber(quota.currentUsage)} + + + + + {renderUpgrade()} + + ); +} + +function FeaturesSection({ + features, + isOnTopPlan, + billingPath, +}: { + features: LimitsResult["features"]; + isOnTopPlan: boolean; + billingPath: string; +}) { + // For staging environment: show View plans if not enabled (i.e., on Free plan) + const stagingUpgradeType = features.hasStagingEnvironment.enabled ? "none" : "view-plans"; + + return ( +
+ + + Plan features + + + + + Feature + Status + Upgrade + + + + + + + +
+
+ ); +} + +function FeatureRow({ + feature, + upgradeType, + billingPath, +}: { + feature: FeatureInfo; + upgradeType: "view-plans" | "contact-us" | "none"; + billingPath: string; +}) { + const displayValue = () => { + if (feature.name === "Included compute" && typeof feature.value === "number") { + if (!feature.enabled || feature.value === 0) { + return None; + } + return ( + ${formatNumber(feature.value / 100)} + ); + } + + if (feature.value !== undefined) { + return {feature.value}; + } + + return feature.enabled ? ( + + + Enabled + + ) : ( + Not available + ); + }; + + const renderUpgrade = () => { + switch (upgradeType) { + case "view-plans": + return ( +
+ + View plans + +
+ ); + case "contact-us": + return ( +
+ Contact us} + defaultValue="help" + /> +
+ ); + case "none": + return null; + } + }; + + return ( + + + {feature.name} + + + {displayValue()} + {renderUpgrade()} + + ); +} + +/** + * Returns the appropriate color class based on usage percentage. + * @param percentage - The usage percentage (0-1 scale) + * @param mode - "usage" means higher is worse (quotas), "remaining" means lower is worse (rate limits) + * @returns Tailwind color class + */ +function getUsageColorClass( + percentage: number | null, + mode: "usage" | "remaining" = "usage" +): string { + if (percentage === null) return "text-text-dimmed"; + + if (mode === "remaining") { + // For remaining tokens: 0 = bad (red), <=10% = warning (orange) + if (percentage <= 0) return "text-error"; + if (percentage <= 0.1) return "text-warning"; + return "text-text-bright"; + } else { + // For usage: 100% = bad (red), >=90% = warning (orange) + if (percentage >= 1) return "text-error"; + if (percentage >= 0.9) return "text-warning"; + return "text-text-bright"; + } +} + +function SourceBadge({ source }: { source: "default" | "plan" | "override" }) { + const variants: Record = { + default: { + label: "Default", + className: "bg-indigo-500/20 text-indigo-400", + }, + plan: { + label: "Plan", + className: "bg-purple-500/20 text-purple-400", + }, + override: { + label: "Override", + className: "bg-amber-500/20 text-amber-400", + }, + }; + + const variant = variants[source]; + + return ( + + {variant.label} + + ); +} diff --git a/apps/webapp/app/runEngine/concerns/batchLimits.server.ts b/apps/webapp/app/runEngine/concerns/batchLimits.server.ts index 437feadf38..f40088039e 100644 --- a/apps/webapp/app/runEngine/concerns/batchLimits.server.ts +++ b/apps/webapp/app/runEngine/concerns/batchLimits.server.ts @@ -1,8 +1,10 @@ import { Organization } from "@trigger.dev/database"; -import { Ratelimit } from "@upstash/ratelimit"; import { z } from "zod"; import { env } from "~/env.server"; -import { RateLimiterConfig } from "~/services/authorizationRateLimitMiddleware.server"; +import { + RateLimiterConfig, + createLimiterFromConfig, +} from "~/services/authorizationRateLimitMiddleware.server"; import { createRedisRateLimitClient, Duration, RateLimiter } from "~/services/rateLimiter.server"; import { singleton } from "~/utils/singleton"; @@ -33,16 +35,7 @@ function createBatchLimitsRedisClient() { function createOrganizationRateLimiter(organization: Organization): RateLimiter { const limiterConfig = resolveBatchRateLimitConfig(organization.batchRateLimitConfig); - const limiter = - limiterConfig.type === "fixedWindow" - ? Ratelimit.fixedWindow(limiterConfig.tokens, limiterConfig.window) - : limiterConfig.type === "tokenBucket" - ? 
Ratelimit.tokenBucket( - limiterConfig.refillRate, - limiterConfig.interval, - limiterConfig.maxTokens - ) - : Ratelimit.slidingWindow(limiterConfig.tokens, limiterConfig.window); + const limiter = createLimiterFromConfig(limiterConfig); return new RateLimiter({ redisClient: batchLimitsRedisClient, diff --git a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts index b94a664a36..2d4cc8b1f2 100644 --- a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts +++ b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts @@ -7,7 +7,7 @@ import { z } from "zod"; import { env } from "~/env.server"; import { RedisWithClusterOptions } from "~/redis.server"; import { logger } from "./logger.server"; -import { createRedisRateLimitClient, Duration, RateLimiter } from "./rateLimiter.server"; +import { createRedisRateLimitClient, Duration, Limiter, RateLimiter } from "./rateLimiter.server"; import { RedisCacheStore } from "./unkey/redisCacheStore.server"; const DurationSchema = z.custom((value) => { @@ -130,6 +130,18 @@ async function resolveLimitConfig( return cacheResult.val ?? defaultLimiter; } +/** + * Creates a Ratelimit limiter from a RateLimiterConfig. + * This function is shared across the codebase to ensure consistent limiter creation. + */ +export function createLimiterFromConfig(config: RateLimiterConfig): Limiter { + return config.type === "fixedWindow" + ? Ratelimit.fixedWindow(config.tokens, config.window) + : config.type === "tokenBucket" + ? Ratelimit.tokenBucket(config.refillRate, config.interval, config.maxTokens) + : Ratelimit.slidingWindow(config.tokens, config.window); +} + //returns an Express middleware that rate limits using the Bearer token in the Authorization header export function authorizationRateLimitMiddleware({ redis, @@ -249,16 +261,7 @@ export function authorizationRateLimitMiddleware({ limiterConfigOverride ); - const limiter = - limiterConfig.type === "fixedWindow" - ? Ratelimit.fixedWindow(limiterConfig.tokens, limiterConfig.window) - : limiterConfig.type === "tokenBucket" - ? 
Ratelimit.tokenBucket( - limiterConfig.refillRate, - limiterConfig.interval, - limiterConfig.maxTokens - ) - : Ratelimit.slidingWindow(limiterConfig.tokens, limiterConfig.window); + const limiter = createLimiterFromConfig(limiterConfig); const rateLimiter = new RateLimiter({ redisClient, diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts index a2756f7e5b..639f2f7294 100644 --- a/apps/webapp/app/utils/pathBuilder.ts +++ b/apps/webapp/app/utils/pathBuilder.ts @@ -507,6 +507,14 @@ export function concurrencyPath( return `${v3EnvironmentPath(organization, project, environment)}/concurrency`; } +export function limitsPath( + organization: OrgForPath, + project: ProjectForPath, + environment: EnvironmentForPath +) { + return `${v3EnvironmentPath(organization, project, environment)}/limits`; +} + export function regionsPath( organization: OrgForPath, project: ProjectForPath, diff --git a/apps/webapp/app/v3/services/checkSchedule.server.ts b/apps/webapp/app/v3/services/checkSchedule.server.ts index 25a0944dc6..570e6e3bf1 100644 --- a/apps/webapp/app/v3/services/checkSchedule.server.ts +++ b/apps/webapp/app/v3/services/checkSchedule.server.ts @@ -92,7 +92,7 @@ export class CheckScheduleService extends BaseService { const limit = await getLimit(project.organizationId, "schedules", 100_000_000); const schedulesCount = await CheckScheduleService.getUsedSchedulesCount({ prisma: this._prisma, - environments: project.environments, + projectId, }); if (schedulesCount >= limit) { @@ -105,26 +105,21 @@ export class CheckScheduleService extends BaseService { static async getUsedSchedulesCount({ prisma, - environments, + projectId, }: { prisma: PrismaClientOrTransaction; - environments: { id: string; type: RuntimeEnvironmentType; archivedAt: Date | null }[]; + projectId: string; }) { - const deployedEnvironments = environments.filter( - (env) => env.type !== "DEVELOPMENT" && !env.archivedAt - ); - const schedulesCount = await prisma.taskScheduleInstance.count({ + return await prisma.taskScheduleInstance.count({ where: { - environmentId: { - in: deployedEnvironments.map((env) => env.id), + projectId, + environment: { + type: { + not: "DEVELOPMENT", + }, }, active: true, - taskSchedule: { - active: true, - }, }, }); - - return schedulesCount; } } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 175fb5b230..987e983862 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -111,7 +111,7 @@ "@sentry/remix": "9.46.0", "@slack/web-api": "7.9.1", "@socket.io/redis-adapter": "^8.3.0", - "@tabler/icons-react": "^2.39.0", + "@tabler/icons-react": "^3.36.1", "@tailwindcss/container-queries": "^0.1.1", "@tanstack/react-virtual": "^3.0.4", "@team-plain/typescript-sdk": "^3.5.0", diff --git a/apps/webapp/tailwind.config.js b/apps/webapp/tailwind.config.js index 9f4e4381b8..d7ee335694 100644 --- a/apps/webapp/tailwind.config.js +++ b/apps/webapp/tailwind.config.js @@ -160,6 +160,9 @@ const batches = colors.pink[500]; const schedules = colors.yellow[500]; const queues = colors.purple[500]; const deployments = colors.green[500]; +const concurrency = colors.amber[500]; +const limits = colors.purple[500]; +const regions = colors.green[500]; const logs = colors.blue[500]; const tests = colors.lime[500]; const apiKeys = colors.amber[500]; @@ -235,7 +238,10 @@ module.exports = { runs, batches, schedules, + concurrency, queues, + regions, + limits, deployments, logs, tests, diff --git 
a/internal-packages/database/prisma/migrations/20260115154554_taskscheduleinstance_required_projectid/migration.sql b/internal-packages/database/prisma/migrations/20260115154554_taskscheduleinstance_required_projectid/migration.sql new file mode 100644 index 0000000000..584b0b438a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260115154554_taskscheduleinstance_required_projectid/migration.sql @@ -0,0 +1,21 @@ +/* +Warnings: + +- Made the column `projectId` on table `TaskScheduleInstance` required. This step will fail if there are existing NULL values in that column. + + */ +-- Backfill from TaskSchedule +UPDATE "TaskScheduleInstance" tsi +SET + "projectId" = ts."projectId" +FROM + "TaskSchedule" ts +WHERE + tsi."taskScheduleId" = ts."id" + AND tsi."projectId" IS NULL; + +-- AlterTable +ALTER TABLE "public"."TaskScheduleInstance" +ALTER COLUMN "projectId" +SET + NOT NULL; \ No newline at end of file diff --git a/internal-packages/database/prisma/migrations/20260115164415_taskscheduleinstance_required_projectid_index/migration.sql b/internal-packages/database/prisma/migrations/20260115164415_taskscheduleinstance_required_projectid_index/migration.sql new file mode 100644 index 0000000000..cb130c0400 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260115164415_taskscheduleinstance_required_projectid_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TaskScheduleInstance_projectId_active_idx" ON "public"."TaskScheduleInstance" ("projectId", "active"); \ No newline at end of file diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 44c5409970..451c9beb80 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1946,8 +1946,8 @@ model TaskScheduleInstance { environment RuntimeEnvironment @relation(fields: [environmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) environmentId String - project Project? @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) - projectId String? + project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) + projectId String createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -1960,6 +1960,7 @@ model TaskScheduleInstance { //you can only have a schedule attached to each environment once @@unique([taskScheduleId, environmentId]) @@index([environmentId]) + @@index([projectId, active]) } model RuntimeEnvironmentSession { diff --git a/internal-packages/tsql/src/query/printer.test.ts b/internal-packages/tsql/src/query/printer.test.ts index 6b82c08b07..dcbb79b2d8 100644 --- a/internal-packages/tsql/src/query/printer.test.ts +++ b/internal-packages/tsql/src/query/printer.test.ts @@ -2612,3 +2612,150 @@ describe("Internal-only column blocking", () => { }); }); }); + +describe("Required Filters", () => { + /** + * Tests for tables with requiredFilters, which inject internal ClickHouse + * column conditions (like engine = 'V2') that aren't exposed in the schema. 
+ */ + + const schemaWithRequiredFilters: TableSchema = { + name: "runs", + clickhouseName: "trigger_dev.task_runs_v2", + description: "Task runs table with required filters", + tenantColumns: { + organizationId: "organization_id", + projectId: "project_id", + environmentId: "environment_id", + }, + requiredFilters: [{ column: "engine", value: "V2" }], + columns: { + run_id: { + name: "run_id", + clickhouseName: "friendly_id", + ...column("String", { description: "Run ID", coreColumn: true }), + }, + status: { + name: "status", + ...column("String", { description: "Status" }), + }, + triggered_at: { + name: "triggered_at", + clickhouseName: "created_at", + ...column("DateTime64", { description: "When the run was triggered", coreColumn: true }), + }, + total_cost: { + name: "total_cost", + ...column("Float64", { description: "Total cost" }), + expression: "(cost_in_cents + base_cost_in_cents) / 100.0", + }, + }, + }; + + function createRequiredFiltersContext(): PrinterContext { + const schemaRegistry = createSchemaRegistry([schemaWithRequiredFilters]); + return createPrinterContext({ + organizationId: "org_test123", + projectId: "proj_test456", + environmentId: "env_test789", + schema: schemaRegistry, + }); + } + + function printQueryWithFilters(query: string): PrintResult { + const ctx = createRequiredFiltersContext(); + const ast = parseTSQLSelect(query); + const printer = new ClickHousePrinter(ctx); + return printer.print(ast); + } + + it("should NOT throw for internal engine column from requiredFilters", () => { + // This query should work even though 'engine' is not in the schema + // because it's automatically injected by requiredFilters + const { sql, params } = printQueryWithFilters("SELECT run_id, status FROM runs LIMIT 10"); + + // The engine filter should be in the WHERE clause + expect(sql).toContain("engine"); + // The V2 value is parameterized, so check the params + expect(Object.values(params)).toContain("V2"); + }); + + it("should allow TSQL column names that map to different ClickHouse names", () => { + // User writes 'triggered_at' but it maps to 'created_at' in ClickHouse + const { sql } = printQueryWithFilters(` + SELECT run_id, status, triggered_at + FROM runs + WHERE triggered_at > now() - INTERVAL 14 DAY + ORDER BY triggered_at DESC + LIMIT 100 + `); + + // The ClickHouse SQL should use 'created_at' instead of 'triggered_at' + expect(sql).toContain("created_at"); + // The result should still have the alias for the user-friendly name + expect(sql).toContain("AS triggered_at"); + }); + + it("should allow filtering by mapped column name", () => { + const { sql } = printQueryWithFilters(` + SELECT run_id FROM runs WHERE triggered_at > '2024-01-01' LIMIT 10 + `); + + // Should use the ClickHouse column name in the WHERE clause + expect(sql).toContain("created_at"); + }); + + it("should allow ORDER BY on mapped column name", () => { + const { sql } = printQueryWithFilters(` + SELECT run_id FROM runs ORDER BY triggered_at DESC LIMIT 10 + `); + + // ORDER BY should use the ClickHouse column name + expect(sql).toContain("ORDER BY"); + expect(sql).toContain("created_at"); + }); + + it("should handle virtual columns with expressions", () => { + const { sql } = printQueryWithFilters(` + SELECT run_id, total_cost FROM runs ORDER BY total_cost DESC LIMIT 10 + `); + + // Virtual column should be expanded to its expression with an alias + expect(sql).toContain("cost_in_cents"); + expect(sql).toContain("base_cost_in_cents"); + expect(sql).toContain("AS total_cost"); + }); + + 
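+  // For orientation, the printed ClickHouse SQL for the first test above is
+  // expected to look roughly like the sketch below. The parameter placeholder
+  // names and exact formatting are assumptions, not asserted printer output:
+  //
+  //   SELECT friendly_id AS run_id, status
+  //   FROM trigger_dev.task_runs_v2
+  //   WHERE organization_id = {p0: String}
+  //     AND project_id = {p1: String}
+  //     AND environment_id = {p2: String}
+  //     AND engine = {p3: String} -- injected from requiredFilters, bound to "V2"
+  //   LIMIT 10
+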
it("should combine tenant guards with required filters", () => { + const { sql, params } = printQueryWithFilters("SELECT run_id FROM runs LIMIT 10"); + + // Should have all tenant columns AND the engine filter + expect(sql).toContain("organization_id"); + expect(sql).toContain("project_id"); + expect(sql).toContain("environment_id"); + expect(sql).toContain("engine"); + // The V2 value is parameterized, so check the params + expect(Object.values(params)).toContain("V2"); + }); + + it("should allow complex queries with mapped columns", () => { + // This query is similar to what a user might write + const { sql } = printQueryWithFilters(` + SELECT + run_id, + status, + total_cost, + triggered_at + FROM runs + WHERE triggered_at > now() - INTERVAL 14 DAY + ORDER BY total_cost DESC + LIMIT 100 + `); + + // All should work without errors + expect(sql).toContain("friendly_id"); // run_id maps to friendly_id + expect(sql).toContain("status"); + expect(sql).toContain("created_at"); // triggered_at maps to created_at + expect(sql).toContain("cost_in_cents"); // total_cost is a virtual column + }); +}); diff --git a/internal-packages/tsql/src/query/printer.ts b/internal-packages/tsql/src/query/printer.ts index dcdc82d69b..75fcd56628 100644 --- a/internal-packages/tsql/src/query/printer.ts +++ b/internal-packages/tsql/src/query/printer.ts @@ -2280,7 +2280,9 @@ export class ClickHousePrinter { if (this.inProjectionContext && this.internalOnlyColumns.has(columnName)) { const availableColumns = this.getAvailableColumnNames(); throw new QueryError( - `Column "${columnName}" is not available for querying. Available columns: ${availableColumns.join(", ")}` + `Column "${columnName}" is not available for querying. Available columns: ${availableColumns.join( + ", " + )}` ); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7b6a862999..d2361acd82 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -463,8 +463,8 @@ importers: specifier: ^8.3.0 version: 8.3.0(socket.io-adapter@2.5.4(bufferutil@4.0.9)) '@tabler/icons-react': - specifier: ^2.39.0 - version: 2.47.0(react@18.2.0) + specifier: ^3.36.1 + version: 3.36.1(react@18.2.0) '@tailwindcss/container-queries': specifier: ^0.1.1 version: 0.1.1(tailwindcss@3.4.1) @@ -10167,13 +10167,13 @@ packages: resolution: {integrity: sha512-XIB2XbzHTN6ieIjfIMV9hlVcfPU26s2vafYWQcZHWXHOxiaRZYEDKEwdl129Zyg50+foYV2jCgtrqSA6qNuNSA==} engines: {node: '>=6'} - '@tabler/icons-react@2.47.0': - resolution: {integrity: sha512-iqly2FvCF/qUbgmvS8E40rVeYY7laltc5GUjRxQj59DuX0x/6CpKHTXt86YlI2whg4czvd/c8Ce8YR08uEku0g==} + '@tabler/icons-react@3.36.1': + resolution: {integrity: sha512-/8nOXeNeMoze9xY/QyEKG65wuvRhkT3q9aytaur6Gj8bYU2A98YVJyLc9MRmc5nVvpy+bRlrrwK/Ykr8WGyUWg==} peerDependencies: - react: ^16.5.1 || ^17.0.0 || ^18.0.0 + react: '>= 16' - '@tabler/icons@2.47.0': - resolution: {integrity: sha512-4w5evLh+7FUUiA1GucvGj2ReX2TvOjEr4ejXdwL/bsjoSkof6r1gQmzqI+VHrE2CpJpB3al7bCTulOkFa/RcyA==} + '@tabler/icons@3.36.1': + resolution: {integrity: sha512-f4Jg3Fof/Vru5ioix/UO4GX+sdDsF9wQo47FbtvG+utIYYVQ/QVAC0QYgcBbAjQGfbdOh2CCf0BgiFOF9Ixtjw==} '@tailwindcss/container-queries@0.1.1': resolution: {integrity: sha512-p18dswChx6WnTSaJCSGx6lTmrGzNNvm2FtXmiO6AuA1V4U5REyoqwmT6kgAsIMdjo07QdAfYXHJ4hnMtfHzWgA==} @@ -30242,13 +30242,12 @@ snapshots: dependencies: defer-to-connect: 1.1.3 - '@tabler/icons-react@2.47.0(react@18.2.0)': + '@tabler/icons-react@3.36.1(react@18.2.0)': dependencies: - '@tabler/icons': 2.47.0 - prop-types: 15.8.1 + '@tabler/icons': 3.36.1 react: 18.2.0 - '@tabler/icons@2.47.0': {} + 
'@tabler/icons@3.36.1': {} '@tailwindcss/container-queries@0.1.1(tailwindcss@3.4.1)': dependencies: diff --git a/references/hello-world/src/trigger/rateLimitStress.ts b/references/hello-world/src/trigger/rateLimitStress.ts new file mode 100644 index 0000000000..cb2c20c579 --- /dev/null +++ b/references/hello-world/src/trigger/rateLimitStress.ts @@ -0,0 +1,257 @@ +import { logger, task, tasks, RateLimitError } from "@trigger.dev/sdk/v3"; +import { setTimeout } from "timers/promises"; + +/** + * A simple no-op task that does minimal work. + * Used as the target for rate limit stress testing. + */ +export const noopTask = task({ + id: "noop-task", + retry: { maxAttempts: 1 }, + run: async (payload: { index: number }) => { + return { index: payload.index, timestamp: Date.now() }; + }, +}); + +/** + * Stress test task that triggers many runs rapidly to hit the API rate limit. + * Fires triggers as fast as possible for a set duration, then stops. + * + * Note: Already-triggered runs will continue to execute after the test completes. + * + * Default rate limits (per environment API key): + * - Free: 1,200 runs bucket, refills 100 runs/10 sec + * - Hobby/Pro: 5,000 runs bucket, refills 500 runs/5 sec + * + * Run with: `npx trigger.dev@latest dev` then trigger this task from the dashboard + */ +export const rateLimitStressTest = task({ + id: "rate-limit-stress-test", + maxDuration: 120, + run: async (payload: { + /** How long to run the test in seconds (default: 5) */ + durationSeconds?: number; + /** How many triggers to fire in parallel per batch (default: 100) */ + batchSize?: number; + }) => { + const durationSeconds = payload.durationSeconds ?? 5; + const batchSize = payload.batchSize ?? 100; + const durationMs = durationSeconds * 1000; + + logger.info("Starting rate limit stress test", { + durationSeconds, + batchSize, + }); + + const start = Date.now(); + let totalAttempted = 0; + let totalSuccess = 0; + let totalRateLimited = 0; + let totalOtherErrors = 0; + let batchCount = 0; + + // Keep firing batches until time runs out + while (Date.now() - start < durationMs) { + batchCount++; + const batchStart = Date.now(); + const elapsed = batchStart - start; + const remaining = durationMs - elapsed; + + logger.info(`Batch ${batchCount} starting`, { + elapsedMs: elapsed, + remainingMs: remaining, + totalAttempted, + totalSuccess, + totalRateLimited, + }); + + // Fire a batch of triggers + const promises = Array.from({ length: batchSize }, async (_, i) => { + // Check if we've exceeded time before each trigger + if (Date.now() - start >= durationMs) { + return { skipped: true }; + } + + const index = totalAttempted + i; + try { + await tasks.trigger("noop-task", { index }); + return { success: true, rateLimited: false }; + } catch (error) { + if (error instanceof RateLimitError) { + return { success: false, rateLimited: true, resetInMs: error.millisecondsUntilReset }; + } + return { success: false, rateLimited: false }; + } + }); + + const results = await Promise.all(promises); + + const batchSuccess = results.filter((r) => "success" in r && r.success).length; + const batchRateLimited = results.filter((r) => "rateLimited" in r && r.rateLimited).length; + const batchOtherErrors = results.filter( + (r) => "success" in r && !r.success && !("rateLimited" in r && r.rateLimited) + ).length; + const batchSkipped = results.filter((r) => "skipped" in r && r.skipped).length; + + totalAttempted += batchSize - batchSkipped; + totalSuccess += batchSuccess; + totalRateLimited += batchRateLimited; + 
totalOtherErrors += batchOtherErrors; + + // Log rate limit hits + const rateLimitedResult = results.find((r) => "rateLimited" in r && r.rateLimited); + if (rateLimitedResult && "resetInMs" in rateLimitedResult) { + logger.warn("Rate limit hit!", { + batch: batchCount, + resetInMs: rateLimitedResult.resetInMs, + totalRateLimited, + }); + } + + // Small delay between batches to not overwhelm + await setTimeout(50); + } + + const duration = Date.now() - start; + + logger.info("Stress test completed", { + actualDurationMs: duration, + totalAttempted, + totalSuccess, + totalRateLimited, + totalOtherErrors, + batchCount, + }); + + return { + config: { + durationSeconds, + batchSize, + }, + results: { + actualDurationMs: duration, + totalAttempted, + totalSuccess, + totalRateLimited, + totalOtherErrors, + batchCount, + hitRateLimit: totalRateLimited > 0, + triggersPerSecond: Math.round((totalAttempted / duration) * 1000), + }, + }; + }, +}); + +/** + * Sustained load test - maintains a steady rate of triggers over time. + * Useful for seeing how rate limits behave under sustained load. + * + * Note: Successfully triggered runs will continue executing after this test completes. + */ +export const sustainedLoadTest = task({ + id: "sustained-load-test", + maxDuration: 300, + run: async (payload: { + /** Triggers per second to attempt (default: 100) */ + triggersPerSecond?: number; + /** Duration in seconds (default: 20) */ + durationSeconds?: number; + }) => { + const triggersPerSecond = payload.triggersPerSecond ?? 100; + const durationSeconds = payload.durationSeconds ?? 20; + + const intervalMs = 1000 / triggersPerSecond; + const totalTriggers = triggersPerSecond * durationSeconds; + + logger.info("Starting sustained load test", { + triggersPerSecond, + durationSeconds, + totalTriggers, + intervalMs, + }); + + const results: Array<{ + index: number; + success: boolean; + rateLimited: boolean; + timestamp: number; + }> = []; + + const start = Date.now(); + let index = 0; + + while (Date.now() - start < durationSeconds * 1000 && index < totalTriggers) { + const triggerStart = Date.now(); + + try { + await tasks.trigger("noop-task", { index }); + results.push({ + index, + success: true, + rateLimited: false, + timestamp: Date.now() - start, + }); + } catch (error) { + results.push({ + index, + success: false, + rateLimited: error instanceof RateLimitError, + timestamp: Date.now() - start, + }); + + if (error instanceof RateLimitError) { + logger.warn("Rate limit hit during sustained load", { + index, + timestamp: Date.now() - start, + resetInMs: error.millisecondsUntilReset, + }); + } + } + + index++; + + // Maintain the target rate + const elapsed = Date.now() - triggerStart; + const sleepTime = Math.max(0, intervalMs - elapsed); + if (sleepTime > 0) { + await setTimeout(sleepTime); + } + } + + const duration = Date.now() - start; + const successCount = results.filter((r) => r.success).length; + const rateLimitedCount = results.filter((r) => r.rateLimited).length; + + // Find when rate limiting started (if at all) + const firstRateLimited = results.find((r) => r.rateLimited); + + logger.info("Sustained load test completed", { + actualDuration: duration, + actualTriggers: results.length, + successCount, + rateLimitedCount, + actualRate: Math.round((results.length / duration) * 1000), + }); + + return { + config: { + targetTriggersPerSecond: triggersPerSecond, + targetDurationSeconds: durationSeconds, + }, + results: { + actualDuration: duration, + actualTriggers: results.length, + successCount, + 
rateLimitedCount, + actualRate: Math.round((results.length / duration) * 1000), + hitRateLimit: rateLimitedCount > 0, + firstRateLimitedAt: firstRateLimited + ? { + index: firstRateLimited.index, + timestampMs: firstRateLimited.timestamp, + } + : null, + }, + }; + }, +});
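For anyone reviewing the stress-test reference above, here is a minimal sketch of kicking the tasks off from a script rather than the dashboard, assuming a deployed project and a TRIGGER_SECRET_KEY in the environment. The script is illustrative only and not part of this diff:

import { tasks } from "@trigger.dev/sdk/v3";

// Fire a 10-second stress test with 200 parallel triggers per batch.
// Both payload fields are optional; omitted values fall back to the
// defaults documented in rateLimitStress.ts (5 seconds, 100 per batch).
async function main() {
  const handle = await tasks.trigger("rate-limit-stress-test", {
    durationSeconds: 10,
    batchSize: 200,
  });
  console.log("Triggered stress test run:", handle.id);
}

main().catch(console.error);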