Skip to content

Commit 78a9044

Browse files
committed
chore: remove usage of rwlocks as sessions are single user
RWLocks are not necessary here because sessions are single user and we don't want the agent to wait until a resource is available, as it can take forever depending on the data set.
1 parent 602f1c1 commit 78a9044

File tree

3 files changed

+96
-217
lines changed

3 files changed

+96
-217
lines changed

src/common/exportsManager.ts

Lines changed: 86 additions & 196 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { EJSON, EJSONOptions, ObjectId } from "bson";
88
import { Transform } from "stream";
99
import { pipeline } from "stream/promises";
1010
import { MongoLogId } from "mongodb-log-writer";
11-
import { RWLock } from "async-rwlock";
1211

1312
import { UserConfig } from "./config.js";
1413
import { LoggerBase, LogId } from "./logger.js";
@@ -57,43 +56,26 @@ type StoredExport = ReadyExport | InProgressExport;
5756
* JIRA: https://jira.mongodb.org/browse/MCP-104 */
5857
type AvailableExport = Pick<StoredExport, "exportName" | "exportTitle" | "exportURI" | "exportPath">;
5958

60-
export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs"> & {
61-
// The maximum number of milliseconds to wait for in-flight operations to
62-
// settle before shutting down ExportsManager.
63-
activeOpsDrainTimeoutMs?: number;
64-
65-
// The maximum number of milliseconds to wait before timing out queued reads
66-
readTimeout?: number;
67-
68-
// The maximum number of milliseconds to wait before timing out queued writes
69-
writeTimeout?: number;
70-
};
59+
export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs">;
7160

7261
type ExportsManagerEvents = {
7362
closed: [];
7463
"export-expired": [string];
7564
"export-available": [string];
7665
};
7766

78-
class OperationAbortedError extends Error {}
79-
8067
export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
8168
private storedExports: Record<StoredExport["exportName"], StoredExport> = {};
8269
private exportsCleanupInProgress: boolean = false;
8370
private exportsCleanupInterval?: NodeJS.Timeout;
8471
private readonly shutdownController: AbortController = new AbortController();
85-
private readonly readTimeoutMs: number;
86-
private readonly writeTimeoutMs: number;
87-
private readonly exportLocks: Map<string, RWLock> = new Map();
8872

8973
private constructor(
9074
private readonly exportsDirectoryPath: string,
9175
private readonly config: ExportsManagerConfig,
9276
private readonly logger: LoggerBase
9377
) {
9478
super();
95-
this.readTimeoutMs = this.config.readTimeout ?? 30_0000; // 30 seconds is the default timeout for an MCP request
96-
this.writeTimeoutMs = this.config.writeTimeout ?? 120_000; // considering that writes can take time
9779
}
9880

9981
public get availableExports(): AvailableExport[] {
@@ -121,6 +103,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
121103
);
122104
}
123105
}
106+
124107
public async close(): Promise<void> {
125108
if (this.shutdownController.signal.aborted) {
126109
return;
@@ -143,29 +126,18 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
143126
try {
144127
this.assertIsNotShuttingDown();
145128
exportName = decodeURIComponent(exportName);
146-
return await this.withLock(
147-
{
148-
exportName,
149-
mode: "read",
150-
callbackName: "readExport",
151-
},
152-
async (): Promise<string> => {
153-
const exportHandle = this.storedExports[exportName];
154-
if (!exportHandle) {
155-
throw new Error("Requested export has either expired or does not exist!");
156-
}
129+
const exportHandle = this.storedExports[exportName];
130+
if (!exportHandle) {
131+
throw new Error("Requested export has either expired or does not exist.");
132+
}
157133

158-
// This won't happen because of lock synchronization but
159-
// keeping it here to make TS happy.
160-
if (exportHandle.exportStatus === "in-progress") {
161-
throw new Error("Requested export is still being generated!");
162-
}
134+
if (exportHandle.exportStatus === "in-progress") {
135+
throw new Error("Requested export is still being generated. Try again later.");
136+
}
163137

164-
const { exportPath } = exportHandle;
138+
const { exportPath } = exportHandle;
165139

166-
return fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal });
167-
}
168-
);
140+
return fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal });
169141
} catch (error) {
170142
this.logger.error({
171143
id: LogId.exportReadError,
@@ -190,32 +162,23 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
190162
try {
191163
this.assertIsNotShuttingDown();
192164
const exportNameWithExtension = validateExportName(ensureExtension(exportName, "json"));
193-
return await this.withLock(
194-
{
195-
exportName: exportNameWithExtension,
196-
mode: "write",
197-
callbackName: "createJSONExport",
198-
},
199-
(): Promise<AvailableExport> => {
200-
if (this.storedExports[exportNameWithExtension]) {
201-
return Promise.reject(
202-
new Error("Export with same name is either already available or being generated.")
203-
);
204-
}
205-
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
206-
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
207-
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
208-
exportName: exportNameWithExtension,
209-
exportTitle,
210-
exportPath: exportFilePath,
211-
exportURI: exportURI,
212-
exportStatus: "in-progress",
213-
});
214-
215-
void this.startExport({ input, jsonExportFormat, inProgressExport });
216-
return Promise.resolve(inProgressExport);
217-
}
218-
);
165+
if (this.storedExports[exportNameWithExtension]) {
166+
return Promise.reject(
167+
new Error("Export with same name is either already available or being generated.")
168+
);
169+
}
170+
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
171+
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
172+
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
173+
exportName: exportNameWithExtension,
174+
exportTitle,
175+
exportPath: exportFilePath,
176+
exportURI: exportURI,
177+
exportStatus: "in-progress",
178+
});
179+
180+
void this.startExport({ input, jsonExportFormat, inProgressExport });
181+
return Promise.resolve(inProgressExport);
219182
} catch (error) {
220183
this.logger.error({
221184
id: LogId.exportCreationError,
@@ -236,49 +199,41 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
236199
inProgressExport: InProgressExport;
237200
}): Promise<void> {
238201
try {
239-
await this.withLock(
240-
{
241-
exportName: inProgressExport.exportName,
242-
mode: "write",
243-
callbackName: "startExport",
244-
},
245-
async (): Promise<void> => {
246-
let pipeSuccessful = false;
247-
try {
248-
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
249-
const outputStream = createWriteStream(inProgressExport.exportPath);
250-
await pipeline(
251-
[
252-
input.stream(),
253-
this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)),
254-
outputStream,
255-
],
256-
{ signal: this.shutdownController.signal }
257-
);
258-
pipeSuccessful = true;
259-
} catch (error) {
260-
// If the pipeline errors out then we might end up with
261-
// partial and incorrect export so we remove it entirely.
262-
await this.silentlyRemoveExport(
263-
inProgressExport.exportPath,
264-
LogId.exportCreationCleanupError,
265-
`Error when removing incomplete export ${inProgressExport.exportName}`
266-
);
267-
delete this.storedExports[inProgressExport.exportName];
268-
throw error;
269-
} finally {
270-
if (pipeSuccessful) {
271-
this.storedExports[inProgressExport.exportName] = {
272-
...inProgressExport,
273-
exportCreatedAt: Date.now(),
274-
exportStatus: "ready",
275-
};
276-
this.emit("export-available", inProgressExport.exportURI);
277-
}
278-
void input.close();
279-
}
202+
let pipeSuccessful = false;
203+
try {
204+
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
205+
const outputStream = createWriteStream(inProgressExport.exportPath);
206+
await pipeline(
207+
[
208+
input.stream(),
209+
this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)),
210+
outputStream,
211+
],
212+
{ signal: this.shutdownController.signal }
213+
);
214+
pipeSuccessful = true;
215+
} catch (error) {
216+
// If the pipeline errors out then we might end up with
217+
// partial and incorrect export so we remove it entirely.
218+
delete this.storedExports[inProgressExport.exportName];
219+
// do not block the user, just delete the file in the background
220+
void this.silentlyRemoveExport(
221+
inProgressExport.exportPath,
222+
LogId.exportCreationCleanupError,
223+
`Error when removing incomplete export ${inProgressExport.exportName}`
224+
);
225+
throw error;
226+
} finally {
227+
if (pipeSuccessful) {
228+
this.storedExports[inProgressExport.exportName] = {
229+
...inProgressExport,
230+
exportCreatedAt: Date.now(),
231+
exportStatus: "ready",
232+
};
233+
this.emit("export-available", inProgressExport.exportURI);
280234
}
281-
);
235+
void input.close();
236+
}
282237
} catch (error) {
283238
this.logger.error({
284239
id: LogId.exportCreationError,
@@ -335,33 +290,31 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
335290

336291
this.exportsCleanupInProgress = true;
337292
try {
338-
const exportsForCleanup = Object.values({ ...this.storedExports }).filter(
339-
(storedExport): storedExport is ReadyExport => storedExport.exportStatus === "ready"
340-
);
293+
// first, unregister all exports that are expired, so they are not considered anymore for reading
294+
const exportsForCleanup: ReadyExport[] = [];
295+
for (const expiredExport of Object.values(this.storedExports)) {
296+
if (
297+
expiredExport.exportStatus === "ready" &&
298+
isExportExpired(expiredExport.exportCreatedAt, this.config.exportTimeoutMs)
299+
) {
300+
exportsForCleanup.push(expiredExport);
301+
delete this.storedExports[expiredExport.exportName];
302+
}
303+
}
341304

342-
await Promise.allSettled(
343-
exportsForCleanup.map(async ({ exportPath, exportCreatedAt, exportURI, exportName }) => {
344-
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
345-
await this.withLock(
346-
{
347-
exportName,
348-
mode: "write",
349-
finalize: true,
350-
callbackName: "cleanupExpiredExport",
351-
},
352-
async (): Promise<void> => {
353-
delete this.storedExports[exportName];
354-
await this.silentlyRemoveExport(
355-
exportPath,
356-
LogId.exportCleanupError,
357-
`Considerable error when removing export ${exportName}`
358-
);
359-
this.emit("export-expired", exportURI);
360-
}
361-
);
362-
}
363-
})
364-
);
305+
// and then remove them (slow operation potentially) from disk.
306+
const allDeletionPromises: Promise<void>[] = [];
307+
for (const { exportPath, exportName } of exportsForCleanup) {
308+
allDeletionPromises.push(
309+
this.silentlyRemoveExport(
310+
exportPath,
311+
LogId.exportCleanupError,
312+
`Considerable error when removing export ${exportName}`
313+
)
314+
);
315+
}
316+
317+
await Promise.allSettled(allDeletionPromises);
365318
} catch (error) {
366319
this.logger.error({
367320
id: LogId.exportCleanupError,
@@ -396,69 +349,6 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
396349
}
397350
}
398351

399-
private async withLock<CallbackResult extends Promise<unknown>>(
400-
lockConfig: {
401-
exportName: string;
402-
mode: "read" | "write";
403-
finalize?: boolean;
404-
callbackName?: string;
405-
},
406-
callback: () => CallbackResult
407-
): Promise<Awaited<CallbackResult>> {
408-
const { exportName, mode, finalize = false, callbackName } = lockConfig;
409-
const operationName = callbackName ? `${callbackName} - ${exportName}` : exportName;
410-
let lock = this.exportLocks.get(exportName);
411-
if (!lock) {
412-
lock = new RWLock();
413-
this.exportLocks.set(exportName, lock);
414-
}
415-
416-
let lockAcquired: boolean = false;
417-
const acquireLock = async (): Promise<void> => {
418-
if (mode === "read") {
419-
await lock.readLock(this.readTimeoutMs);
420-
} else {
421-
await lock.writeLock(this.writeTimeoutMs);
422-
}
423-
lockAcquired = true;
424-
};
425-
426-
try {
427-
await Promise.race([
428-
this.operationAbortedPromise(`Acquire ${mode} lock for ${operationName}`),
429-
acquireLock(),
430-
]);
431-
return await Promise.race([this.operationAbortedPromise(operationName), callback()]);
432-
} finally {
433-
if (lockAcquired) {
434-
lock.unlock();
435-
}
436-
if (finalize) {
437-
this.exportLocks.delete(exportName);
438-
}
439-
}
440-
}
441-
442-
private operationAbortedPromise(operationName?: string): Promise<never> {
443-
return new Promise((_, reject) => {
444-
const rejectIfAborted = (): void => {
445-
if (this.shutdownController.signal.aborted) {
446-
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
447-
const abortReason = this.shutdownController.signal.reason;
448-
const abortMessage =
449-
typeof abortReason === "string"
450-
? abortReason
451-
: `${operationName ?? "Operation"} aborted - ExportsManager shutting down!`;
452-
reject(new OperationAbortedError(abortMessage));
453-
this.shutdownController.signal.removeEventListener("abort", rejectIfAborted);
454-
}
455-
};
456-
457-
rejectIfAborted();
458-
this.shutdownController.signal.addEventListener("abort", rejectIfAborted);
459-
});
460-
}
461-
462352
static init(
463353
config: ExportsManagerConfig,
464354
logger: LoggerBase,

tests/integration/resources/exportedData.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ describeWithMongoDB(
5555
expect(response.isError).toEqual(true);
5656
expect(response.contents[0]?.uri).toEqual(exportURI);
5757
expect(response.contents[0]?.text).toEqual(
58-
`Error reading ${exportURI}: Requested export has either expired or does not exist!`
58+
`Error reading ${exportURI}: Requested export has either expired or does not exist.`
5959
);
6060
});
6161
});

0 commit comments

Comments
 (0)