# fix: properly handle exponential backoff for rate limiting in embedders #7030
Base: `main` (changes from all commits)
```diff
@@ -294,6 +294,9 @@ export class OpenAICompatibleEmbedder implements IEmbedder {
 				const embeddings = response.data.map((item) => item.embedding as number[])

+				// Reset consecutive errors on success
+				await this.resetGlobalRateLimitOnSuccess()
+
 				return {
 					embeddings: embeddings,
 					usage: {
```

> **Review comment** on the `resetGlobalRateLimitOnSuccess()` call: Good addition to reset the error count on success! However, I notice there's no explicit test case verifying this behavior. Could we add a test that confirms consecutive errors are properly reset after a successful request?
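The reset-on-success behavior introduced above can be modeled in isolation. The sketch below is a hypothetical stand-in, not the PR's code: it uses a plain shared-state object and a free function, whereas the real `resetGlobalRateLimitOnSuccess` lives on the embedder class and guards the state with a mutex.

```typescript
// Hypothetical model of the success-reset behavior. RateLimitState and
// resetOnSuccess are illustrative names; the real class holds this state
// in a static globalRateLimitState guarded by a mutex.
interface RateLimitState {
	consecutiveRateLimitErrors: number
	isRateLimited: boolean
	rateLimitResetTime: number
}

function resetOnSuccess(state: RateLimitState): void {
	// Only touch the state if there were prior rate limit errors
	if (state.consecutiveRateLimitErrors > 0) {
		state.consecutiveRateLimitErrors = 0
		state.isRateLimited = false
		state.rateLimitResetTime = 0
	}
}

// A state left over from a burst of 429s is fully cleared by one success.
const state: RateLimitState = {
	consecutiveRateLimitErrors: 3,
	isRateLimited: true,
	rateLimitResetTime: Date.now() + 20000,
}
resetOnSuccess(state)
console.log(state.consecutiveRateLimitErrors, state.isRateLimited) // 0 false
```

A test along these lines (successful embed call, then asserting the error count is zero) would address the reviewer's coverage question.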
```diff
@@ -315,14 +318,9 @@ export class OpenAICompatibleEmbedder implements IEmbedder {
 			// Check if it's a rate limit error
 			const httpError = error as HttpError
 			if (httpError?.status === 429) {
-				// Update global rate limit state
-				await this.updateGlobalRateLimitState(httpError)
-
 				if (hasMoreAttempts) {
-					// Calculate delay based on global rate limit state
-					const baseDelay = INITIAL_DELAY_MS * Math.pow(2, attempts)
-					const globalDelay = await this.getGlobalRateLimitDelay()
-					const delayMs = Math.max(baseDelay, globalDelay)
+					// Update global rate limit state and get the delay
+					const delayMs = await this.updateGlobalRateLimitState(httpError, attempts)

 					console.warn(
 						t("embeddings:rateLimitRetry", {
```
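The shape of the simplified retry path can be sketched standalone: one call now both records the 429 in shared state and returns the delay to sleep for, instead of the caller computing its own backoff separately. Everything below is a hypothetical simulation; `recordRateLimitAndGetDelay` merely stands in for `updateGlobalRateLimitState` and returns a made-up doubling delay.

```typescript
// Hypothetical stand-in for updateGlobalRateLimitState: records nothing,
// just returns a delay that doubles with the attempt number.
type HttpError = { status: number }

async function recordRateLimitAndGetDelay(_error: HttpError, attempt: number): Promise<number> {
	return 500 * Math.pow(2, attempt)
}

// Simulated retry loop: every attempt hits a 429, and each retry asks the
// shared state for its delay rather than computing one locally.
async function withRetries(maxAttempts: number): Promise<number[]> {
	const delays: number[] = []
	for (let attempts = 0; attempts < maxAttempts; attempts++) {
		const httpError: HttpError = { status: 429 } // simulate a rate-limited response
		const hasMoreAttempts = attempts < maxAttempts - 1
		if (httpError.status === 429 && hasMoreAttempts) {
			const delayMs = await recordRateLimitAndGetDelay(httpError, attempts)
			delays.push(delayMs) // in real code: await sleep(delayMs), then retry
		}
	}
	return delays
}

withRetries(4).then((d) => console.log(d)) // delays: 500, 1000, 2000
```

The design win is that the delay decision and the state mutation happen under one mutex acquisition, so concurrent callers cannot observe a half-updated state between "record the 429" and "compute the wait".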
```diff
@@ -434,14 +432,20 @@ export class OpenAICompatibleEmbedder implements IEmbedder {
 	}

 	/**
-	 * Updates global rate limit state when a 429 error occurs
+	 * Updates global rate limit state when a 429 error occurs and returns the delay to use
 	 */
-	private async updateGlobalRateLimitState(error: HttpError): Promise<void> {
+	private async updateGlobalRateLimitState(error: HttpError, attemptNumber: number): Promise<number> {
 		const release = await OpenAICompatibleEmbedder.globalRateLimitState.mutex.acquire()
 		try {
 			const state = OpenAICompatibleEmbedder.globalRateLimitState
 			const now = Date.now()

+			// Check if we're already in a rate limit period
+			if (state.isRateLimited && state.rateLimitResetTime > now) {
+				// Return the remaining wait time
+				return state.rateLimitResetTime - now
+			}
+
 			// Increment consecutive rate limit errors
 			if (now - state.lastRateLimitError < 60000) {
 				// Within 1 minute
```

> **Review comment** on the early return: Consider adding a debug log here to help track when requests are reusing existing rate limit periods. This would be helpful for debugging rate limit issues in production.
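The "already rate limited" fast path can be isolated as a pure function: if another request has already set a reset time in the future, later 429 handlers wait out the remainder of that period instead of compounding the backoff further. `remainingDelay` below is a hypothetical helper for illustration, not a name from the PR.

```typescript
// Hypothetical model of the early return at the top of
// updateGlobalRateLimitState: reuse an active rate limit period if one
// exists, otherwise signal the caller to compute a fresh backoff.
interface SharedState {
	isRateLimited: boolean
	rateLimitResetTime: number
}

function remainingDelay(state: SharedState, now: number): number | null {
	if (state.isRateLimited && state.rateLimitResetTime > now) {
		return state.rateLimitResetTime - now // wait out the existing period
	}
	return null // no active period; compute a new exponential delay
}

const now = 1_000_000
console.log(remainingDelay({ isRateLimited: true, rateLimitResetTime: now + 7500 }, now)) // 7500
console.log(remainingDelay({ isRateLimited: false, rateLimitResetTime: 0 }, now)) // null
```

This is what keeps concurrent requests from each incrementing `consecutiveRateLimitErrors` for the same burst of 429s: only the first one through the mutex extends the backoff window.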
```diff
@@ -452,16 +456,47 @@ export class OpenAICompatibleEmbedder implements IEmbedder {

 			state.lastRateLimitError = now

-			// Calculate exponential backoff based on consecutive errors
+			// Calculate exponential backoff based on consecutive errors AND attempt number
+			// Use the maximum of the two to ensure proper backoff
 			const baseDelay = 5000 // 5 seconds base
 			const maxDelay = 300000 // 5 minutes max
-			const exponentialDelay = Math.min(baseDelay * Math.pow(2, state.consecutiveRateLimitErrors - 1), maxDelay)
+
+			// Calculate delay based on consecutive errors across all requests
+			const globalExponentialDelay = Math.min(
+				baseDelay * Math.pow(2, state.consecutiveRateLimitErrors - 1),
+				maxDelay,
+			)
+
+			// Calculate delay based on this specific request's attempt number
+			const attemptExponentialDelay = Math.min(INITIAL_DELAY_MS * Math.pow(2, attemptNumber), maxDelay)
+
+			// Use the larger of the two delays
+			const exponentialDelay = Math.max(globalExponentialDelay, attemptExponentialDelay)

 			// Set global rate limit
 			state.isRateLimited = true
 			state.rateLimitResetTime = now + exponentialDelay

 			// Silent rate limit activation - no logging to prevent flooding
+			return exponentialDelay
 		} finally {
 			release()
 		}
 	}

+	/**
+	 * Resets the consecutive error count on successful request
+	 */
+	private async resetGlobalRateLimitOnSuccess(): Promise<void> {
+		const release = await OpenAICompatibleEmbedder.globalRateLimitState.mutex.acquire()
+		try {
+			const state = OpenAICompatibleEmbedder.globalRateLimitState
+
+			// Reset rate limit state on success
+			if (state.consecutiveRateLimitErrors > 0) {
+				state.consecutiveRateLimitErrors = 0
+				state.isRateLimited = false
+				state.rateLimitResetTime = 0
+			}
+		} finally {
+			release()
+		}
+	}
```

> **Review comment** on the backoff comment: Could we improve this comment to explain why we take the maximum? For example: "Use the maximum of the two to ensure proper backoff: Global delay ensures all requests respect the rate limit across the system, while attempt delay ensures individual requests back off appropriately. Taking the maximum prevents requests from bypassing the global rate limit."

> **Review comment** on the delay calculation: Good implementation of the delay calculation! Taking the maximum of global and attempt-based delays ensures proper coordination across concurrent requests.
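The two delay terms can be extracted into a pure function to see how they interact. This is a sketch, not the PR's code: `backoffDelay` is a hypothetical name, and `INITIAL_DELAY_MS` is given an assumed value of 500 ms here (the real constant is defined elsewhere in the file and may differ).

```typescript
// Assumed value for illustration; the real INITIAL_DELAY_MS lives elsewhere
// in the embedder module and may differ.
const INITIAL_DELAY_MS = 500

// Hypothetical pure version of the delay calculation: the global term grows
// with consecutive 429s across all requests, the attempt term grows with
// this request's own retry count, and the larger of the two wins.
function backoffDelay(consecutiveErrors: number, attemptNumber: number): number {
	const baseDelay = 5000 // 5 seconds base
	const maxDelay = 300000 // 5 minutes max
	const globalDelay = Math.min(baseDelay * Math.pow(2, consecutiveErrors - 1), maxDelay)
	const attemptDelay = Math.min(INITIAL_DELAY_MS * Math.pow(2, attemptNumber), maxDelay)
	return Math.max(globalDelay, attemptDelay)
}

console.log(backoffDelay(1, 0)) // 5000: the global term dominates early
console.log(backoffDelay(1, 5)) // 16000: the attempt term overtakes it
console.log(backoffDelay(8, 0)) // 300000: capped at the 5 minute maximum
```

Taking the maximum means a request deep into its own retry loop can never shorten its wait by observing a small global delay, and a fresh request can never undercut a system-wide backoff already in force.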
> **Review comment:** Good update to pass the attempt number. Could we add another test case that explicitly verifies the `resetGlobalRateLimitOnSuccess` behavior to ensure consecutive errors are properly reset after successful requests?