Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ const EMBEDDINGS_TIMEOUT_MS = 20_000;
// We do not track "last read" to keep writes minimal.
const EMBEDDINGS_CACHE_EVICT_BATCH = 512;
const EMBEDDINGS_CACHE_EVICT_MAX_BATCH = 8192;
const EMBEDDINGS_CACHE_QUOTA_RETRY_ATTEMPTS = 4;
const EMBEDDINGS_CACHE_QUOTA_MAX_RETRIES = 4;
const EMBEDDINGS_JOB_TTL_MS = 24 * 60 * 60_000;
const EMBEDDINGS_JOB_LOCK_MS = 30_000;

Expand Down Expand Up @@ -435,13 +435,14 @@ const isEmbeddingsCacheQuotaError = (error: unknown): boolean => {
const message = error instanceof Error ? error.message : typeof error === "string" ? error : "";
const combined = `${name} ${message}`.toLowerCase();
if (!combined) return false;
return combined.includes("quota") ||
combined.includes("insufficient") && combined.includes("storage") ||
combined.includes("insufficient") && combined.includes("space") ||
return (
combined.includes("quota") ||
(combined.includes("insufficient") && combined.includes("storage")) ||
(combined.includes("insufficient") && combined.includes("space")) ||
combined.includes("no space") ||
combined.includes("storage limit") ||
combined.includes("storage") && combined.includes("exceeded") ||
combined.includes("limit") && combined.includes("exceeded");
(combined.includes("storage") && combined.includes("exceeded"))
);
};

const writeEmbeddingsCacheEntry = async (
Expand Down Expand Up @@ -494,7 +495,8 @@ const writeEmbeddingsCacheEntryBestEffort = async (
deadlineMs: number,
): Promise<{ isNew: boolean }> => {
let evictBatch = EMBEDDINGS_CACHE_EVICT_BATCH;
for (let attempt = 0; attempt <= EMBEDDINGS_CACHE_QUOTA_RETRY_ATTEMPTS; attempt += 1) {
// Attempts = 1 (initial write) + max retries.
for (let attempt = 0; attempt <= EMBEDDINGS_CACHE_QUOTA_MAX_RETRIES; attempt += 1) {
if (Date.now() >= deadlineMs) return { isNew: false };
try {
return await writeEmbeddingsCacheEntry(kv, cacheModelKey, hash, embedding, createdAtMs);
Expand All @@ -510,7 +512,7 @@ const writeEmbeddingsCacheEntryBestEffort = async (
console.warn(
`[ai.ubq.fi] embeddings_cache quota eviction model=${cacheModelKey} evicted=${evicted.evicted_embeddings} stale_index_deleted=${evicted.deleted_stale_index_keys} batch=${evictBatch}`,
);
if (evicted.evicted_embeddings <= 0) return { isNew: false };
if (evicted.evicted_embeddings <= 0 && evicted.deleted_stale_index_keys <= 0) return { isNew: false };
} catch (evictError) {
console.warn("[ai.ubq.fi] embeddings_cache quota eviction failed:", evictError);
return { isNew: false };
Expand Down Expand Up @@ -1604,21 +1606,19 @@ export const handleEmbeddings = async (req: Request, usageContext?: UsageContext
});
}

let wroteNew = 0;
for (let i = 0; i < chunkItems.length; i += 1) {
const item = chunkItems[i]!;
const vec = vectors[i]!;
for (const idx of item.indices) vectorsByIndex[idx] = vec;
if (shouldCache && kv) {
const result = await writeEmbeddingsCacheEntryBestEffort(
await writeEmbeddingsCacheEntryBestEffort(
kv,
cacheModelKey,
item.hash,
vec,
Date.now(),
deadlineMs,
);
if (result.isNew) wroteNew += 1;
}
}
}
Expand Down
Loading