Skip to content

Commit f21bcab

Browse files
Hi-Jiajun
authored and committed
fix: address reviewer concerns about timeout and recursion convergence (PR CortexReach#215 follow-up)
This commit addresses the two blocking issues raised in PR CortexReach#215: 1. Timeout now uses AbortController for TRUE request cancellation - Timer is properly cleaned up in .finally() - AbortSignal is passed through to embedWithRetry 2. Recursion now guarantees monotonic convergence - Introduced STRICT_REDUCTION_FACTOR = 0.5 - Each recursion level must reduce input by 50% - Works regardless of model context size Modified by AI assistant (not human code) based on PR CortexReach#215. Thanks to the original author and maintainers. Co-authored-by: Hi-Jiajun <Hi-Jiajun@users.noreply.github.com>
1 parent 9966a3f commit f21bcab

File tree

1 file changed

+65
-27
lines changed

1 file changed

+65
-27
lines changed

src/embedder.ts

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ const MAX_EMBED_DEPTH = 3;
258258
/** Global timeout for a single embedding operation (ms). */
259259
const EMBED_TIMEOUT_MS = 10_000;
260260

261+
/**
262+
* Strictly decreasing character limit for forced truncation.
263+
* Each recursion level MUST reduce input by this factor to guarantee progress.
264+
*/
265+
const STRICT_REDUCTION_FACTOR = 0.5; // Each retry must be at most 50% of previous
266+
261267
/**
262268
* Safe character limits per model for forced truncation.
263269
* CJK characters typically consume ~3 tokens each, so the char limit is
@@ -382,16 +388,23 @@ export class Embedder {
382388
/**
383389
* Call embeddings.create with automatic key rotation on rate-limit errors.
384390
* Tries each key in the pool at most once before giving up.
391+
* Accepts an optional AbortSignal to support true request cancellation.
385392
*/
386-
private async embedWithRetry(payload: any): Promise<any> {
393+
private async embedWithRetry(payload: any, signal?: AbortSignal): Promise<any> {
387394
const maxAttempts = this.clients.length;
388395
let lastError: Error | undefined;
389396

390397
for (let attempt = 0; attempt < maxAttempts; attempt++) {
391398
const client = this.nextClient();
392399
try {
393-
return await client.embeddings.create(payload);
400+
// Pass signal to OpenAI SDK if provided (SDK v6+ supports this)
401+
return await client.embeddings.create(payload, signal ? { signal } : undefined);
394402
} catch (error) {
403+
// If aborted, re-throw immediately
404+
if (error instanceof Error && error.name === 'AbortError') {
405+
throw error;
406+
}
407+
395408
lastError = error instanceof Error ? error : new Error(String(error));
396409

397410
if (this.isRateLimitError(error) && attempt < maxAttempts - 1) {
@@ -420,19 +433,33 @@ export class Embedder {
420433
return this.clients.length;
421434
}
422435

423-
/** FR-05: Wrap a promise with a global timeout to prevent indefinite hangs. */
424-
private withTimeout<T>(promise: Promise<T>, label: string): Promise<T> {
425-
return Promise.race([
426-
promise,
427-
new Promise<never>((_, reject) => {
428-
setTimeout(
429-
() => reject(new Error(
430-
`[memory-lancedb-pro] ${label} timed out after ${EMBED_TIMEOUT_MS}ms`
431-
)),
432-
EMBED_TIMEOUT_MS,
433-
);
434-
}),
435-
]);
436+
/** FR-05: Wrap a promise with a global timeout using AbortSignal for TRUE cancellation.
437+
* @param promiseFactory - A function that receives an AbortSignal and returns a promise
438+
*/
439+
private withTimeout<T>(promiseFactory: (signal: AbortSignal) => Promise<T>, label: string): Promise<T> {
440+
const controller = new AbortController();
441+
const timeoutId = setTimeout(() => controller.abort(), EMBED_TIMEOUT_MS);
442+
443+
// Create the promise with the signal
444+
const promise = promiseFactory(controller.signal);
445+
446+
// Race between the original promise and timeout
447+
// When timeout fires, controller.abort() will:
448+
// 1. Trigger the abort event below to reject
449+
// 2. If embedWithRetry received the signal, it will cancel the underlying HTTP request
450+
const timeoutPromise = new Promise<never>((_, reject) => {
451+
controller.signal.addEventListener('abort', () => {
452+
clearTimeout(timeoutId);
453+
reject(new Error(
454+
`[memory-lancedb-pro] ${label} timed out after ${EMBED_TIMEOUT_MS}ms`
455+
));
456+
});
457+
});
458+
459+
return Promise.race([promise, timeoutPromise])
460+
.finally(() => {
461+
clearTimeout(timeoutId);
462+
}) as Promise<T>;
436463
}
437464

438465
// --------------------------------------------------------------------------
@@ -459,11 +486,11 @@ export class Embedder {
459486
// --------------------------------------------------------------------------
460487

461488
async embedQuery(text: string): Promise<number[]> {
462-
return this.withTimeout(this.embedSingle(text, this._taskQuery), "embedQuery");
489+
return this.withTimeout((signal) => this.embedSingle(text, this._taskQuery, signal), "embedQuery");
463490
}
464491

465492
async embedPassage(text: string): Promise<number[]> {
466-
return this.withTimeout(this.embedSingle(text, this._taskPassage), "embedPassage");
493+
return this.withTimeout((signal) => this.embedSingle(text, this._taskPassage, signal), "embedPassage");
467494
}
468495

469496
async embedBatchQuery(texts: string[]): Promise<number[][]> {
@@ -510,18 +537,23 @@ export class Embedder {
510537
return payload;
511538
}
512539

513-
private async embedSingle(text: string, task?: string, depth: number = 0): Promise<number[]> {
540+
private async embedSingle(text: string, task?: string, depth: number = 0, signal?: AbortSignal): Promise<number[]> {
514541
if (!text || text.trim().length === 0) {
515542
throw new Error("Cannot embed empty text");
516543
}
517544

518545
// FR-01: Recursion depth limit — force truncate when too deep
519546
if (depth >= MAX_EMBED_DEPTH) {
520-
const safeLimit = getSafeCharLimit(this._model);
547+
const safeLimit = Math.floor(text.length * STRICT_REDUCTION_FACTOR);
521548
console.warn(
522549
`[memory-lancedb-pro] Recursion depth ${depth} reached MAX_EMBED_DEPTH (${MAX_EMBED_DEPTH}), ` +
523-
`force-truncating ${text.length} chars → ${safeLimit} chars`
550+
`force-truncating ${text.length} chars → ${safeLimit} chars (strict ${STRICT_REDUCTION_FACTOR * 100}% reduction)`
524551
);
552+
if (safeLimit < 100) {
553+
throw new Error(
554+
`[memory-lancedb-pro] Failed to embed: input too large for model context after ${MAX_EMBED_DEPTH} retries`
555+
);
556+
}
525557
text = text.slice(0, safeLimit);
526558
}
527559

@@ -530,7 +562,7 @@ export class Embedder {
530562
if (cached) return cached;
531563

532564
try {
533-
const response = await this.embedWithRetry(this.buildPayload(text, task));
565+
const response = await this.embedWithRetry(this.buildPayload(text, task), signal);
534566
const embedding = response.data[0]?.embedding as number[] | undefined;
535567
if (!embedding) {
536568
throw new Error("No embedding returned from provider");
@@ -555,27 +587,33 @@ export class Embedder {
555587

556588
// FR-03: Single chunk output detection — if smartChunk produced only
557589
// one chunk that is nearly the same size as the original text, chunking
558-
// did not actually reduce the problem. Force-truncate instead of
559-
// recursing (which would loop forever).
590+
// did not actually reduce the problem. Force-truncate with STRICT
591+
// reduction to guarantee progress.
560592
if (
561593
chunkResult.chunks.length === 1 &&
562594
chunkResult.chunks[0].length > text.length * 0.9
563595
) {
564-
const safeLimit = getSafeCharLimit(this._model);
596+
// Use strict reduction factor to guarantee each retry makes progress
597+
const safeLimit = Math.floor(text.length * STRICT_REDUCTION_FACTOR);
565598
console.warn(
566599
`[memory-lancedb-pro] smartChunk produced 1 chunk (${chunkResult.chunks[0].length} chars) ≈ original (${text.length} chars). ` +
567-
`Force-truncating to ${safeLimit} chars to avoid infinite recursion.`
600+
`Force-truncating to ${safeLimit} chars (strict ${STRICT_REDUCTION_FACTOR * 100}% reduction) to avoid infinite recursion.`
568601
);
602+
if (safeLimit < 100) {
603+
throw new Error(
604+
`[memory-lancedb-pro] Failed to embed: chunking couldn't reduce input size enough for model context`
605+
);
606+
}
569607
const truncated = text.slice(0, safeLimit);
570-
return this.embedSingle(truncated, task, depth + 1);
608+
return this.embedSingle(truncated, task, depth + 1, signal);
571609
}
572610

573611
// Embed all chunks in parallel
574612
console.log(`Split document into ${chunkResult.chunkCount} chunks for embedding`);
575613
const chunkEmbeddings = await Promise.all(
576614
chunkResult.chunks.map(async (chunk, idx) => {
577615
try {
578-
const embedding = await this.embedSingle(chunk, task, depth + 1);
616+
const embedding = await this.embedSingle(chunk, task, depth + 1, signal);
579617
return { embedding };
580618
} catch (chunkError) {
581619
console.warn(`Failed to embed chunk ${idx}:`, chunkError);

0 commit comments

Comments
 (0)