
Commit 867fa6e

Implement Sliding Window Rate Limiter and Retry Handler for Gemini Embedder
- Introduced SlidingWindowRateLimiter to manage request rates effectively, ensuring consistent request handling and memory efficiency.
- Added RetryHandler with exponential backoff and jitter to handle transient failures during API calls.
- Refactored CodeIndexGeminiEmbedder to utilize the new rate limiter and retry handler, improving error handling and request management.
- Enhanced batch processing logic for embedding texts, ensuring better performance and reliability.
1 parent 92ab05a commit 867fa6e
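
Note: the SlidingWindowRateLimiter named in the commit message lives in src/utils/rate-limiter.ts, one of the three changed files that is not shown in the diff below. Based purely on how gemini.ts uses it (a constructor taking { rateLimitSeconds } and an awaitable acquire() call, logged as a minimum delay between requests), a minimal sketch of such a limiter could look like the following; the internal bookkeeping here is an illustrative assumption, not the committed implementation.

// Illustrative sketch only; not the committed src/utils/rate-limiter.ts.
// Only the option name (rateLimitSeconds) and acquire() are taken from the usage in gemini.ts;
// the window bookkeeping below is an assumption (one request per rateLimitSeconds window).
export interface SlidingWindowRateLimiterOptions {
    rateLimitSeconds: number
}

export class SlidingWindowRateLimiter {
    private timestamps: number[] = [] // start times of recent requests (the sliding window)
    private readonly windowMs: number

    constructor(options: SlidingWindowRateLimiterOptions) {
        this.windowMs = options.rateLimitSeconds * 1000
    }

    /** Resolves when the caller may issue its next request. */
    async acquire(): Promise<void> {
        const now = Date.now()
        // Drop timestamps that have slid out of the window so memory stays bounded.
        this.timestamps = this.timestamps.filter((t) => now - t < this.windowMs)

        const last = this.timestamps[this.timestamps.length - 1]
        const waitMs = last === undefined ? 0 : Math.max(0, last + this.windowMs - now)
        if (waitMs > 0) {
            await new Promise((resolve) => setTimeout(resolve, waitMs))
        }
        this.timestamps.push(Date.now())
    }
}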

File tree

3 files changed (+524 additions, -115 deletions)

src/services/code-index/embedders/gemini.ts

Lines changed: 162 additions & 115 deletions
@@ -2,13 +2,19 @@ import { ApiHandlerOptions } from "../../../shared/api"
 import { EmbedderInfo, EmbeddingResponse, IEmbedder } from "../interfaces"
 import { GeminiHandler } from "../../../api/providers/gemini"
 import { EMBEDDING_MODEL_PROFILES } from "../../../shared/embeddingModels"
-import { GEMINI_RATE_LIMIT_DELAY_MS, MAX_BATCH_RETRIES, INITIAL_RETRY_DELAY_MS } from "../constants"
+import { GEMINI_RATE_LIMIT_DELAY_MS } from "../constants"
+import { SlidingWindowRateLimiter, SlidingWindowRateLimiterOptions } from "../../../utils/rate-limiter"
+import { RetryHandler } from "../../../utils/retry-handler"
+
 /**
  * Implements the IEmbedder interface using Google Gemini's embedding API.
  */
 export class CodeIndexGeminiEmbedder extends GeminiHandler implements IEmbedder {
     private readonly defaultModelId: string
     private readonly defaultTaskType: string
+    private readonly rateLimiter: SlidingWindowRateLimiter
+    private readonly retryHandler: RetryHandler
+    private readonly id: string
 
     /**
      * Creates a new Gemini embedder instance.
@@ -18,6 +24,26 @@ export class CodeIndexGeminiEmbedder extends GeminiHandler implements IEmbedder
         super(options)
         this.defaultModelId = options.apiModelId || "gemini-embedding-exp-03-07"
         this.defaultTaskType = options.geminiEmbeddingTaskType || "CODE_RETRIEVAL_QUERY"
+
+        // Calculate rate limit parameters based on rateLimitSeconds or default
+        const rateLimitSeconds = options.rateLimitSeconds || GEMINI_RATE_LIMIT_DELAY_MS / 1000
+
+        // Configure the rate limiter to use rateLimitSeconds for rate calculations
+        const limiterOptions: SlidingWindowRateLimiterOptions = {
+            rateLimitSeconds: rateLimitSeconds,
+        }
+
+        // Get the singleton rate limiter instance
+        this.rateLimiter = new SlidingWindowRateLimiter(limiterOptions)
+        // Initialize retry handler with default options
+        this.retryHandler = new RetryHandler({
+            initialDelay: rateLimitSeconds,
+        })
+        this.id = Math.random().toString()
+
+        console.log(
+            `Initialized Gemini rate limiter with id ${this.id} and ${rateLimitSeconds}s minimum delay between requests`,
+        )
     }
 
     /**
@@ -39,6 +65,38 @@ export class CodeIndexGeminiEmbedder extends GeminiHandler implements IEmbedder
         }
     }
 
+    /**
+     * Processes a batch of texts and aggregates the embeddings and usage statistics.
+     *
+     * @param batch Array of texts to process
+     * @param model Model identifier to use
+     * @param taskType The task type for the embedding
+     * @param allEmbeddings Array to store all embeddings
+     * @param aggregatedUsage Object to track token usage
+     * @param isFinalBatch Whether this is the final batch (affects error messages)
+     */
+    private async _processAndAggregateBatch(
+        batch: string[],
+        model: string,
+        taskType: string,
+        allEmbeddings: number[][],
+        aggregatedUsage: { promptTokens: number; totalTokens: number },
+        isFinalBatch: boolean = false,
+    ): Promise<void> {
+        if (batch.length === 0) return
+
+        try {
+            const batchResult = await this._embedBatch(batch, model, taskType)
+            allEmbeddings.push(...batchResult.embeddings)
+            aggregatedUsage.promptTokens += batchResult.usage.promptTokens
+            aggregatedUsage.totalTokens += batchResult.usage.totalTokens
+        } catch (error) {
+            const batchType = isFinalBatch ? "final batch" : "batch"
+            console.error(`Failed to process ${batchType} with retries:`, error)
+            throw new Error(`Failed to create embeddings for ${batchType}: ${(error as Error).message}`)
+        }
+    }
+
     /**
      * Embeds texts while respecting the token limit of the model.
      * Splits the input texts into batches that don't exceed the model's token limit.
@@ -68,142 +126,131 @@ export class CodeIndexGeminiEmbedder extends GeminiHandler implements IEmbedder
         const allEmbeddings: number[][] = []
         const aggregatedUsage = { promptTokens: 0, totalTokens: 0 }
 
-        // Process texts in batches
-        const remainingTexts = [...texts]
-        let isFirstBatch = true // Initialize isFirstBatch
-
-        while (remainingTexts.length > 0) {
-            const currentBatch: string[] = []
-            let currentBatchTokens = 0
-            const processedIndices: number[] = []
-
-            // Simple token estimation (4 chars ≈ 1 token)
-            for (let i = 0; i < remainingTexts.length; i++) {
-                const text = remainingTexts[i]
-                // Estimate tokens (similar to OpenAI's implementation)
-                const estimatedTokens = Math.ceil(text.length / 4)
-
-                // Skip texts that exceed the max token limit for a single item
-                if (estimatedTokens > maxInputTokens) {
-                    console.warn(
-                        `Text at index ${i} exceeds maximum token limit (${estimatedTokens} > ${maxInputTokens}). Skipping.`,
-                    )
-                    processedIndices.push(i)
-                    continue
-                }
-
-                // Add text to batch if it fits within the token limit
-                if (currentBatchTokens + estimatedTokens <= maxInputTokens) {
-                    currentBatch.push(text)
-                    currentBatchTokens += estimatedTokens
-                    processedIndices.push(i)
-                } else {
-                    // This text would exceed the limit, so process the current batch first
-                    break
-                }
-            }
+        // Initialize the current batch
+        let currentBatch: string[] = []
+        let currentBatchTokens = 0
 
-            // Remove processed texts from the remaining texts
-            for (let i = processedIndices.length - 1; i >= 0; i--) {
-                remainingTexts.splice(processedIndices[i], 1)
+        // Process each text sequentially with for...of loop
+        for (const text of texts) {
+            // Estimate tokens (similar to OpenAI's implementation)
+            const estimatedTokens = Math.ceil(text.length / 4)
+
+            // Skip texts that exceed the max token limit for a single item
+            if (estimatedTokens > maxInputTokens) {
+                console.warn(`Text exceeds maximum token limit (${estimatedTokens} > ${maxInputTokens}). Skipping.`)
+                continue
             }
 
-            // Process the current batch if not empty
-            if (currentBatch.length > 0) {
-                if (!isFirstBatch) {
-                    const delayMs =
-                        this.options.rateLimitSeconds !== undefined
-                            ? this.options.rateLimitSeconds * 1000
-                            : GEMINI_RATE_LIMIT_DELAY_MS
-                    console.log(`Adding proactive delay of ${delayMs}ms before Gemini batch`)
-                    await new Promise((resolve) => setTimeout(resolve, delayMs))
-                    isFirstBatch = false
-                }
-
-                try {
-                    const batchResult = await this._embedBatchWithRetries(currentBatch, model, taskType)
-                    allEmbeddings.push(...batchResult.embeddings)
-                    aggregatedUsage.promptTokens += batchResult.usage.promptTokens
-                    aggregatedUsage.totalTokens += batchResult.usage.totalTokens
-                } catch (error) {
-                    console.error("Failed to process batch with retries:", error)
-                    throw new Error(`Failed to create embeddings for batch: ${(error as Error).message}`)
-                }
+            // If adding this text would exceed the token limit, process the current batch first
+            if (currentBatchTokens + estimatedTokens > maxInputTokens) {
+                // Process the current batch
+                await this._processAndAggregateBatch(currentBatch, model, taskType, allEmbeddings, aggregatedUsage)
+
+                // Reset the batch
+                currentBatch = []
+                currentBatchTokens = 0
             }
+
+            // Add the current text to the batch
+            currentBatch.push(text)
+            currentBatchTokens += estimatedTokens
         }
 
+        // Process any remaining texts in the final batch
+        await this._processAndAggregateBatch(currentBatch, model, taskType, allEmbeddings, aggregatedUsage, true)
+
         return { embeddings: allEmbeddings, usage: aggregatedUsage }
     }
 
     /**
-     * Helper method to handle batch embedding with retries and exponential backoff for Gemini.
+     * Makes the actual API call to Gemini's embedding service and processes the response.
+     *
+     * @param batchTexts Array of texts to embed
+     * @param modelId Model identifier to use for the API call
+     * @param taskType The task type for the embedding
+     * @returns Promise resolving to embeddings and usage statistics
+     */
+    private async _callGeminiEmbeddingApi(
+        batchTexts: string[],
+        modelId: string,
+        taskType: string,
+    ): Promise<{ embeddings: number[][]; usage: { promptTokens: number; totalTokens: number } }> {
+        const now = new Date()
+        console.log(`_callGeminiEmbeddingApi ${now.toISOString()}`)
+        const response = await this.client.models.embedContent({
+            model: modelId,
+            contents: batchTexts,
+            config: {
+                taskType,
+            },
+        })
+
+        if (!response.embeddings) {
+            throw new Error("No embeddings returned from Gemini API")
+        }
+
+        const embeddings = response.embeddings
+            .map((embedding) => embedding?.values)
+            .filter((values) => values !== undefined && values.length > 0) as number[][]
+
+        // Gemini API for embeddings doesn't directly return token usage per call
+        return {
+            embeddings,
+            usage: { promptTokens: 0, totalTokens: 0 }, // Placeholder usage
+        }
+    }
+
+    /**
+     * Creates embeddings for a batch of texts using the Gemini API.
+     * Rate limiting is handled by the SlidingWindowRateLimiter.
+     *
      * @param batchTexts Array of texts to embed in this batch
      * @param model Model identifier to use
      * @param taskType The task type for the embedding
      * @returns Promise resolving to embeddings and usage statistics
      */
-    private async _embedBatchWithRetries(
+    private async _embedBatch(
         batchTexts: string[],
         model: string,
         taskType: string,
     ): Promise<{ embeddings: number[][]; usage: { promptTokens: number; totalTokens: number } }> {
         const modelId = model || this.defaultModelId
-        let lastError: any = null
-
-        for (let attempts = 0; attempts < MAX_BATCH_RETRIES; attempts++) {
-            try {
-                const response = await this.client.models.embedContent({
-                    model: modelId,
-                    contents: batchTexts,
-                    config: {
-                        taskType,
-                    },
-                })
-
-                if (!response.embeddings) {
-                    throw new Error("No embeddings returned from Gemini API")
-                }
-
-                const embeddings = response.embeddings
-                    .map((embedding) => embedding?.values)
-                    .filter((values) => values !== undefined && values.length > 0) as number[][]
-
-                // Gemini API for embeddings doesn't directly return token usage per call in the same way some others do.
-                // The `generateEmbeddings` in the original file didn't populate usage.
-                // If usage needs to be calculated, it would require a separate token counting call.
-                // For now, returning empty usage, consistent with the original generateEmbeddings.
-                return {
-                    embeddings,
-                    usage: { promptTokens: 0, totalTokens: 0 }, // Placeholder usage
-                }
-            } catch (error: any) {
-                lastError = error
-                // Basic check for retryable errors (e.g., rate limits)
-                // Gemini might use 429 or specific error messages like "RESOURCE_EXHAUSTED" or "rate limit exceeded"
-                const isRateLimitError =
-                    error?.status === 429 ||
-                    (error?.message &&
-                        (error.message.includes("rate limit") || error.message.includes("RESOURCE_EXHAUSTED")))
-
-                const hasMoreAttempts = attempts < MAX_BATCH_RETRIES - 1
-
-                if (isRateLimitError && hasMoreAttempts) {
-                    const delayMs = INITIAL_RETRY_DELAY_MS * Math.pow(2, attempts)
-                    console.warn(
-                        `Gemini embedding attempt ${attempts + 1} failed due to rate limit. Retrying in ${delayMs}ms...`,
-                    )
-                    await new Promise((resolve) => setTimeout(resolve, delayMs))
-                    continue
-                }
-                // Non-retryable error or last attempt failed
-                console.error(`Gemini embedding failed on attempt ${attempts + 1}:`, error)
-                throw error // Re-throw the last error if not retryable or out of attempts
+
+        // Determine if an error is retryable (429 Too Many Requests or specific API errors)
+        const shouldRetry = (error: any): boolean => {
+            const retryable =
+                error.status === 429 ||
+                error.message?.includes("RESOURCE_EXHAUSTED") ||
+                error.message?.includes("rate limit") ||
+                error.message?.includes("quota exceeded")
+
+            if (retryable) {
+                console.log(`Retryable error detected: ${error.message}`)
             }
+
+            return retryable
+        }
+
+        try {
+            // Execute the API call with retry logic
+            return await this.retryHandler.execute(async () => {
+                // Acquire a slot from the rate limiter before making the API call
+                // This ensures each retry attempt also respects rate limits
+                await this.rateLimiter.acquire()
+                return await this._callGeminiEmbeddingApi(batchTexts, modelId, taskType)
+            }, shouldRetry)
+        } catch (error: any) {
+            // Log the error with context
+            console.error(`Gemini embedding request failed after all retry attempts:`, {
+                error: error.message,
+                status: error.status,
+                modelId,
+                batchSize: batchTexts.length,
+            })
+
+            // Rethrow the error
+            throw error
         }
-        // Should not be reached if throw error in loop works correctly, but as a fallback:
-        throw new Error(
-            `Failed to create embeddings for batch after ${MAX_BATCH_RETRIES} attempts. Last error: ${lastError?.message}`,
-        )
     }
 
     get embedderInfo(): EmbedderInfo {
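
Note: the RetryHandler used by _embedBatch above is defined in src/utils/retry-handler.ts, the third changed file, which is also not part of this diff. From the execute(fn, shouldRetry) call shown above and the commit message's mention of exponential backoff with jitter, a plausible sketch follows; only the initialDelay option and the execute signature are taken from the diff, while the maxRetries default, the doubling factor, and the jitter range are assumptions.

// Illustrative sketch only; not the committed src/utils/retry-handler.ts.
// execute(fn, shouldRetry) and the initialDelay option mirror the usage in gemini.ts;
// maxRetries, the doubling factor, and the 25% jitter are assumptions.
export interface RetryHandlerOptions {
    initialDelay?: number // seconds, matching the rateLimitSeconds value passed in gemini.ts
    maxRetries?: number
}

export class RetryHandler {
    private readonly initialDelayMs: number
    private readonly maxRetries: number

    constructor(options: RetryHandlerOptions = {}) {
        this.initialDelayMs = (options.initialDelay ?? 1) * 1000
        this.maxRetries = options.maxRetries ?? 3
    }

    /** Runs fn, retrying with exponential backoff plus jitter while shouldRetry(error) returns true. */
    async execute<T>(fn: () => Promise<T>, shouldRetry: (error: any) => boolean): Promise<T> {
        for (let attempt = 0; ; attempt++) {
            try {
                return await fn()
            } catch (error) {
                if (attempt >= this.maxRetries || !shouldRetry(error)) {
                    throw error
                }
                // Exponential backoff: initialDelay * 2^attempt, plus up to 25% random jitter.
                const baseMs = this.initialDelayMs * Math.pow(2, attempt)
                const jitterMs = baseMs * 0.25 * Math.random()
                await new Promise((resolve) => setTimeout(resolve, baseMs + jitterMs))
            }
        }
    }
}

With a shape like this, each retry attempt re-enters the rate limiter (rateLimiter.acquire() is awaited inside the callback passed to execute), so the backoff delays and the sliding-window delay compose instead of competing.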
