Skip to content

Commit e333494

Browse files
committed
fix(store): proper-lockfile retries + ECOMPROMISED graceful handling (CortexReach#415)
對齊 upstream/master(包含 PR CortexReach#626 proactive cleanup),並保留 James 針對 Issue CortexReach#415 的修復:

- 從 PR CortexReach#626 引入 proactive cleanup(age > 5 分鐘的 stale lock 自動清除)
- 【修復 CortexReach#415】保守 retries 設定:
  - minTimeout: 1000ms(避免高負載下過度密集重試)
  - maxTimeout: 30000ms(支撐更久的 event loop 阻塞)
  - stale: 10000ms
- 【修復 CortexReach#415】onCompromised flag:lock compromised 時不立即崩潰,由 finally block 統一處理 fn() 錯誤 vs compromisedErr 的拋出邏輯
- 新增 lock-stress-test.mjs:驗證並發寫入、重試行為、stress test

PR CortexReach#517: CortexReach/memory-lancedb-pro
Issue CortexReach#415: ECOMPROMISED crash under event-loop pressure
1 parent 16ee3e4 commit e333494

File tree

2 files changed

+242
-43
lines changed

2 files changed

+242
-43
lines changed

src/store.ts

Lines changed: 94 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
mkdirSync,
1212
realpathSync,
1313
lstatSync,
14-
rmSync,
1514
statSync,
1615
unlinkSync,
1716
} from "node:fs";
@@ -67,6 +66,11 @@ async function loadLockfile(): Promise<any> {
6766
return lockfileModule;
6867
}
6968

69+
/** For unit testing: override the lockfile module with a mock. */
70+
export function __setLockfileModuleForTests(module: any): void {
71+
lockfileModule = module;
72+
}
73+
7074
export const loadLanceDB = async (): Promise<
7175
typeof import("@lancedb/lancedb")
7276
> => {
@@ -157,7 +161,7 @@ export function validateStoragePath(dbPath: string): string {
157161
) {
158162
throw err;
159163
} else {
160-
// Other lstat failures ??continue with original path
164+
// Other lstat failures — continue with original path
161165
}
162166
}
163167

@@ -201,23 +205,27 @@ export class MemoryStore {
201205
private table: LanceDB.Table | null = null;
202206
private initPromise: Promise<void> | null = null;
203207
private ftsIndexCreated = false;
204-
// Tail-reset serialization: replaces unbounded promise chain with a boolean flag + FIFO queue.
205-
private _updating = false;
206-
private _waitQueue: Array<() => void> = [];
208+
private updateQueue: Promise<void> = Promise.resolve();
207209

208210
constructor(private readonly config: StoreConfig) { }
209211

210212
private async runWithFileLock<T>(fn: () => Promise<T>): Promise<T> {
211213
const lockfile = await loadLockfile();
212214
const lockPath = join(this.config.dbPath, ".memory-write.lock");
213-
214-
// Ensure lock file exists before locking (proper-lockfile requires it)
215215
if (!existsSync(lockPath)) {
216216
try { mkdirSync(dirname(lockPath), { recursive: true }); } catch {}
217217
try { const { writeFileSync } = await import("node:fs"); writeFileSync(lockPath, "", { flag: "wx" }); } catch {}
218218
}
219-
220-
// Proactive cleanup of stale lock artifacts (fixes stale-lock ECOMPROMISED)
219+
// 【修復 #415】調整 retries:max wait 從 ~31 秒 → ~181 秒
220+
// 指數退避:1s, 2s, 4s, 8s, 16s, 30s×5,總計約 181 秒
221+
// ECOMPROMISED 透過 onCompromised callback 觸發(非 throw),使用 flag 機制正確處理
222+
let isCompromised = false;
223+
let compromisedErr: unknown = null;
224+
let fnSucceeded = false;
225+
let fnError: unknown = null;
226+
227+
// Proactive cleanup of stale lock artifacts(from PR #626)
228+
// 根本避免 >5 分鐘的 lock artifact 導致 ECOMPROMISED
221229
if (existsSync(lockPath)) {
222230
try {
223231
const stat = statSync(lockPath);
@@ -231,10 +239,61 @@ export class MemoryStore {
231239
}
232240

233241
const release = await lockfile.lock(lockPath, {
234-
retries: { retries: 10, factor: 2, minTimeout: 200, maxTimeout: 5000 },
235-
stale: 10000,
242+
retries: {
243+
retries: 10,
244+
factor: 2,
245+
minTimeout: 1000, // James 保守設定:避免高負載下過度密集重試
246+
maxTimeout: 30000, // James 保守設定:支撐更久的 event loop 阻塞
247+
},
248+
stale: 10000, // 10 秒後視為 stale,觸發 ECOMPROMISED callback
249+
// 注意:ECOMPROMISED 是 ambiguous degradation 訊號,mtime 無法區分
250+
// "holder 崩潰" vs "holder event loop 阻塞",所以不嘗試區分
251+
onCompromised: (err: unknown) => {
252+
// 【修復 #415 關鍵】必須是同步 callback
253+
// setLockAsCompromised() 不等待 Promise,async throw 無法傳回 caller
254+
isCompromised = true;
255+
compromisedErr = err;
256+
},
236257
});
237-
try { return await fn(); } finally { await release(); }
258+
259+
try {
260+
const result = await fn();
261+
fnSucceeded = true;
262+
return result;
263+
} catch (e: unknown) {
264+
fnError = e;
265+
throw e;
266+
} finally {
267+
if (isCompromised) {
268+
// fnError 優先:fn() 失敗時,fn 的錯誤比 compromised 重要
269+
if (fnError !== null) {
270+
throw fnError;
271+
}
272+
// fn() 尚未完成就 compromised → throw,讓 caller 知道要重試
273+
if (!fnSucceeded) {
274+
throw compromisedErr as Error;
275+
}
276+
// fn() 成功執行,但 lock 在執行期間被標記 compromised
277+
// 正確行為:回傳成功結果(資料已寫入),明確告知 caller 不要重試
278+
console.warn(
279+
`[memory-lancedb-pro] Returning successful result despite compromised lock at "${lockPath}". ` +
280+
`Callers must not retry this operation automatically.`,
281+
);
282+
// 【修復 #415】compromised 後 release() 會回 ERELEASED,忽略即可
283+
// 重要:不要在這裡 return!否則 finally 的 return 會覆蓋 try 的 return 值
284+
try {
285+
await release();
286+
} catch (e: unknown) {
287+
if ((e as NodeJS.ErrnoException).code === 'ERELEASED') {
288+
// ERELEASED 是預期行為,不做任何事,讓 try 的 return 值通過
289+
} else {
290+
throw e; // 其他錯誤照拋
291+
}
292+
}
293+
} else {
294+
await release();
295+
}
296+
}
238297
}
239298

240299
get dbPath(): string {
@@ -297,24 +356,24 @@ export class MemoryStore {
297356

298357
if (missingColumns.length > 0) {
299358
console.warn(
300-
`memory-lancedb-pro: migrating legacy table ??adding columns: ${missingColumns.map((c) => c.name).join(", ")}`,
359+
`memory-lancedb-pro: migrating legacy table adding columns: ${missingColumns.map((c) => c.name).join(", ")}`,
301360
);
302361
await table.addColumns(missingColumns);
303362
console.log(
304-
`memory-lancedb-pro: migration complete ??${missingColumns.length} column(s) added`,
363+
`memory-lancedb-pro: migration complete ${missingColumns.length} column(s) added`,
305364
);
306365
}
307366
} catch (err) {
308367
const msg = String(err);
309368
if (msg.includes("already exists")) {
310-
// Concurrent initialization race ??another process already added the columns
369+
// Concurrent initialization race — another process already added the columns
311370
console.log("memory-lancedb-pro: migration columns already exist (concurrent init)");
312371
} else {
313372
console.warn("memory-lancedb-pro: could not check/migrate table schema:", err);
314373
}
315374
}
316375
} catch (_openErr) {
317-
// Table doesn't exist yet ??create it
376+
// Table doesn't exist yet — create it
318377
const schemaEntry: MemoryEntry = {
319378
id: "__schema__",
320379
text: "",
@@ -333,7 +392,7 @@ export class MemoryStore {
333392
await table.delete('id = "__schema__"');
334393
} catch (createErr) {
335394
// Race: another caller (or eventual consistency) created the table
336-
// between our failed openTable and this createTable ??just open it.
395+
// between our failed openTable and this createTable — just open it.
337396
if (String(createErr).includes("already exists")) {
338397
table = await db.openTable(TABLE_NAME);
339398
} else {
@@ -408,9 +467,10 @@ export class MemoryStore {
408467
return this.runWithFileLock(async () => {
409468
try {
410469
await this.table!.add([fullEntry]);
411-
} catch (err: any) {
412-
const code = err.code || "";
413-
const message = err.message || String(err);
470+
} catch (err: unknown) {
471+
const e = err as { code?: string; message?: string };
472+
const code = e.code || "";
473+
const message = e.message || String(err);
414474
throw new Error(
415475
`Failed to store memory in "${this.config.dbPath}": ${code} ${message}`,
416476
);
@@ -465,12 +525,6 @@ export class MemoryStore {
465525
return res.length > 0;
466526
}
467527

468-
/** Lightweight total row count via LanceDB countRows(). */
469-
async count(): Promise<number> {
470-
await this.ensureInitialized();
471-
return await this.table!.countRows();
472-
}
473-
474528
async getById(id: string, scopeFilter?: string[]): Promise<MemoryEntry | null> {
475529
await this.ensureInitialized();
476530

@@ -901,7 +955,7 @@ export class MemoryStore {
901955
throw new Error(`Memory ${id} is outside accessible scopes`);
902956
}
903957

904-
return this.runWithFileLock(async () => {
958+
return this.runWithFileLock(() => this.runSerializedUpdate(async () => {
905959
// Support both full UUID and short prefix (8+ hex chars), same as delete()
906960
const uuidRegex =
907961
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
@@ -1016,25 +1070,22 @@ export class MemoryStore {
10161070
}
10171071

10181072
return updated;
1019-
});
1073+
}));
10201074
}
10211075

10221076
private async runSerializedUpdate<T>(action: () => Promise<T>): Promise<T> {
1023-
// Tail-reset: no infinite promise chain. Uses a boolean flag + FIFO queue.
1024-
if (!this._updating) {
1025-
this._updating = true;
1026-
try {
1027-
return await action();
1028-
} finally {
1029-
this._updating = false;
1030-
const next = this._waitQueue.shift();
1031-
if (next) next();
1032-
}
1033-
} else {
1034-
// Already busy — enqueue and wait for the current owner to signal done.
1035-
return new Promise<void>((resolve) => {
1036-
this._waitQueue.push(resolve);
1037-
}).then(() => this.runSerializedUpdate(action)) as Promise<T>;
1077+
const previous = this.updateQueue;
1078+
let release: (() => void) | undefined;
1079+
const lock = new Promise<void>((resolve) => {
1080+
release = resolve;
1081+
});
1082+
this.updateQueue = previous.then(() => lock);
1083+
1084+
await previous;
1085+
try {
1086+
return await action();
1087+
} finally {
1088+
release?.();
10381089
}
10391090
}
10401091

test/lock-stress-test.mjs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/**
2+
* 高並發鎖壓力測試 v2(改良版)
3+
* 測試重點:
4+
* 1. 高並發寫入不會 crash(無 ECOMPROMISED)
5+
* 2. 重負載下長等待不會導致 Gateway 崩潰
6+
* 3. 資料完整性
7+
*/
8+
import { describe, it, before, after } from "node:test";
9+
import assert from "node:assert/strict";
10+
import { mkdtempSync, rmSync } from "node:fs";
11+
import { tmpdir } from "node:os";
12+
import { join } from "node:path";
13+
import jitiFactory from "jiti";
14+
15+
const jiti = jitiFactory(import.meta.url, { interopDefault: true });
16+
const { MemoryStore } = jiti("../src/store.ts");
17+
18+
let workDir;
19+
20+
before(() => {
21+
workDir = mkdtempSync(join(tmpdir(), "memory-lancedb-pro-stress-v2-"));
22+
});
23+
24+
after(() => {
25+
if (workDir) {
26+
rmSync(workDir, { recursive: true, force: true });
27+
}
28+
});
29+
30+
describe("高並發鎖壓力測試 v2", { concurrency: 1 }, () => {
31+
// 測試 1:中等並發(3個同時寫)不 crash
32+
it("中等並發寫入(3行程×5次)無 ECOMPROMISED crash", async () => {
33+
const store = new MemoryStore({ dbPath: join(workDir, "medium-concurrent"), vectorDim: 3 });
34+
const errors = [];
35+
36+
const worker = async (workerId) => {
37+
const results = [];
38+
for (let i = 0; i < 5; i++) {
39+
try {
40+
const r = await store.store({
41+
text: `w${workerId}-${i}`,
42+
vector: [workerId * 10 + i, 0, 0],
43+
category: "stress",
44+
scope: "global",
45+
importance: 0.5,
46+
metadata: "{}",
47+
});
48+
results.push({ ok: true, id: r.id });
49+
} catch (err) {
50+
const isEcomp = err.code === "ECOMPROMISED" || (err.message && err.message.includes("ECOMPROMISED"));
51+
errors.push({ workerId, i, code: err.code, msg: err.message, isEcomp });
52+
results.push({ ok: false, error: err.message, isEcomp });
53+
}
54+
}
55+
return results;
56+
};
57+
58+
// 3 個 worker 同時啟動
59+
const allResults = await Promise.all([worker(0), worker(1), worker(2)]);
60+
const flat = allResults.flat();
61+
const ecompCount = flat.filter(r => r.isEcomp).length;
62+
63+
console.log(`\n [中等並發] 總操作: ${flat.length}, 成功: ${flat.filter(r => r.ok).length}, ECOMPROMISED: ${ecompCount}`);
64+
if (errors.length > 0) {
65+
errors.forEach(e => console.log(` Worker${e.workerId} op${e.i}: ${e.code || "?"} - ${e.msg}`));
66+
}
67+
68+
// 核心驗證:0 個 ECOMPROMISED crash
69+
assert.strictEqual(ecompCount, 0, `不應有 ECOMPROMISED crash,但發生了 ${ecompCount} 次`);
70+
// 至少約半數(7/15)成功
71+
assert.ok(flat.filter(r => r.ok).length >= 7, "起碼要有 7/15 成功");
72+
});
73+
74+
// 測試 2:真正並發請求 — 用 Promise.all 同時搶 lock
75+
// 模擬 holder 持有 lock 時,competitor 嘗試取得 lock
76+
// 結果應該是:兩個都成功(一個立即,一個等到 lock 釋放後)
77+
it("並發寫入時兩個都成功(retry 機制正常運作)", async () => {
78+
const store = new MemoryStore({ dbPath: join(workDir, "concurrent-retry"), vectorDim: 3 });
79+
80+
// 用 Promise.all 同時發起兩個 store 請求,真正測試並發競爭下的 retry 行為
81+
const start = Date.now();
82+
const [r1, r2] = await Promise.all([
83+
store.store({
84+
text: "concurrent-1",
85+
vector: [1, 0, 0],
86+
category: "fact",
87+
scope: "global",
88+
importance: 0.8,
89+
metadata: "{}",
90+
}),
91+
store.store({
92+
text: "concurrent-2",
93+
vector: [0, 1, 0],
94+
category: "fact",
95+
scope: "global",
96+
importance: 0.8,
97+
metadata: "{}",
98+
}),
99+
]);
100+
const elapsed = Date.now() - start;
101+
102+
console.log(`\n [並發競爭] 耗時: ${elapsed}ms, id1=${r1.id.slice(0,8)}, id2=${r2.id.slice(0,8)}`);
103+
// F3 修復:明確斷言兩個請求都成功(不死、不 ECOMPROMISED)
104+
assert.ok(r1.id, "第一個請求應該成功(不死、不拋 ECOMPROMISED)");
105+
assert.ok(r2.id, "第二個請求應該成功(retry 後成功,不拋 ECOMPROMISED)");
106+
assert.ok(r1.id !== r2.id, "兩個請求應該產生不同 ID");
107+
// EF4 修復:明確斷言耗時在合理範圍內,防止 CI hang
108+
assert.ok(elapsed < 30000, `並發鎖競爭應在合理時間內完成(< 30s),實際 ${elapsed}ms`);
109+
});
110+
111+
// 測試 3:批量順序寫入後資料完整性(stress test 不該用 30 個並發,那會 ELOCKED)
112+
it("批量寫入後所有資料都能正確讀回", async () => {
113+
const store = new MemoryStore({ dbPath: join(workDir, "bulk-integrity"), vectorDim: 3 });
114+
const COUNT = 20;
115+
const TIMEOUT_MS = 60_000; // EF4 修復:60 秒安全上限,防止 CI hang
116+
117+
// 順序寫入(不是並發),驗證大量寫入的資料完整性
118+
const entries = [];
119+
for (let i = 0; i < COUNT; i++) {
120+
// EF4 修復:單次操作加安全上限
121+
const opStart = Date.now();
122+
const r = await Promise.race([
123+
store.store({
124+
text: `bulk-${i}`,
125+
vector: [i * 0.1, i * 0.2, i * 0.3],
126+
category: "fact",
127+
scope: "global",
128+
importance: 0.6,
129+
metadata: "{}",
130+
}),
131+
new Promise((_, reject) =>
132+
setTimeout(() => reject(new Error(`bulk[${i}] timeout after ${TIMEOUT_MS}ms`)), TIMEOUT_MS)
133+
),
134+
]);
135+
const opElapsed = Date.now() - opStart;
136+
assert.ok(opElapsed < TIMEOUT_MS, `bulk[${i}] 單次寫入應在 ${TIMEOUT_MS}ms 內,實際 ${opElapsed}ms`);
137+
entries.push(r);
138+
}
139+
140+
const ids = entries.map(e => e.id);
141+
const uniqueIds = new Set(ids);
142+
assert.strictEqual(uniqueIds.size, COUNT, `應該有 ${COUNT} 個唯一 ID`);
143+
144+
// 全部能讀回
145+
const all = await store.list(undefined, undefined, 100, 0);
146+
assert.strictEqual(all.length, COUNT, `list 應該返回 ${COUNT} 筆記錄`);
147+
});
148+
});

0 commit comments

Comments
 (0)