Skip to content

Commit 8ba7526

Browse files
authored
fix(batch): rate limiting by token bucket no longer incorrectly goes negative (#2837)
Also improves the BatchTriggerError when a result of getting rate limited.
1 parent 47bed15 commit 8ba7526

File tree

9 files changed

+320
-20
lines changed

9 files changed

+320
-20
lines changed

.changeset/sharp-ravens-doubt.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Improve batch trigger error messages, especially when rate limited

apps/webapp/app/routes/api.v3.batches.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ const { action, loader } = createActionApiRoute(
158158
status: 429,
159159
headers: {
160160
"X-RateLimit-Limit": error.limit.toString(),
161-
"X-RateLimit-Remaining": error.remaining.toString(),
161+
"X-RateLimit-Remaining": Math.max(0, error.remaining).toString(),
162162
"X-RateLimit-Reset": Math.floor(error.resetAt.getTime() / 1000).toString(),
163163
"Retry-After": Math.max(
164164
1,

apps/webapp/app/runEngine/concerns/batchLimits.server.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,7 @@ export class BatchRateLimitExceededError extends Error {
115115
public readonly resetAt: Date,
116116
public readonly itemCount: number
117117
) {
118-
super(
119-
`Batch rate limit exceeded. Attempted to submit ${itemCount} items but only ${remaining} remaining. Limit resets at ${resetAt.toISOString()}`
120-
);
118+
super(`Batch rate limit exceeded. Limit resets at ${resetAt.toISOString()}`);
121119
this.name = "BatchRateLimitExceededError";
122120
}
123121
}

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@
7979
8080
8181
"@kubernetes/[email protected]": "patches/@[email protected]",
82-
"@sentry/[email protected]": "patches/@[email protected]"
82+
"@sentry/[email protected]": "patches/@[email protected]",
83+
"@upstash/[email protected]": "patches/@upstash__ratelimit.patch"
8384
},
8485
"overrides": {
8586
"express@^4>body-parser": "1.20.3",

packages/trigger-sdk/src/v3/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
export * from "./cache.js";
22
export * from "./config.js";
33
export { retry, type RetryOptions } from "./retry.js";
4-
export { queue } from "./shared.js";
4+
export { queue, BatchTriggerError } from "./shared.js";
55
export * from "./tasks.js";
66
export * from "./batch.js";
77
export * from "./wait.js";

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { SpanKind } from "@opentelemetry/api";
22
import { SerializableJson } from "@trigger.dev/core";
33
import {
44
accessoryAttributes,
5+
ApiError,
56
apiClientManager,
67
ApiRequestOptions,
78
conditionallyImportPacket,
@@ -17,6 +18,7 @@ import {
1718
parsePacket,
1819
Queue,
1920
QueueOptions,
21+
RateLimitError,
2022
resourceCatalog,
2123
runtime,
2224
SemanticInternalAttributes,
@@ -125,9 +127,6 @@ export { SubtaskUnwrapError, TaskRunPromise };
125127

126128
export type Context = TaskRunContext;
127129

128-
// Re-export for external use (defined later in file)
129-
export { BatchTriggerError };
130-
131130
export function queue(options: QueueOptions): Queue {
132131
resourceCatalog.registerQueueMetadata(options);
133132

@@ -1592,12 +1591,28 @@ async function executeBatchTwoPhase(
15921591
/**
15931592
* Error thrown when batch trigger operations fail.
15941593
* Includes context about which phase failed and the batch details.
1594+
*
1595+
* When the underlying error is a rate limit (429), additional properties are exposed:
1596+
* - `isRateLimited`: true
1597+
* - `retryAfterMs`: milliseconds until the rate limit resets
15951598
*/
1596-
class BatchTriggerError extends Error {
1599+
export class BatchTriggerError extends Error {
15971600
readonly phase: "create" | "stream";
15981601
readonly batchId?: string;
15991602
readonly itemCount: number;
16001603

1604+
/** True if the error was caused by rate limiting (HTTP 429) */
1605+
readonly isRateLimited: boolean;
1606+
1607+
/** Milliseconds until the rate limit resets. Only set when `isRateLimited` is true. */
1608+
readonly retryAfterMs?: number;
1609+
1610+
/** The underlying API error, if the cause was an ApiError */
1611+
readonly apiError?: ApiError;
1612+
1613+
/** The underlying cause of the error */
1614+
override readonly cause?: unknown;
1615+
16011616
constructor(
16021617
message: string,
16031618
options: {
@@ -1607,12 +1622,59 @@ class BatchTriggerError extends Error {
16071622
itemCount: number;
16081623
}
16091624
) {
1610-
super(message, { cause: options.cause });
1625+
// Build enhanced message that includes the cause's message
1626+
const fullMessage = buildBatchErrorMessage(message, options.cause);
1627+
super(fullMessage, { cause: options.cause });
1628+
16111629
this.name = "BatchTriggerError";
1630+
this.cause = options.cause;
16121631
this.phase = options.phase;
16131632
this.batchId = options.batchId;
16141633
this.itemCount = options.itemCount;
1634+
1635+
// Extract rate limit info from cause
1636+
if (options.cause instanceof RateLimitError) {
1637+
this.isRateLimited = true;
1638+
this.retryAfterMs = options.cause.millisecondsUntilReset;
1639+
this.apiError = options.cause;
1640+
} else if (options.cause instanceof ApiError) {
1641+
this.isRateLimited = options.cause.status === 429;
1642+
this.apiError = options.cause;
1643+
} else {
1644+
this.isRateLimited = false;
1645+
}
1646+
}
1647+
}
1648+
1649+
/**
1650+
* Build an enhanced error message that includes context from the cause.
1651+
*/
1652+
function buildBatchErrorMessage(baseMessage: string, cause: unknown): string {
1653+
if (!cause) {
1654+
return baseMessage;
1655+
}
1656+
1657+
// Handle RateLimitError specifically for better messaging
1658+
if (cause instanceof RateLimitError) {
1659+
const retryMs = cause.millisecondsUntilReset;
1660+
if (retryMs !== undefined) {
1661+
const retrySeconds = Math.ceil(retryMs / 1000);
1662+
return `${baseMessage}: Rate limit exceeded - retry after ${retrySeconds}s`;
1663+
}
1664+
return `${baseMessage}: Rate limit exceeded`;
16151665
}
1666+
1667+
// Handle other ApiErrors
1668+
if (cause instanceof ApiError) {
1669+
return `${baseMessage}: ${cause.message}`;
1670+
}
1671+
1672+
// Handle generic errors
1673+
if (cause instanceof Error) {
1674+
return `${baseMessage}: ${cause.message}`;
1675+
}
1676+
1677+
return baseMessage;
16161678
}
16171679

16181680
/**

patches/@upstash__ratelimit.patch

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
diff --git a/dist/index.js b/dist/index.js
2+
index 7d1502426320957017988aed0c29974acd70e8da..062769cda055302d737503e5d1ba5e62609c934f 100644
3+
--- a/dist/index.js
4+
+++ b/dist/index.js
5+
@@ -841,7 +841,7 @@ var tokenBucketLimitScript = `
6+
refilledAt = refilledAt + numRefills * interval
7+
end
8+
9+
- if tokens == 0 then
10+
+ if tokens < incrementBy then
11+
return {-1, refilledAt + interval}
12+
end
13+
14+
diff --git a/dist/index.mjs b/dist/index.mjs
15+
index 25a2c888be27b7c5aff41de63d5df189e0031145..53b4a4b2d2ef55f709f7404cc6a66058b7f3191a 100644
16+
--- a/dist/index.mjs
17+
+++ b/dist/index.mjs
18+
@@ -813,7 +813,7 @@ var tokenBucketLimitScript = `
19+
refilledAt = refilledAt + numRefills * interval
20+
end
21+
22+
- if tokens == 0 then
23+
+ if tokens < incrementBy then
24+
return {-1, refilledAt + interval}
25+
end
26+

pnpm-lock.yaml

Lines changed: 11 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)