Skip to content

Commit c0031ff

Browse files
chore: implement read write lock
1 parent 85cb524 commit c0031ff

File tree

5 files changed

+179
-106
lines changed

5 files changed

+179
-106
lines changed

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
"@mongodb-js/devtools-proxy-support": "^0.5.1",
9797
"@mongosh/service-provider-node-driver": "^3.10.2",
9898
"@vitest/eslint-plugin": "^1.3.4",
99+
"async-rwlock": "^1.1.1",
99100
"bson": "^6.10.4",
100101
"express": "^5.1.0",
101102
"lru-cache": "^11.1.0",

src/common/exportsManager.ts

Lines changed: 130 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import { FindCursor } from "mongodb";
77
import { EJSON, EJSONOptions, ObjectId } from "bson";
88
import { Transform } from "stream";
99
import { pipeline } from "stream/promises";
10+
import { MongoLogId } from "mongodb-log-writer";
11+
import { RWLock } from "async-rwlock";
1012

1113
import { UserConfig } from "./config.js";
1214
import { LoggerBase, LogId } from "./logger.js";
13-
import { MongoLogId } from "mongodb-log-writer";
1415

1516
export const jsonExportFormat = z.enum(["relaxed", "canonical"]);
1617
export type JSONExportFormat = z.infer<typeof jsonExportFormat>;
@@ -60,6 +61,12 @@ export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeo
6061
// The maximum number of milliseconds to wait for in-flight operations to
6162
// settle before shutting down ExportsManager.
6263
activeOpsDrainTimeoutMs?: number;
64+
65+
// The maximum number of milliseconds to wait before timing out queued reads
66+
readTimeout?: number;
67+
68+
// The maximum number of milliseconds to wait before timing out queued writes
69+
writeTimeout?: number;
6370
};
6471

6572
type ExportsManagerEvents = {
@@ -74,6 +81,9 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
7481
private readonly shutdownController: AbortController = new AbortController();
7582
private readonly activeOperations: Set<Promise<unknown>> = new Set();
7683
private readonly activeOpsDrainTimeoutMs: number;
84+
private readonly readTimeoutMs: number;
85+
private readonly writeTimeoutMs: number;
86+
private readonly exportLocks: Map<string, RWLock> = new Map();
7787

7888
private constructor(
7989
private readonly exportsDirectoryPath: string,
@@ -82,6 +92,8 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
8292
) {
8393
super();
8494
this.activeOpsDrainTimeoutMs = this.config.activeOpsDrainTimeoutMs ?? 10_000;
95+
this.readTimeoutMs = this.config.readTimeout ?? 30_0000; // 30 seconds is the default timeout for an MCP request
96+
this.writeTimeoutMs = this.config.writeTimeout ?? 120_000; // considering that writes can take time
8597
}
8698

8799
public get availableExports(): AvailableExport[] {
@@ -131,24 +143,24 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
131143
try {
132144
this.assertIsNotShuttingDown();
133145
exportName = decodeURIComponent(exportName);
134-
const exportHandle = this.storedExports[exportName];
135-
if (!exportHandle) {
136-
throw new Error("Requested export has either expired or does not exist!");
137-
}
138-
139-
if (exportHandle.exportStatus === "in-progress") {
140-
throw new Error("Requested export is still being generated!");
141-
}
146+
return await this.withLock(exportName, "read", false, async (): Promise<string> => {
147+
const exportHandle = this.storedExports[exportName];
148+
if (!exportHandle) {
149+
throw new Error("Requested export has either expired or does not exist!");
150+
}
142151

143-
const { exportPath, exportCreatedAt } = exportHandle;
152+
// This won't happen anymore because of lock synchronization but
153+
// keeping it here to make TS happy.
154+
if (exportHandle.exportStatus === "in-progress") {
155+
throw new Error("Requested export is still being generated!");
156+
}
144157

145-
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
146-
throw new Error("Requested export has expired!");
147-
}
158+
const { exportPath } = exportHandle;
148159

149-
return await this.trackOperation(
150-
fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal })
151-
);
160+
return await this.trackOperation(
161+
fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal })
162+
);
163+
});
152164
} catch (error) {
153165
this.logger.error({
154166
id: LogId.exportReadError,
@@ -162,7 +174,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
162174
}
163175
}
164176

165-
public createJSONExport({
177+
public async createJSONExport({
166178
input,
167179
exportName,
168180
exportTitle,
@@ -172,25 +184,27 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
172184
exportName: string;
173185
exportTitle: string;
174186
jsonExportFormat: JSONExportFormat;
175-
}): AvailableExport {
187+
}): Promise<AvailableExport> {
176188
try {
177189
this.assertIsNotShuttingDown();
178190
const exportNameWithExtension = validateExportName(ensureExtension(exportName, "json"));
179-
if (this.storedExports[exportNameWithExtension]) {
180-
throw new Error("Export with same name is either already available or being generated.");
181-
}
182-
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
183-
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
184-
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
185-
exportName: exportNameWithExtension,
186-
exportTitle,
187-
exportPath: exportFilePath,
188-
exportURI: exportURI,
189-
exportStatus: "in-progress",
190-
});
191+
return await this.withLock(exportNameWithExtension, "write", false, (): AvailableExport => {
192+
if (this.storedExports[exportNameWithExtension]) {
193+
throw new Error("Export with same name is either already available or being generated.");
194+
}
195+
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
196+
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
197+
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
198+
exportName: exportNameWithExtension,
199+
exportTitle,
200+
exportPath: exportFilePath,
201+
exportURI: exportURI,
202+
exportStatus: "in-progress",
203+
});
191204

192-
void this.trackOperation(this.startExport({ input, jsonExportFormat, inProgressExport }));
193-
return inProgressExport;
205+
void this.trackOperation(this.startExport({ input, jsonExportFormat, inProgressExport }));
206+
return inProgressExport;
207+
});
194208
} catch (error) {
195209
this.logger.error({
196210
id: LogId.exportCreationError,
@@ -211,40 +225,46 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
211225
inProgressExport: InProgressExport;
212226
}): Promise<void> {
213227
let pipeSuccessful = false;
214-
try {
215-
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
216-
const outputStream = createWriteStream(inProgressExport.exportPath);
217-
await pipeline(
218-
[input.stream(), this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)), outputStream],
219-
{ signal: this.shutdownController.signal }
220-
);
221-
pipeSuccessful = true;
222-
} catch (error) {
223-
this.logger.error({
224-
id: LogId.exportCreationError,
225-
context: `Error when generating JSON export for ${inProgressExport.exportName}`,
226-
message: error instanceof Error ? error.message : String(error),
227-
});
228+
await this.withLock(inProgressExport.exportName, "write", false, async (): Promise<void> => {
229+
try {
230+
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
231+
const outputStream = createWriteStream(inProgressExport.exportPath);
232+
await pipeline(
233+
[
234+
input.stream(),
235+
this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)),
236+
outputStream,
237+
],
238+
{ signal: this.shutdownController.signal }
239+
);
240+
pipeSuccessful = true;
241+
} catch (error) {
242+
this.logger.error({
243+
id: LogId.exportCreationError,
244+
context: `Error when generating JSON export for ${inProgressExport.exportName}`,
245+
message: error instanceof Error ? error.message : String(error),
246+
});
228247

229-
// If the pipeline errors out then we might end up with
230-
// partial and incorrect export so we remove it entirely.
231-
await this.silentlyRemoveExport(
232-
inProgressExport.exportPath,
233-
LogId.exportCreationCleanupError,
234-
`Error when removing incomplete export ${inProgressExport.exportName}`
235-
);
236-
delete this.storedExports[inProgressExport.exportName];
237-
} finally {
238-
if (pipeSuccessful) {
239-
this.storedExports[inProgressExport.exportName] = {
240-
...inProgressExport,
241-
exportCreatedAt: Date.now(),
242-
exportStatus: "ready",
243-
};
244-
this.emit("export-available", inProgressExport.exportURI);
248+
// If the pipeline errors out then we might end up with
249+
// partial and incorrect export so we remove it entirely.
250+
await this.silentlyRemoveExport(
251+
inProgressExport.exportPath,
252+
LogId.exportCreationCleanupError,
253+
`Error when removing incomplete export ${inProgressExport.exportName}`
254+
);
255+
delete this.storedExports[inProgressExport.exportName];
256+
} finally {
257+
if (pipeSuccessful) {
258+
this.storedExports[inProgressExport.exportName] = {
259+
...inProgressExport,
260+
exportCreatedAt: Date.now(),
261+
exportStatus: "ready",
262+
};
263+
this.emit("export-available", inProgressExport.exportURI);
264+
}
265+
void input.close();
245266
}
246-
void input.close();
247-
}
267+
});
248268
}
249269

250270
private getEJSONOptionsForFormat(format: JSONExportFormat): EJSONOptions | undefined {
@@ -293,24 +313,26 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
293313
}
294314

295315
this.exportsCleanupInProgress = true;
296-
const exportsForCleanup = Object.values({ ...this.storedExports }).filter(
297-
(storedExport): storedExport is ReadyExport => storedExport.exportStatus === "ready"
298-
);
299316
try {
300-
for (const { exportPath, exportCreatedAt, exportURI, exportName } of exportsForCleanup) {
301-
if (this.shutdownController.signal.aborted) {
302-
break;
303-
}
304-
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
305-
delete this.storedExports[exportName];
306-
await this.silentlyRemoveExport(
307-
exportPath,
308-
LogId.exportCleanupError,
309-
`Considerable error when removing export ${exportName}`
310-
);
311-
this.emit("export-expired", exportURI);
312-
}
313-
}
317+
const exportsForCleanup = Object.values({ ...this.storedExports }).filter(
318+
(storedExport): storedExport is ReadyExport => storedExport.exportStatus === "ready"
319+
);
320+
321+
await Promise.allSettled(
322+
exportsForCleanup.map(async ({ exportPath, exportCreatedAt, exportURI, exportName }) => {
323+
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
324+
await this.withLock(exportName, "write", true, async (): Promise<void> => {
325+
delete this.storedExports[exportName];
326+
await this.silentlyRemoveExport(
327+
exportPath,
328+
LogId.exportCleanupError,
329+
`Considerable error when removing export ${exportName}`
330+
);
331+
this.emit("export-expired", exportURI);
332+
});
333+
}
334+
})
335+
);
314336
} catch (error) {
315337
this.logger.error({
316338
id: LogId.exportCleanupError,
@@ -345,6 +367,33 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
345367
}
346368
}
347369

370+
private async withLock<T>(
371+
exportName: string,
372+
mode: "read" | "write",
373+
finalize: boolean,
374+
fn: () => T | Promise<T>
375+
): Promise<T> {
376+
let lock = this.exportLocks.get(exportName);
377+
if (!lock) {
378+
lock = new RWLock();
379+
this.exportLocks.set(exportName, lock);
380+
}
381+
382+
try {
383+
if (mode === "read") {
384+
await lock.readLock(this.readTimeoutMs);
385+
} else {
386+
await lock.writeLock(this.writeTimeoutMs);
387+
}
388+
return await fn();
389+
} finally {
390+
lock.unlock();
391+
if (finalize) {
392+
this.exportLocks.delete(exportName);
393+
}
394+
}
395+
}
396+
348397
private async trackOperation<T>(promise: Promise<T>): Promise<T> {
349398
this.activeOperations.add(promise);
350399
try {

src/tools/mongodb/read/export.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export class ExportTool extends MongoDBToolBase {
4646
});
4747
const exportName = `${database}.${collection}.${new ObjectId().toString()}.json`;
4848

49-
const { exportURI, exportPath } = this.session.exportsManager.createJSONExport({
49+
const { exportURI, exportPath } = await this.session.exportsManager.createJSONExport({
5050
input: findCursor,
5151
exportName,
5252
exportTitle:

0 commit comments

Comments
 (0)