Skip to content

Commit 62f4121

Browse files
authored
fix: 5 memory leaks + subagent blocking + AliceLJY nit fixes (#611)
* fix: 5 memory leaks + subagent skip + AliceLJY nit fixes (#598, #601) Cherry-pick of PR #603 (memory leak fixes) + Nit fixes from AliceLJY review: - store.ts: tail-reset semaphore replaces unbounded promise chain - access-tracker.ts: separate retryCount map + Nit#4 merge delta on retry race - embedder.ts: TTL eviction on set() when near capacity - retrieval-stats.ts: ring buffer + Nit#6 inline iteration (no intermediate array) - noise-prototypes.ts: DEDUP_THRESHOLD 0.95->0.90 - index.ts: subagent skip guards for before_prompt_build hooks - test/issue598_smoke.mjs: Nit#1 hardcoded path -> import.meta.url relative path * fix: logger.error optional chaining to prevent TypeError when logger only has warn * fix: route destroy() flush through flush() to avoid concurrent writes + catch unhandled rejection * fix: clear pending synchronously in destroy() before async flush + remove unused this.logger.error * fix: resolve 5 memory leak issues causing heap OOM (#598) Cherry-pick from PR #603: - store.ts: tail-reset semaphore replaces unbounded promise chain - access-tracker.ts: separate retryCount map, maxRetries=5, destroy() with 3s timeout - embedder.ts: TTL eviction on set() when near capacity - retrieval-stats.ts: ring buffer replaces Array.shift() - O(1) write - noise-prototypes.ts: DEDUP_THRESHOLD 0.95->0.90 - test/issue598_smoke.mjs: smoke test for subagent guards * test: add regression tests for Issue #598 memory leak fixes - store-serialization.test.mjs: verifies runSerializedUpdate executes sequentially - access-tracker-retry.test.mjs: verifies retry delta does not amplify, max retries drops - embedder-cache.test.mjs: verifies embedder config accepts TTL params Refs: issues #598 * test: improve regression tests per Codex review feedback - store-serialization: add in-flight concurrency check + exception release test - access-tracker-retry: add new writes during retry test + precise delta check - embedder-cache: add actual TTL config verification Refs: issues #598 * 
test: precise metadata count verification for access-tracker + improved embedder TTL test - access-tracker: add precise metadata count check using parseAccessMetadata (verifies final accessCount=3 matches expected 1+2 delta merge) - embedder-cache: clarify TTL test with config acceptance + OLLAMA note - pre-seed memory in retry test so getById returns data (not null) Refs: Codex Round 2 feedback * test: fix embedder-cache to not use non-existent config fields - Remove maxCacheSize and cacheTtlMinutes from test (they don't exist in EmbeddingConfig) - Document that TTL eviction uses hardcoded defaults (256, 30 min) - The fix is _evictExpired() on set(), not configurable params Ref: Codex Round 3 feedback * test: rename embedder-cache to smoke test, clarify coverage limits - Rename from 'regression test' to 'smoke test' per Codex Round 4 feedback - Document that _evictExpired() on set() requires OLLAMA server for full test - Fix passes now shows actual cache size/hits/misses verification Ref: Codex Round 4 * ci: add Issue #598 regression tests to CI manifest - test/store-serialization.test.mjs: store.ts tail-reset serialization - test/access-tracker-retry.test.mjs: access-tracker delta amplification - test/embedder-cache.test.mjs: embedder TTL eviction (smoke) Ref: issues #598 * test: remove issue598_smoke.mjs from PR #611 (belongs to PR #613 subagent skip) * fix(cli-smoke): add missing store.count() mock (fixes pre-existing bug from 0988a46) * revert: undo cli-smoke fix (belongs to dedicated issue)
1 parent 7a4e0bd commit 62f4121

10 files changed

+639
-46
lines changed

scripts/ci-test-manifest.mjs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ export const CI_TEST_MANIFEST = [
4242
{ group: "core-regression", runner: "node", file: "test/preference-slots.test.mjs", args: ["--test"] },
4343
{ group: "core-regression", runner: "node", file: "test/is-latest-auto-supersede.test.mjs" },
4444
{ group: "core-regression", runner: "node", file: "test/temporal-awareness.test.mjs", args: ["--test"] },
45+
// Issue #598 regression tests
46+
{ group: "core-regression", runner: "node", file: "test/store-serialization.test.mjs" },
47+
{ group: "core-regression", runner: "node", file: "test/access-tracker-retry.test.mjs" },
48+
{ group: "core-regression", runner: "node", file: "test/embedder-cache.test.mjs" },
4549
];
4650

4751
export function getEntriesForGroup(group) {

scripts/verify-ci-test-manifest.mjs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ const EXPECTED_BASELINE = [
4343
{ group: "core-regression", runner: "node", file: "test/preference-slots.test.mjs", args: ["--test"] },
4444
{ group: "core-regression", runner: "node", file: "test/is-latest-auto-supersede.test.mjs" },
4545
{ group: "core-regression", runner: "node", file: "test/temporal-awareness.test.mjs", args: ["--test"] },
46+
// Issue #598 regression tests
47+
{ group: "core-regression", runner: "node", file: "test/store-serialization.test.mjs" },
48+
{ group: "core-regression", runner: "node", file: "test/access-tracker-retry.test.mjs" },
49+
{ group: "core-regression", runner: "node", file: "test/embedder-cache.test.mjs" },
4650
];
4751

4852
function fail(message) {

src/access-tracker.ts

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ export function computeEffectiveHalfLife(
213213
*/
214214
export class AccessTracker {
215215
private readonly pending: Map<string, number> = new Map();
216+
// Tracks retry count per ID so that delta is never amplified across failures.
217+
private readonly _retryCount = new Map<string, number>();
218+
private readonly _maxRetries = 5;
216219
private debounceTimer: ReturnType<typeof setTimeout> | null = null;
217220
private flushPromise: Promise<void> | null = null;
218221
private readonly debounceMs: number;
@@ -291,10 +294,24 @@ export class AccessTracker {
291294
this.clearTimer();
292295
if (this.pending.size > 0) {
293296
this.logger.warn(
294-
`access-tracker: destroying with ${this.pending.size} pending writes`,
297+
`access-tracker: destroying with ${this.pending.size} pending writes — attempting final flush (3s timeout)`,
295298
);
299+
// Clear synchronously BEFORE returning — async flush is best-effort.
300+
this.pending.clear();
301+
this._retryCount.clear();
302+
// Fire-and-forget final flush with a hard 3s timeout.
303+
// Route through flush() to avoid concurrent write-backs with any in-flight flush.
304+
const flushWithTimeout = Promise.race([
305+
this.flush(),
306+
new Promise<void>((resolve) => setTimeout(resolve, 3_000)),
307+
]);
308+
void flushWithTimeout.catch(() => {
309+
// Suppress unhandled rejection during shutdown.
310+
});
311+
} else {
312+
this.pending.clear();
313+
this._retryCount.clear();
296314
}
297-
this.pending.clear();
298315
}
299316

300317
// --------------------------------------------------------------------------
@@ -308,18 +325,34 @@ export class AccessTracker {
308325
for (const [id, delta] of batch) {
309326
try {
310327
const current = await this.store.getById(id);
311-
if (!current) continue;
328+
if (!current) {
329+
// ID not found — memory was deleted or outside current scope.
330+
// Do NOT retry or warn; just drop silently and clear any retry counter.
331+
this._retryCount.delete(id);
332+
continue;
333+
}
312334

313335
const updatedMeta = buildUpdatedMetadata(current.metadata, delta);
314336
await this.store.update(id, { metadata: updatedMeta });
337+
this._retryCount.delete(id); // success — clear retry counter
315338
} catch (err) {
316-
// Requeue failed delta for retry on next flush
317-
const existing = this.pending.get(id) ?? 0;
318-
this.pending.set(id, existing + delta);
319-
this.logger.warn(
320-
`access-tracker: write-back failed for ${id.slice(0, 8)}:`,
321-
err,
322-
);
339+
const retryCount = (this._retryCount.get(id) ?? 0) + 1;
340+
if (retryCount > this._maxRetries) {
341+
// Exceeded max retries — drop and log error.
342+
this._retryCount.delete(id);
343+
this.logger.error?.(
344+
`access-tracker: dropping ${id.slice(0, 8)} after ${retryCount} failed retries`,
345+
);
346+
} else {
347+
this._retryCount.set(id, retryCount);
348+
// Requeue: merge new delta with pending (safe because _retryCount is now independent,
349+
// so delta represents "unflushed retry" only, not accumulated retry amplification).
350+
this.pending.set(id, (this.pending.get(id) ?? 0) + delta);
351+
this.logger.warn(
352+
`access-tracker: write-back failed for ${id.slice(0, 8)} (attempt ${retryCount}/${this._maxRetries}):`,
353+
err,
354+
);
355+
}
323356
}
324357
}
325358
}

src/embedder.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ class EmbeddingCache {
3333
this.ttlMs = ttlMinutes * 60_000;
3434
}
3535

36+
/** Remove all expired entries. Called on every set() when cache is near capacity. */
37+
private _evictExpired(): void {
38+
const now = Date.now();
39+
for (const [k, entry] of this.cache) {
40+
if (now - entry.createdAt > this.ttlMs) {
41+
this.cache.delete(k);
42+
}
43+
}
44+
}
45+
3646
private key(text: string, task?: string): string {
3747
const hash = createHash("sha256").update(`${task || ""}:${text}`).digest("hex").slice(0, 24);
3848
return hash;
@@ -59,10 +69,15 @@ class EmbeddingCache {
5969

6070
/**
 * Insert (or refresh) a cached embedding vector for `text`/`task`.
 *
 * Eviction policy: only a write that actually GROWS the map may evict.
 * The previous version checked `size >= maxSize` unconditionally, so
 * overwriting an already-cached key at capacity needlessly evicted an
 * unrelated entry. Deleting before re-inserting also moves the key to the
 * end of the Map's insertion order, so "oldest first" eviction cannot
 * discard a freshly refreshed entry.
 * NOTE(review): get() is not visible in this chunk — confirm whether it
 * also refreshes recency; if not, this upgrades pure FIFO to
 * write-refreshed ordering (strictly less premature eviction).
 */
set(text: string, task: string | undefined, vector: number[]): void {
  const k = this.key(text, task);
  if (this.cache.has(k)) {
    // Overwrite: no growth, so no eviction. Delete first so the re-insert
    // below lands at the back of the insertion order.
    this.cache.delete(k);
  } else if (this.cache.size >= this.maxSize) {
    // Brand-new key at capacity: drop TTL-expired entries first.
    this._evictExpired();
    // TTL eviction may not have freed a slot — fall back to evicting the
    // single oldest (first-inserted) entry.
    if (this.cache.size >= this.maxSize) {
      const firstKey = this.cache.keys().next().value;
      if (firstKey !== undefined) this.cache.delete(firstKey);
    }
  }
  this.cache.set(k, { vector, createdAt: Date.now() });
}

src/noise-prototypes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ const BUILTIN_NOISE_TEXTS: readonly string[] = [
4040

4141
const DEFAULT_THRESHOLD = 0.82;
4242
const MAX_LEARNED_PROTOTYPES = 200;
43-
const DEDUP_THRESHOLD = 0.95;
43+
const DEDUP_THRESHOLD = 0.90; // lowered from 0.95: reduces noise bank bloat (0.82-0.90 range is where near-duplicate noise accumulates)
4444

4545
// ============================================================================
4646
// NoisePrototypeBank

src/retrieval-stats.ts

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,15 @@ interface QueryRecord {
4242
}
4343

4444
export class RetrievalStatsCollector {
45-
private _records: QueryRecord[] = [];
45+
// Ring buffer: O(1) write, avoids O(n) Array.shift() GC pressure.
46+
private _records: (QueryRecord | undefined)[] = [];
47+
private _head = 0; // next write position
48+
private _count = 0; // number of valid records
4649
private readonly _maxRecords: number;
4750

4851
/**
 * @param maxRecords - Capacity of the ring buffer. Clamped to at least 1:
 *   a zero (or negative/fractional) capacity would make the
 *   `% _maxRecords` index arithmetic in recordQuery() produce NaN and
 *   silently drop every record.
 */
constructor(maxRecords = 1000) {
  this._maxRecords = Math.max(1, Math.floor(maxRecords));
  // Pre-size the backing store so writes never re-allocate.
  this._records = new Array(this._maxRecords);
}
5155

5256
/**
@@ -55,18 +59,31 @@ export class RetrievalStatsCollector {
5559
* @param source - Query source identifier (e.g. "manual", "auto-recall")
5660
*/
5761
recordQuery(trace: RetrievalTrace, source: string): void {
  // Write into the current ring-buffer slot, then advance the head with
  // wrap-around — O(1), no Array.shift() GC pressure.
  const slot = this._head;
  this._records[slot] = { trace, source };
  this._head = (slot + 1) % this._maxRecords;
  // Saturating count: once the buffer is full it stays at capacity.
  this._count = Math.min(this._count + 1, this._maxRecords);
}
6468

69+
/**
 * Snapshot the ring-buffer contents in insertion order (oldest → newest).
 *
 * NOTE(review): getStats() iterates the ring buffer inline precisely to
 * avoid this intermediate allocation, so within the visible code this
 * helper appears unused — confirm external/other callers before relying
 * on it or removing it.
 */
private _getRecords(): QueryRecord[] {
  if (this._count === 0) return [];
  const result: QueryRecord[] = [];
  // When the buffer has wrapped, the oldest record sits at _head;
  // before wrapping, it sits at index 0.
  const start = this._count < this._maxRecords ? 0 : this._head;
  for (let i = 0; i < this._count; i++) {
    const rec = this._records[(start + i) % this._maxRecords];
    if (rec !== undefined) result.push(rec);
  }
  return result;
}
80+
6581
/**
6682
* Compute aggregate statistics from all recorded queries.
83+
* Iterates ring buffer directly — avoids intermediate array allocation from _getRecords().
6784
*/
6885
getStats(): AggregateStats {
69-
const n = this._records.length;
86+
const n = this._count;
7087
if (n === 0) {
7188
return {
7289
totalQueries: 0,
@@ -90,28 +107,27 @@ export class RetrievalStatsCollector {
90107
const queriesBySource: Record<string, number> = {};
91108
const dropsByStage: Record<string, number> = {};
92109

93-
for (const { trace, source } of this._records) {
110+
// Iterate ring buffer directly (no intermediate array allocation).
111+
const start = n < this._maxRecords ? 0 : this._head;
112+
for (let i = 0; i < n; i++) {
113+
const rec = this._records[(start + i) % this._maxRecords];
114+
if (rec === undefined) continue;
115+
const { trace, source } = rec;
116+
94117
totalLatency += trace.totalMs;
95118
totalResults += trace.finalCount;
96119
latencies.push(trace.totalMs);
97120

98-
if (trace.finalCount === 0) {
99-
zeroResultQueries++;
100-
}
121+
if (trace.finalCount === 0) zeroResultQueries++;
101122

102123
queriesBySource[source] = (queriesBySource[source] || 0) + 1;
103-
104124
for (const stage of trace.stages) {
105125
const dropped = stage.inputCount - stage.outputCount;
106126
if (dropped > 0) {
107127
dropsByStage[stage.name] = (dropsByStage[stage.name] || 0) + dropped;
108128
}
109-
if (stage.name === "rerank") {
110-
rerankUsed++;
111-
}
112-
if (stage.name === "noise_filter" && dropped > 0) {
113-
noiseFiltered++;
114-
}
129+
if (stage.name === "rerank") rerankUsed++;
130+
if (stage.name === "noise_filter" && dropped > 0) noiseFiltered++;
115131
}
116132
}
117133

@@ -142,11 +158,13 @@ export class RetrievalStatsCollector {
142158
* Reset all collected statistics.
143159
*/
144160
reset(): void {
  // Rewind the ring-buffer cursors and drop all stored records by
  // replacing the backing array with a fresh pre-sized one.
  this._head = 0;
  this._count = 0;
  this._records = new Array(this._maxRecords);
}
147165

148166
/** Number of queries currently held in the ring buffer (never exceeds maxRecords). */
get count(): number {
  return this._count;
}
152170
}

src/store.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ export class MemoryStore {
198198
private table: LanceDB.Table | null = null;
199199
private initPromise: Promise<void> | null = null;
200200
private ftsIndexCreated = false;
201-
private updateQueue: Promise<void> = Promise.resolve();
201+
// Tail-reset serialization: replaces unbounded promise chain with a boolean flag + FIFO queue.
202+
private _updating = false;
203+
private _waitQueue: Array<() => void> = [];
202204

203205
constructor(private readonly config: StoreConfig) { }
204206

@@ -999,18 +1001,21 @@ export class MemoryStore {
9991001
}
10001002

10011003
/**
 * Run `action` with all other serialized updates excluded (single writer
 * at a time). Tail-reset design: a boolean ownership flag plus a FIFO wait
 * queue, replacing the old unbounded `updateQueue` promise chain (#598).
 *
 * Improvements over the flag + re-check recursion version:
 *  - The releasing owner hands the lock DIRECTLY to the next waiter
 *    (leaves `_updating = true` for it) instead of waking it to
 *    re-compete. The old recursion let a newcomer that called between
 *    release and the waiter's microtask "barge" ahead, re-queueing the
 *    waiter at the back — possible starvation under sustained load.
 *  - No `as Promise<T>` cast and no re-entrant recursion.
 *
 * @param action - Async critical section; its result/rejection propagates
 *   to the caller unchanged.
 */
private async runSerializedUpdate<T>(action: () => Promise<T>): Promise<T> {
  if (this._updating) {
    // Busy: park in FIFO order. When this promise resolves, the lock is
    // already ours — the releaser keeps `_updating` set on our behalf.
    await new Promise<void>((resolve) => {
      this._waitQueue.push(resolve);
    });
  } else {
    this._updating = true;
  }
  try {
    return await action();
  } finally {
    const next = this._waitQueue.shift();
    if (next) {
      // Direct hand-off: `_updating` stays true; wake the next waiter.
      next();
    } else {
      this._updating = false;
    }
  }
}
10161021

0 commit comments

Comments (0)