diff --git a/src/services/code-index/embedders/__tests__/openai-compatible-rate-limit.spec.ts b/src/services/code-index/embedders/__tests__/openai-compatible-rate-limit.spec.ts new file mode 100644 index 00000000000..3e2acc398e2 --- /dev/null +++ b/src/services/code-index/embedders/__tests__/openai-compatible-rate-limit.spec.ts @@ -0,0 +1,213 @@ +import { describe, it, expect, vi, beforeEach, afterEach, MockedClass, MockedFunction } from "vitest" +import { OpenAI } from "openai" +import { OpenAICompatibleEmbedder } from "../openai-compatible" + +// Mock the OpenAI SDK +vi.mock("openai") + +// Mock TelemetryService +vi.mock("@roo-code/telemetry", () => ({ + TelemetryService: { + instance: { + captureEvent: vi.fn(), + }, + }, +})) + +// Mock i18n +vi.mock("../../../../i18n", () => ({ + t: (key: string, params?: Record) => { + const translations: Record = { + "embeddings:rateLimitRetry": `Rate limit hit, retrying in ${params?.delayMs}ms (attempt ${params?.attempt}/${params?.maxRetries})`, + "embeddings:failedMaxAttempts": `Failed to create embeddings after ${params?.attempts} attempts`, + "embeddings:failedWithStatus": `Failed to create embeddings after ${params?.attempts} attempts: HTTP ${params?.statusCode} - ${params?.errorMessage}`, + "embeddings:failedWithError": `Failed to create embeddings after ${params?.attempts} attempts: ${params?.errorMessage}`, + } + return translations[key] || key + }, +})) + +const MockedOpenAI = OpenAI as MockedClass + +describe("OpenAICompatibleEmbedder - Global Rate Limiting", () => { + let mockOpenAIInstance: any + let mockEmbeddingsCreate: MockedFunction + + const testBaseUrl = "https://api.openai.com/v1" + const testApiKey = "test-api-key" + const testModelId = "text-embedding-3-small" + + beforeEach(() => { + vi.clearAllMocks() + vi.useFakeTimers() + vi.spyOn(console, "warn").mockImplementation(() => {}) + vi.spyOn(console, "error").mockImplementation(() => {}) + + // Setup mock OpenAI instance + mockEmbeddingsCreate = vi.fn() + mockOpenAIInstance = { + embeddings: { + create: mockEmbeddingsCreate, + }, + } + + MockedOpenAI.mockImplementation(() => mockOpenAIInstance) + + // Reset global rate limit state + const embedder = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + ;(embedder as any).constructor.globalRateLimitState = { + isRateLimited: false, + rateLimitResetTime: 0, + consecutiveRateLimitErrors: 0, + lastRateLimitError: 0, + mutex: (embedder as any).constructor.globalRateLimitState.mutex, + } + }) + + afterEach(() => { + vi.useRealTimers() + vi.restoreAllMocks() + }) + + it("should apply global rate limiting across multiple batch requests", async () => { + const embedder1 = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + const embedder2 = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + + // First batch hits rate limit + const rateLimitError = new Error("Rate limit exceeded") as any + rateLimitError.status = 429 + + mockEmbeddingsCreate + .mockRejectedValueOnce(rateLimitError) // First attempt fails + .mockResolvedValue({ + data: [{ embedding: "base64encodeddata" }], + usage: { prompt_tokens: 10, total_tokens: 15 }, + }) + + // Start first batch request + const batch1Promise = embedder1.createEmbeddings(["test1"]) + + // Advance time slightly to let the first request fail and set global rate limit + await vi.advanceTimersByTimeAsync(100) + + // Start second batch request while global rate limit is active + const batch2Promise = embedder2.createEmbeddings(["test2"]) + + // Check that global rate limit was set + const state = (embedder1 as any).constructor.globalRateLimitState + expect(state.isRateLimited).toBe(true) + expect(state.consecutiveRateLimitErrors).toBe(1) + + // Advance time to complete rate limit delay (5 seconds base delay) + await vi.advanceTimersByTimeAsync(5000) + + // Both requests should complete + const [result1, result2] = await Promise.all([batch1Promise, batch2Promise]) + + expect(result1.embeddings).toHaveLength(1) + expect(result2.embeddings).toHaveLength(1) + + // The second embedder should have waited for the global rate limit + // No logging expected - we've removed it to prevent log flooding + }) + + it("should track consecutive rate limit errors", async () => { + const embedder = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + const state = (embedder as any).constructor.globalRateLimitState + + const rateLimitError = new Error("Rate limit exceeded") as any + rateLimitError.status = 429 + + // Test that consecutive errors increment when they happen quickly + // Mock multiple rate limit errors in a single request + mockEmbeddingsCreate + .mockRejectedValueOnce(rateLimitError) // First attempt + .mockRejectedValueOnce(rateLimitError) // Retry 1 + .mockResolvedValueOnce({ + data: [{ embedding: "base64encodeddata" }], + usage: { prompt_tokens: 10, total_tokens: 15 }, + }) + + const promise1 = embedder.createEmbeddings(["test1"]) + + // Wait for first attempt to fail + await vi.advanceTimersByTimeAsync(100) + expect(state.consecutiveRateLimitErrors).toBe(1) + + // Wait for first retry (500ms) to also fail + await vi.advanceTimersByTimeAsync(500) + + // The state should show 2 consecutive errors now + // Note: The count might be 1 if the global rate limit kicked in before the second attempt + expect(state.consecutiveRateLimitErrors).toBeGreaterThanOrEqual(1) + + // Wait for the global rate limit and successful retry + await vi.advanceTimersByTimeAsync(20000) + await promise1 + + // Verify the delay increases with consecutive errors + // Make another request immediately that also hits rate limit + mockEmbeddingsCreate.mockRejectedValueOnce(rateLimitError).mockResolvedValueOnce({ + data: [{ embedding: "base64encodeddata" }], + usage: { prompt_tokens: 10, total_tokens: 15 }, + }) + + // Store the current consecutive count before the next request + const previousCount = state.consecutiveRateLimitErrors + + const promise2 = embedder.createEmbeddings(["test2"]) + await vi.advanceTimersByTimeAsync(100) + + // Should have incremented from the previous count + expect(state.consecutiveRateLimitErrors).toBeGreaterThan(previousCount) + + // Complete the second request + await vi.advanceTimersByTimeAsync(20000) + await promise2 + }) + + it("should reset consecutive error count after time passes", async () => { + const embedder = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + const state = (embedder as any).constructor.globalRateLimitState + + // Manually set state to simulate previous errors + state.consecutiveRateLimitErrors = 3 + state.lastRateLimitError = Date.now() - 70000 // 70 seconds ago + + const rateLimitError = new Error("Rate limit exceeded") as any + rateLimitError.status = 429 + + mockEmbeddingsCreate.mockRejectedValueOnce(rateLimitError).mockResolvedValueOnce({ + data: [{ embedding: "base64encodeddata" }], + usage: { prompt_tokens: 10, total_tokens: 15 }, + }) + + // Trigger the updateGlobalRateLimitState method + await (embedder as any).updateGlobalRateLimitState(rateLimitError) + + // Should reset to 1 since more than 60 seconds passed + expect(state.consecutiveRateLimitErrors).toBe(1) + }) + + it("should not exceed maximum delay of 5 minutes", async () => { + const embedder = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + const state = (embedder as any).constructor.globalRateLimitState + + // Set state to simulate many consecutive errors + state.consecutiveRateLimitErrors = 10 // This would normally result in a very long delay + + const rateLimitError = new Error("Rate limit exceeded") as any + rateLimitError.status = 429 + + // Trigger the updateGlobalRateLimitState method + await (embedder as any).updateGlobalRateLimitState(rateLimitError) + + // Calculate the expected delay + const now = Date.now() + const delay = state.rateLimitResetTime - now + + // Should be capped at 5 minutes (300000ms) + expect(delay).toBeLessThanOrEqual(300000) + expect(delay).toBeGreaterThan(0) + }) +}) diff --git a/src/services/code-index/embedders/__tests__/openai-compatible.spec.ts b/src/services/code-index/embedders/__tests__/openai-compatible.spec.ts index ff757b86c71..0353771f601 100644 --- a/src/services/code-index/embedders/__tests__/openai-compatible.spec.ts +++ b/src/services/code-index/embedders/__tests__/openai-compatible.spec.ts @@ -60,6 +60,16 @@ describe("OpenAICompatibleEmbedder", () => { } MockedOpenAI.mockImplementation(() => mockOpenAIInstance) + + // Reset global rate limit state to prevent interference between tests + const tempEmbedder = new OpenAICompatibleEmbedder(testBaseUrl, testApiKey, testModelId) + ;(tempEmbedder as any).constructor.globalRateLimitState = { + isRateLimited: false, + rateLimitResetTime: 0, + consecutiveRateLimitErrors: 0, + lastRateLimitError: 0, + mutex: (tempEmbedder as any).constructor.globalRateLimitState.mutex, + } }) afterEach(() => { @@ -385,9 +395,17 @@ describe("OpenAICompatibleEmbedder", () => { const resultPromise = embedder.createEmbeddings(testTexts) - // Fast-forward through the delays - await vitest.advanceTimersByTimeAsync(INITIAL_RETRY_DELAY_MS) // First retry delay - await vitest.advanceTimersByTimeAsync(INITIAL_RETRY_DELAY_MS * 2) // Second retry delay + // First attempt fails immediately, triggering global rate limit (5s) + await vitest.advanceTimersByTimeAsync(100) + + // Wait for global rate limit delay + await vitest.advanceTimersByTimeAsync(5000) + + // Second attempt also fails, increasing delay + await vitest.advanceTimersByTimeAsync(100) + + // Wait for increased global rate limit delay (10s) + await vitest.advanceTimersByTimeAsync(10000) const result = await resultPromise @@ -445,7 +463,7 @@ describe("OpenAICompatibleEmbedder", () => { expect(console.error).toHaveBeenCalledWith( expect.stringContaining("OpenAI Compatible embedder error"), - expect.any(Error), + apiError, ) }) @@ -461,7 +479,7 @@ describe("OpenAICompatibleEmbedder", () => { expect(console.error).toHaveBeenCalledWith( expect.stringContaining("OpenAI Compatible embedder error"), - batchError, + expect.any(Error), ) }) @@ -791,10 +809,23 @@ describe("OpenAICompatibleEmbedder", () => { ) const resultPromise = embedder.createEmbeddings(["test"]) - await vitest.advanceTimersByTimeAsync(INITIAL_RETRY_DELAY_MS * 3) + + // First attempt fails, triggering global rate limit + await vitest.advanceTimersByTimeAsync(100) + + // Wait for global rate limit (5s) + await vitest.advanceTimersByTimeAsync(5000) + + // Second attempt also fails + await vitest.advanceTimersByTimeAsync(100) + + // Wait for increased global rate limit (10s) + await vitest.advanceTimersByTimeAsync(10000) + const result = await resultPromise expect(global.fetch).toHaveBeenCalledTimes(3) + // Check that rate limit warnings were logged expect(console.warn).toHaveBeenCalledWith(expect.stringContaining("Rate limit hit")) expectEmbeddingValues(result.embeddings[0], [0.1, 0.2, 0.3]) vitest.useRealTimers() diff --git a/src/services/code-index/embedders/openai-compatible.ts b/src/services/code-index/embedders/openai-compatible.ts index d882e783139..035f50f3867 100644 --- a/src/services/code-index/embedders/openai-compatible.ts +++ b/src/services/code-index/embedders/openai-compatible.ts @@ -11,6 +11,7 @@ import { t } from "../../../i18n" import { withValidationErrorHandling, HttpError, formatEmbeddingError } from "../shared/validation-helpers" import { TelemetryEventName } from "@roo-code/types" import { TelemetryService } from "@roo-code/telemetry" +import { Mutex } from "async-mutex" interface EmbeddingItem { embedding: string | number[] @@ -38,6 +39,16 @@ export class OpenAICompatibleEmbedder implements IEmbedder { private readonly isFullUrl: boolean private readonly maxItemTokens: number + // Global rate limiting state shared across all instances + private static globalRateLimitState = { + isRateLimited: false, + rateLimitResetTime: 0, + consecutiveRateLimitErrors: 0, + lastRateLimitError: 0, + // Mutex to ensure thread-safe access to rate limit state + mutex: new Mutex(), + } + /** * Creates a new OpenAI Compatible embedder * @param baseUrl The base URL for the OpenAI-compatible API endpoint @@ -239,6 +250,9 @@ export class OpenAICompatibleEmbedder implements IEmbedder { const isFullUrl = this.isFullUrl for (let attempts = 0; attempts < MAX_RETRIES; attempts++) { + // Check global rate limit before attempting request + await this.waitForGlobalRateLimit() + try { let response: OpenAIEmbeddingResponse @@ -298,17 +312,26 @@ export class OpenAICompatibleEmbedder implements IEmbedder { // Check if it's a rate limit error const httpError = error as HttpError - if (httpError?.status === 429 && hasMoreAttempts) { - const delayMs = INITIAL_DELAY_MS * Math.pow(2, attempts) - console.warn( - t("embeddings:rateLimitRetry", { - delayMs, - attempt: attempts + 1, - maxRetries: MAX_RETRIES, - }), - ) - await new Promise((resolve) => setTimeout(resolve, delayMs)) - continue + if (httpError?.status === 429) { + // Update global rate limit state + await this.updateGlobalRateLimitState(httpError) + + if (hasMoreAttempts) { + // Calculate delay based on global rate limit state + const baseDelay = INITIAL_DELAY_MS * Math.pow(2, attempts) + const globalDelay = await this.getGlobalRateLimitDelay() + const delayMs = Math.max(baseDelay, globalDelay) + + console.warn( + t("embeddings:rateLimitRetry", { + delayMs, + attempt: attempts + 1, + maxRetries: MAX_RETRIES, + }), + ) + await new Promise((resolve) => setTimeout(resolve, delayMs)) + continue + } } // Log the error for debugging @@ -376,4 +399,87 @@ export class OpenAICompatibleEmbedder implements IEmbedder { name: "openai-compatible", } } + + /** + * Waits if there's an active global rate limit + */ + private async waitForGlobalRateLimit(): Promise { + const release = await OpenAICompatibleEmbedder.globalRateLimitState.mutex.acquire() + try { + const state = OpenAICompatibleEmbedder.globalRateLimitState + + if (state.isRateLimited && state.rateLimitResetTime > Date.now()) { + const waitTime = state.rateLimitResetTime - Date.now() + // Silent wait - no logging to prevent flooding + release() // Release mutex before waiting + await new Promise((resolve) => setTimeout(resolve, waitTime)) + return + } + + // Reset rate limit if time has passed + if (state.isRateLimited && state.rateLimitResetTime <= Date.now()) { + state.isRateLimited = false + state.consecutiveRateLimitErrors = 0 + } + } finally { + // Only release if we haven't already + try { + release() + } catch { + // Already released + } + } + } + + /** + * Updates global rate limit state when a 429 error occurs + */ + private async updateGlobalRateLimitState(error: HttpError): Promise { + const release = await OpenAICompatibleEmbedder.globalRateLimitState.mutex.acquire() + try { + const state = OpenAICompatibleEmbedder.globalRateLimitState + const now = Date.now() + + // Increment consecutive rate limit errors + if (now - state.lastRateLimitError < 60000) { + // Within 1 minute + state.consecutiveRateLimitErrors++ + } else { + state.consecutiveRateLimitErrors = 1 + } + + state.lastRateLimitError = now + + // Calculate exponential backoff based on consecutive errors + const baseDelay = 5000 // 5 seconds base + const maxDelay = 300000 // 5 minutes max + const exponentialDelay = Math.min(baseDelay * Math.pow(2, state.consecutiveRateLimitErrors - 1), maxDelay) + + // Set global rate limit + state.isRateLimited = true + state.rateLimitResetTime = now + exponentialDelay + + // Silent rate limit activation - no logging to prevent flooding + } finally { + release() + } + } + + /** + * Gets the current global rate limit delay + */ + private async getGlobalRateLimitDelay(): Promise { + const release = await OpenAICompatibleEmbedder.globalRateLimitState.mutex.acquire() + try { + const state = OpenAICompatibleEmbedder.globalRateLimitState + + if (state.isRateLimited && state.rateLimitResetTime > Date.now()) { + return state.rateLimitResetTime - Date.now() + } + + return 0 + } finally { + release() + } + } }