Skip to content

Commit e22cd64

Browse files
committed
fix: add graceful fallback when Redis unavailable
- Return no-op lock instead of throwing Error when Redis is down - Log Redis errors instead of silent swallow - Fix duplicate ioredis entry in package.json
1 parent 2f7032f commit e22cd64

10 files changed

+1901
-3
lines changed

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
"@lancedb/lancedb": "^0.26.2",
4242
"@sinclair/typebox": "0.34.48",
4343
"apache-arrow": "18.1.0",
44+
"ioredis": "^5.10.1",
4445
"json5": "^2.2.3",
4546
"openai": "^6.21.0",
46-
"ioredis": "^5.4.1",
4747
"proper-lockfile": "^4.1.2"
4848
},
4949
"openclaw": {
@@ -52,10 +52,10 @@
5252
]
5353
},
5454
"optionalDependencies": {
55-
"@lancedb/lancedb-darwin-x64": "^0.26.2",
5655
"@lancedb/lancedb-darwin-arm64": "^0.26.2",
57-
"@lancedb/lancedb-linux-x64-gnu": "^0.26.2",
56+
"@lancedb/lancedb-darwin-x64": "^0.26.2",
5857
"@lancedb/lancedb-linux-arm64-gnu": "^0.26.2",
58+
"@lancedb/lancedb-linux-x64-gnu": "^0.26.2",
5959
"@lancedb/lancedb-win32-x64-msvc": "^0.26.2"
6060
},
6161
"devDependencies": {

src/redis-lock.ts

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// src/redis-lock.ts
2+
/**
3+
* Redis Lock Manager
4+
*
5+
* 實現分散式 lock,用於解決高並發寫入時的 lock contention 問題
6+
*/
7+
8+
import Redis from 'ioredis';
9+
10+
// 生成唯一 token
11+
function generateToken(): string {
12+
return `${Date.now()}-${Math.random().toString(36).substring(2, 10)}`;
13+
}
14+
15+
export interface LockConfig {
16+
redisUrl?: string;
17+
ttl?: number; // lock 過期時間(毫秒)
18+
maxWait?: number; // 最大等待時間(毫秒)
19+
retryDelay?: number; // 重試延遲(毫秒)
20+
}
21+
22+
export class RedisLockManager {
23+
private redis: Redis;
24+
private defaultTTL = 60000; // 60 秒
25+
private maxWait = 60000; // 最多等 60 秒
26+
private retryDelay = 100; // 初始重試延遲
27+
28+
constructor(config?: LockConfig) {
29+
const redisUrl = config?.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379';
30+
this.redis = new Redis(redisUrl.replace('redis://', ''), {
31+
lazyConnect: true,
32+
retryStrategy: (times) => {
33+
if (times > 3) return null; // 放棄重連
34+
return Math.min(times * 200, 2000);
35+
},
36+
});
37+
38+
if (config?.ttl) this.defaultTTL = config.ttl;
39+
if (config?.maxWait) this.maxWait = config.maxWait;
40+
}
41+
42+
async connect(): Promise<void> {
43+
try {
44+
await this.redis.connect();
45+
} catch (err) {
46+
// 如果連不上,尝试不連接(lazy connect)
47+
console.warn(`[RedisLock] Could not connect to Redis: ${err}`);
48+
}
49+
}
50+
51+
async acquire(key: string, ttl?: number): Promise<() => Promise<void>> {
52+
const lockKey = `memory-lock:${key}`;
53+
const token = generateToken();
54+
const startTime = Date.now();
55+
const lockTTL = ttl || this.defaultTTL;
56+
57+
// 嘗試連接
58+
let redisAvailable = false;
59+
try {
60+
await this.redis.ping();
61+
redisAvailable = true;
62+
} catch {
63+
// Redis 不可用,優雅降級 - 回傳 no-op lock
64+
console.warn('[RedisLock] Redis unavailable, using no-op lock (allow concurrent)');
65+
return async () => {}; // No-op release
66+
}
67+
68+
if (!redisAvailable) {
69+
return async () => {};
70+
}
71+
72+
let attempts = 0;
73+
while (true) {
74+
attempts++;
75+
76+
try {
77+
// 使用 SET NX + token (原子操作)
78+
const result = await this.redis.set(lockKey, token, 'PX', lockTTL, 'NX');
79+
80+
if (result === 'OK') {
81+
// 成功取得 lock
82+
console.log(`[RedisLock] Acquired lock ${key} after ${attempts} attempts`);
83+
84+
// 回傳帶 token 的 release function
85+
return async () => {
86+
// 用 Lua script 確保只刪除自己的 lock
87+
const script = `
88+
if redis.call("get", KEYS[1]) == ARGV[1] then
89+
return redis.call("del", KEYS[1])
90+
else
91+
return 0
92+
end
93+
`;
94+
try {
95+
await this.redis.eval(script, 1, lockKey, token);
96+
console.log(`[RedisLock] Released lock ${key}`);
97+
} catch (err) {
98+
console.warn(`[RedisLock] Failed to release lock: ${err}`);
99+
}
100+
};
101+
}
102+
} catch (err) {
103+
// 記錄 Redis 錯誤,避免 silent swallow
104+
console.warn(`[RedisLock] Redis error during acquire (attempt ${attempts}): ${err}`);
105+
}
106+
107+
// 檢查是否超時
108+
if (Date.now() - startTime > this.maxWait) {
109+
throw new Error(`Lock acquisition timeout: ${key} after ${attempts} attempts`);
110+
}
111+
112+
// 指數退避等待
113+
const delay = Math.min(this.retryDelay * Math.pow(1.5, Math.min(attempts, 10)), 2000);
114+
await this.sleep(delay + Math.random() * 100);
115+
}
116+
}
117+
118+
async isHealthy(): Promise<boolean> {
119+
try {
120+
await this.redis.ping();
121+
return true;
122+
} catch {
123+
return false;
124+
}
125+
}
126+
127+
async disconnect(): Promise<void> {
128+
await this.redis.quit();
129+
}
130+
131+
private sleep(ms: number): Promise<void> {
132+
return new Promise(resolve => setTimeout(resolve, ms));
133+
}
134+
}
135+
136+
/**
137+
* 建立 RedisLockManager 工廠
138+
*/
139+
export async function createRedisLockManager(config?: LockConfig): Promise<RedisLockManager | null> {
140+
const manager = new RedisLockManager(config);
141+
142+
try {
143+
await manager.connect();
144+
const isHealthy = await manager.isHealthy();
145+
if (isHealthy) {
146+
console.log('[RedisLock] Redis lock manager initialized');
147+
return manager;
148+
} else {
149+
console.warn('[RedisLock] Redis not healthy, will use file lock fallback');
150+
await manager.disconnect();
151+
return null;
152+
}
153+
} catch (err) {
154+
console.warn(`[RedisLock] Failed to initialize: ${err}`);
155+
return null;
156+
}
157+
}

test/lock-200-concurrent.test.mjs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// test/lock-200-concurrent.test.mjs
2+
/**
3+
* 200 並發測試
4+
*/
5+
import { describe, it } from "node:test";
6+
import { mkdtempSync, rmSync } from "node:fs";
7+
import { tmpdir } from "node:os";
8+
import { join } from "node:path";
9+
import jitiFactory from "jiti";
10+
11+
const jiti = jitiFactory(import.meta.url, { interopDefault: true });
12+
const { MemoryStore } = jiti("../src/store.ts");
13+
14+
function makeStore() {
15+
const dir = mkdtempSync(join(tmpdir(), "memory-lancedb-pro-200-"));
16+
const store = new MemoryStore({ dbPath: dir, vectorDim: 3 });
17+
return { store, dir };
18+
}
19+
20+
function makeEntry(i) {
21+
return {
22+
text: `memory-${i}`,
23+
vector: [0.1 * i, 0.2 * i, 0.3 * i],
24+
category: "fact",
25+
scope: "global",
26+
importance: 0.5,
27+
metadata: "{}",
28+
};
29+
}
30+
31+
describe("200 concurrent operations", () => {
32+
it("should test 200 concurrent writes", async () => {
33+
const { store, dir } = makeStore();
34+
try {
35+
const count = 200;
36+
console.log(`[Starting ${count} concurrent writes...]`);
37+
38+
const start = Date.now();
39+
const ops = Array.from({ length: count }, (_, i) => store.store(makeEntry(i)));
40+
const settled = await Promise.allSettled(ops);
41+
const elapsed = Date.now() - start;
42+
43+
const successes = settled.filter(r => r.status === 'fulfilled').length;
44+
const failures = settled.filter(r => r.status === 'rejected').length;
45+
46+
console.log(`[Result] ${count} concurrent writes:`);
47+
console.log(` Success: ${successes} (${(successes/count*100).toFixed(1)}%)`);
48+
console.log(` Failed: ${failures} (${(failures/count*100).toFixed(1)}%)`);
49+
console.log(` Time: ${elapsed}ms (${(elapsed/1000).toFixed(1)}s)`);
50+
51+
} finally {
52+
rmSync(dir, { recursive: true, force: true });
53+
}
54+
});
55+
});

0 commit comments

Comments
 (0)