Skip to content

Commit 61d9343

Browse files
committed
Fix LMDB errors and add recovery mechanisms
1 parent fef1515 commit 61d9343

File tree

8 files changed

+379
-129
lines changed

8 files changed

+379
-129
lines changed

src/datastore/DataStore.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export enum StoreName {
1616
private_schemas = 'private_schemas',
1717
}
1818

19-
export const PersistedStores = [StoreName.public_schemas, StoreName.sam_schemas];
19+
export const PersistedStores: ReadonlyArray<StoreName> = [StoreName.public_schemas, StoreName.sam_schemas];
2020

2121
export interface DataStore {
2222
get<T>(key: string): T | undefined;
@@ -33,7 +33,7 @@ export interface DataStore {
3333
export interface DataStoreFactory extends Closeable {
3434
get(store: StoreName): DataStore;
3535

36-
storeNames(): ReadonlyArray<string>;
36+
storeNames: ReadonlyArray<string>;
3737

3838
close(): Promise<void>;
3939
}

src/datastore/FileStore.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ export class FileStoreFactory implements DataStoreFactory {
2020
private readonly metricsInterval: NodeJS.Timeout;
2121
private readonly timeout: NodeJS.Timeout;
2222

23-
constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
23+
constructor(
24+
rootDir: string,
25+
public readonly storeNames = PersistedStores,
26+
) {
2427
this.log = LoggerFactory.getLogger('FileStore.Global');
2528

2629
this.fileDbRoot = join(rootDir, 'filedb');
@@ -56,10 +59,6 @@ export class FileStoreFactory implements DataStoreFactory {
5659
return val;
5760
}
5861

59-
storeNames(): ReadonlyArray<string> {
60-
return [...this.stores.keys()];
61-
}
62-
6362
close(): Promise<void> {
6463
clearTimeout(this.timeout);
6564
clearInterval(this.metricsInterval);

src/datastore/LMDB.ts

Lines changed: 137 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { readdirSync, rmSync } from 'fs';
1+
import { existsSync, readdirSync, rmSync } from 'fs';
22
import { join } from 'path';
33
import { open, RootDatabase, RootDatabaseOptionsWithPath } from 'lmdb';
44
import { LoggerFactory } from '../telemetry/LoggerFactory';
55
import { ScopedTelemetry } from '../telemetry/ScopedTelemetry';
66
import { Telemetry } from '../telemetry/TelemetryDecorator';
77
import { isWindows } from '../utils/Environment';
8+
import { extractErrorMessage } from '../utils/Errors';
89
import { formatNumber, toString } from '../utils/String';
910
import { DataStore, DataStoreFactory, PersistedStores, StoreName } from './DataStore';
1011
import { LMDBStore } from './lmdb/LMDBStore';
@@ -18,35 +19,34 @@ export class LMDBStoreFactory implements DataStoreFactory {
1819
private readonly lmdbDir: string;
1920
private readonly timeout: NodeJS.Timeout;
2021
private readonly metricsInterval: NodeJS.Timeout;
21-
private readonly env: RootDatabase;
22+
23+
private env: RootDatabase;
24+
private openPid = process.pid;
25+
private closed = false;
2226

2327
private readonly stores = new Map<StoreName, LMDBStore>();
2428

25-
constructor(rootDir: string, storeNames: StoreName[] = PersistedStores) {
29+
constructor(
30+
rootDir: string,
31+
public readonly storeNames = PersistedStores,
32+
) {
2633
this.lmdbDir = join(rootDir, 'lmdb');
2734

28-
const config: RootDatabaseOptionsWithPath = {
29-
path: join(this.lmdbDir, Version),
30-
maxDbs: 10,
31-
mapSize: TotalMaxDbSize,
32-
encoding: Encoding,
33-
encryptionKey: encryptionStrategy(VersionNumber),
34-
};
35-
36-
if (isWindows) {
37-
config.noSubdir = false;
38-
config.overlappingSync = false;
39-
}
40-
41-
this.env = open(config);
35+
const { env, config } = createEnv(this.lmdbDir);
36+
this.env = env;
4237

4338
for (const store of storeNames) {
44-
const database = this.env.openDB<unknown, string>({
45-
name: store,
46-
encoding: Encoding,
47-
});
48-
49-
this.stores.set(store, new LMDBStore(store, database));
39+
const database = createDB(this.env, store);
40+
41+
this.stores.set(
42+
store,
43+
new LMDBStore(
44+
store,
45+
database,
46+
(e) => this.handleError(e),
47+
() => this.ensureValidEnv(),
48+
),
49+
);
5050
}
5151

5252
this.metricsInterval = setInterval(() => {
@@ -81,20 +81,85 @@ export class LMDBStoreFactory implements DataStoreFactory {
8181
return val;
8282
}
8383

84-
storeNames(): ReadonlyArray<string> {
85-
return [...this.stores.keys()];
86-
}
87-
8884
async close(): Promise<void> {
89-
// Clear the stores map but don't close individual stores
90-
// LMDB will close them when we close the environment
85+
if (this.closed) return;
86+
this.closed = true;
87+
9188
clearInterval(this.metricsInterval);
9289
clearTimeout(this.timeout);
9390
this.stores.clear();
9491
await this.env.close();
9592
}
9693

94+
private handleError(error: unknown): void {
95+
if (this.closed) return;
96+
const msg = extractErrorMessage(error);
97+
98+
if (msg.includes('MDB_BAD_RSLOT') || msg.includes("doesn't match env pid")) {
99+
this.recoverFromFork();
100+
} else if (
101+
msg.includes('MDB_CURSOR_FULL') ||
102+
msg.includes('MDB_CORRUPTED') ||
103+
msg.includes('MDB_PAGE_NOTFOUND') ||
104+
msg.includes('MDB_BAD_TXN') ||
105+
msg.includes('Commit failed') ||
106+
msg.includes('closed database')
107+
) {
108+
this.recoverFromCorruption();
109+
}
110+
}
111+
112+
private ensureValidEnv(): void {
113+
if (process.pid !== this.openPid) {
114+
this.telemetry.count('process.fork', 1);
115+
this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Process fork detected');
116+
this.reopenEnv();
117+
118+
// Update all stores with new handles
119+
for (const store of this.storeNames) {
120+
this.stores.get(store)?.updateStore(createDB(this.env, store));
121+
}
122+
}
123+
}
124+
125+
private recoverFromFork(): void {
126+
this.telemetry.count('forked', 1);
127+
this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Fork detected, reopening LMDB');
128+
this.reopenEnv();
129+
this.recreateStores();
130+
}
131+
132+
private recoverFromCorruption(): void {
133+
this.telemetry.count('corrupted', 1);
134+
this.log.warn('Corruption detected, reopening LMDB');
135+
this.reopenEnv();
136+
this.recreateStores();
137+
}
138+
139+
private reopenEnv(): void {
140+
this.env = createEnv(this.lmdbDir).env;
141+
this.openPid = process.pid;
142+
this.log.warn('Recreated LMDB environment');
143+
}
144+
145+
private recreateStores(): void {
146+
for (const name of this.storeNames) {
147+
const database = this.env.openDB<unknown, string>({ name, encoding: Encoding });
148+
this.stores.set(
149+
name,
150+
new LMDBStore(
151+
name,
152+
database,
153+
(e) => this.handleError(e),
154+
() => this.ensureValidEnv(),
155+
),
156+
);
157+
}
158+
}
159+
97160
private cleanupOldVersions(): void {
161+
if (this.closed || !existsSync(this.lmdbDir)) return;
162+
98163
const entries = readdirSync(this.lmdbDir, { withFileTypes: true });
99164
for (const entry of entries) {
100165
try {
@@ -110,31 +175,30 @@ export class LMDBStoreFactory implements DataStoreFactory {
110175
}
111176

112177
private emitMetrics(): void {
113-
const totalBytes = this.totalBytes();
178+
if (this.closed) return;
114179

115-
const envStat = stats(this.env);
116-
this.telemetry.histogram('version', VersionNumber);
117-
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
118-
this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, {
119-
unit: 'By',
120-
});
121-
this.telemetry.histogram('env.entries', envStat.entries);
180+
try {
181+
const totalBytes = this.totalBytes();
122182

123-
for (const [name, store] of this.stores.entries()) {
124-
const stat = store.stats();
125-
126-
this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, {
183+
const envStat = stats(this.env);
184+
this.telemetry.histogram('version', VersionNumber);
185+
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
186+
this.telemetry.histogram('env.max.size.bytes', envStat.maxSize, {
127187
unit: 'By',
128188
});
129-
this.telemetry.histogram(`store.${name}.entries`, stat.entries);
130-
}
189+
this.telemetry.histogram('env.entries', envStat.entries);
131190

132-
this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), {
133-
unit: '%',
134-
});
135-
this.telemetry.histogram('total.size.bytes', totalBytes, {
136-
unit: 'By',
137-
});
191+
for (const [name, store] of this.stores.entries()) {
192+
const stat = store.stats();
193+
this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, { unit: 'By' });
194+
this.telemetry.histogram(`store.${name}.entries`, stat.entries);
195+
}
196+
197+
this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), { unit: '%' });
198+
this.telemetry.histogram('total.size.bytes', totalBytes, { unit: 'By' });
199+
} catch (e) {
200+
this.handleError(e);
201+
}
138202
}
139203

140204
private totalBytes() {
@@ -153,3 +217,27 @@ const VersionNumber = 5;
153217
const Version = `v${VersionNumber}`;
154218
const Encoding: 'msgpack' | 'json' | 'string' | 'binary' | 'ordered-binary' = 'msgpack';
155219
const TotalMaxDbSize = 250 * 1024 * 1024; // 250MB max size
220+
221+
function createEnv(lmdbDir: string) {
222+
const config: RootDatabaseOptionsWithPath = {
223+
path: join(lmdbDir, Version),
224+
maxDbs: 10,
225+
mapSize: TotalMaxDbSize,
226+
encoding: Encoding,
227+
encryptionKey: encryptionStrategy(VersionNumber),
228+
};
229+
230+
if (isWindows) {
231+
config.noSubdir = false;
232+
config.overlappingSync = false;
233+
}
234+
235+
return {
236+
config,
237+
env: open(config),
238+
};
239+
}
240+
241+
function createDB(env: RootDatabase, name: string) {
242+
return env.openDB<unknown, string>({ name, encoding: Encoding });
243+
}

src/datastore/MemoryStore.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export class MemoryStoreFactory implements DataStoreFactory {
7070
return val;
7171
}
7272

73-
storeNames(): ReadonlyArray<string> {
73+
get storeNames(): ReadonlyArray<string> {
7474
return [...this.stores.keys()];
7575
}
7676

src/datastore/lmdb/LMDBStore.ts

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,72 @@
11
import { Database } from 'lmdb';
2-
import { Logger } from 'pino';
3-
import { LoggerFactory } from '../../telemetry/LoggerFactory';
42
import { ScopedTelemetry } from '../../telemetry/ScopedTelemetry';
53
import { TelemetryService } from '../../telemetry/TelemetryService';
64
import { DataStore, StoreName } from '../DataStore';
75
import { stats, StoreStatsType } from './Stats';
86

7+
type ErrorHandler = (error: unknown) => void;
8+
99
export class LMDBStore implements DataStore {
1010
private readonly telemetry: ScopedTelemetry;
1111

1212
constructor(
1313
public readonly name: StoreName,
14-
protected readonly store: Database<unknown, string>,
15-
private readonly log: Logger = LoggerFactory.getLogger(`LMDB.${name}`),
14+
private store: Database<unknown, string>,
15+
private readonly onError?: ErrorHandler,
16+
private readonly validateDatabase?: () => void,
1617
) {
1718
this.telemetry = TelemetryService.instance.get(`LMDB.${name}`);
1819
}
1920

20-
get<T>(key: string): T | undefined {
21-
return this.telemetry.countExecution('get', () => {
22-
return this.store.get(key) as T | undefined;
21+
updateStore(store: Database<unknown, string>) {
22+
this.store = store;
23+
}
24+
25+
private exec<T>(op: string, fn: () => T): T {
26+
return this.telemetry.measure(op, () => {
27+
try {
28+
this.validateDatabase?.();
29+
return fn();
30+
} catch (e) {
31+
this.onError?.(e);
32+
throw e;
33+
}
2334
});
2435
}
2536

26-
put<T>(key: string, value: T): Promise<boolean> {
27-
return this.telemetry.measureAsync('put', () => {
28-
return this.store.put(key, value);
37+
private async execAsync<T>(op: string, fn: () => Promise<T>): Promise<T> {
38+
return await this.telemetry.measureAsync(op, async () => {
39+
try {
40+
this.validateDatabase?.();
41+
return await fn();
42+
} catch (e) {
43+
this.onError?.(e);
44+
throw e;
45+
}
2946
});
3047
}
3148

49+
get<T>(key: string): T | undefined {
50+
return this.exec('get', () => this.store.get(key) as T | undefined);
51+
}
52+
53+
put<T>(key: string, value: T): Promise<boolean> {
54+
return this.execAsync('put', () => this.store.put(key, value));
55+
}
56+
3257
remove(key: string): Promise<boolean> {
33-
return this.telemetry.countExecutionAsync('remove', () => {
34-
return this.store.remove(key);
35-
});
58+
return this.execAsync('remove', () => this.store.remove(key));
3659
}
3760

3861
clear(): Promise<void> {
39-
return this.telemetry.countExecutionAsync('clear', () => {
40-
return this.store.clearAsync();
41-
});
62+
return this.execAsync('clear', () => this.store.clearAsync());
4263
}
4364

4465
keys(limit: number): ReadonlyArray<string> {
45-
return this.telemetry.countExecution('keys', () => {
46-
return this.store.getKeys({ limit }).asArray;
47-
});
66+
return this.exec('keys', () => this.store.getKeys({ limit }).asArray);
4867
}
4968

5069
stats(): StoreStatsType {
51-
return stats(this.store);
70+
return this.exec('stats', () => stats(this.store));
5271
}
5372
}

0 commit comments

Comments
 (0)