Skip to content

Commit 34aff9a

Browse files
chore: improve readability for locks and in-progress operations
1 parent c0031ff commit 34aff9a

File tree

3 files changed

+255
-191
lines changed

3 files changed

+255
-191
lines changed

src/common/exportsManager.ts

Lines changed: 159 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,18 @@ export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeo
7070
};
7171

7272
type ExportsManagerEvents = {
73+
closed: [];
7374
"export-expired": [string];
7475
"export-available": [string];
7576
};
7677

78+
class OperationAbortedError extends Error {}
79+
7780
export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
7881
private storedExports: Record<StoredExport["exportName"], StoredExport> = {};
7982
private exportsCleanupInProgress: boolean = false;
8083
private exportsCleanupInterval?: NodeJS.Timeout;
8184
private readonly shutdownController: AbortController = new AbortController();
82-
private readonly activeOperations: Set<Promise<unknown>> = new Set();
83-
private readonly activeOpsDrainTimeoutMs: number;
8485
private readonly readTimeoutMs: number;
8586
private readonly writeTimeoutMs: number;
8687
private readonly exportLocks: Map<string, RWLock> = new Map();
@@ -91,7 +92,6 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
9192
private readonly logger: LoggerBase
9293
) {
9394
super();
94-
this.activeOpsDrainTimeoutMs = this.config.activeOpsDrainTimeoutMs ?? 10_000;
9595
this.readTimeoutMs = this.config.readTimeout ?? 30_0000; // 30 seconds is the default timeout for an MCP request
9696
this.writeTimeoutMs = this.config.writeTimeout ?? 120_000; // considering that writes can take time
9797
}
@@ -116,7 +116,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
116116
protected init(): void {
117117
if (!this.exportsCleanupInterval) {
118118
this.exportsCleanupInterval = setInterval(
119-
() => void this.trackOperation(this.cleanupExpiredExports()),
119+
() => void this.cleanupExpiredExports(),
120120
this.config.exportCleanupIntervalMs
121121
);
122122
}
@@ -128,8 +128,8 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
128128
try {
129129
clearInterval(this.exportsCleanupInterval);
130130
this.shutdownController.abort();
131-
await this.waitForActiveOperationsToSettle(this.activeOpsDrainTimeoutMs);
132131
await fs.rm(this.exportsDirectoryPath, { force: true, recursive: true });
132+
this.emit("closed");
133133
} catch (error) {
134134
this.logger.error({
135135
id: LogId.exportCloseError,
@@ -143,33 +143,35 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
143143
try {
144144
this.assertIsNotShuttingDown();
145145
exportName = decodeURIComponent(exportName);
146-
return await this.withLock(exportName, "read", false, async (): Promise<string> => {
147-
const exportHandle = this.storedExports[exportName];
148-
if (!exportHandle) {
149-
throw new Error("Requested export has either expired or does not exist!");
150-
}
146+
return await this.withLock(
147+
{
148+
exportName,
149+
mode: "read",
150+
callbackName: "readExport",
151+
},
152+
async (): Promise<string> => {
153+
const exportHandle = this.storedExports[exportName];
154+
if (!exportHandle) {
155+
throw new Error("Requested export has either expired or does not exist!");
156+
}
151157

152-
// This won't happen anymore because of lock synchronization but
153-
// keeping it here to make TS happy.
154-
if (exportHandle.exportStatus === "in-progress") {
155-
throw new Error("Requested export is still being generated!");
156-
}
158+
// This won't happen because of lock synchronization but
159+
// keeping it here to make TS happy.
160+
if (exportHandle.exportStatus === "in-progress") {
161+
throw new Error("Requested export is still being generated!");
162+
}
157163

158-
const { exportPath } = exportHandle;
164+
const { exportPath } = exportHandle;
159165

160-
return await this.trackOperation(
161-
fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal })
162-
);
163-
});
166+
return fs.readFile(exportPath, { encoding: "utf8", signal: this.shutdownController.signal });
167+
}
168+
);
164169
} catch (error) {
165170
this.logger.error({
166171
id: LogId.exportReadError,
167172
context: `Error when reading export - ${exportName}`,
168173
message: error instanceof Error ? error.message : String(error),
169174
});
170-
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
171-
throw new Error("Requested export does not exist!");
172-
}
173175
throw error;
174176
}
175177
}
@@ -188,23 +190,32 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
188190
try {
189191
this.assertIsNotShuttingDown();
190192
const exportNameWithExtension = validateExportName(ensureExtension(exportName, "json"));
191-
return await this.withLock(exportNameWithExtension, "write", false, (): AvailableExport => {
192-
if (this.storedExports[exportNameWithExtension]) {
193-
throw new Error("Export with same name is either already available or being generated.");
194-
}
195-
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
196-
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
197-
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
193+
return await this.withLock(
194+
{
198195
exportName: exportNameWithExtension,
199-
exportTitle,
200-
exportPath: exportFilePath,
201-
exportURI: exportURI,
202-
exportStatus: "in-progress",
203-
});
204-
205-
void this.trackOperation(this.startExport({ input, jsonExportFormat, inProgressExport }));
206-
return inProgressExport;
207-
});
196+
mode: "write",
197+
callbackName: "createJSONExport",
198+
},
199+
(): Promise<AvailableExport> => {
200+
if (this.storedExports[exportNameWithExtension]) {
201+
return Promise.reject(
202+
new Error("Export with same name is either already available or being generated.")
203+
);
204+
}
205+
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
206+
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
207+
const inProgressExport: InProgressExport = (this.storedExports[exportNameWithExtension] = {
208+
exportName: exportNameWithExtension,
209+
exportTitle,
210+
exportPath: exportFilePath,
211+
exportURI: exportURI,
212+
exportStatus: "in-progress",
213+
});
214+
215+
void this.startExport({ input, jsonExportFormat, inProgressExport });
216+
return Promise.resolve(inProgressExport);
217+
}
218+
);
208219
} catch (error) {
209220
this.logger.error({
210221
id: LogId.exportCreationError,
@@ -224,47 +235,57 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
224235
jsonExportFormat: JSONExportFormat;
225236
inProgressExport: InProgressExport;
226237
}): Promise<void> {
227-
let pipeSuccessful = false;
228-
await this.withLock(inProgressExport.exportName, "write", false, async (): Promise<void> => {
229-
try {
230-
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
231-
const outputStream = createWriteStream(inProgressExport.exportPath);
232-
await pipeline(
233-
[
234-
input.stream(),
235-
this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)),
236-
outputStream,
237-
],
238-
{ signal: this.shutdownController.signal }
239-
);
240-
pipeSuccessful = true;
241-
} catch (error) {
242-
this.logger.error({
243-
id: LogId.exportCreationError,
244-
context: `Error when generating JSON export for ${inProgressExport.exportName}`,
245-
message: error instanceof Error ? error.message : String(error),
246-
});
247-
248-
// If the pipeline errors out then we might end up with
249-
// partial and incorrect export so we remove it entirely.
250-
await this.silentlyRemoveExport(
251-
inProgressExport.exportPath,
252-
LogId.exportCreationCleanupError,
253-
`Error when removing incomplete export ${inProgressExport.exportName}`
254-
);
255-
delete this.storedExports[inProgressExport.exportName];
256-
} finally {
257-
if (pipeSuccessful) {
258-
this.storedExports[inProgressExport.exportName] = {
259-
...inProgressExport,
260-
exportCreatedAt: Date.now(),
261-
exportStatus: "ready",
262-
};
263-
this.emit("export-available", inProgressExport.exportURI);
238+
try {
239+
await this.withLock(
240+
{
241+
exportName: inProgressExport.exportName,
242+
mode: "write",
243+
callbackName: "startExport",
244+
},
245+
async (): Promise<void> => {
246+
let pipeSuccessful = false;
247+
try {
248+
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
249+
const outputStream = createWriteStream(inProgressExport.exportPath);
250+
await pipeline(
251+
[
252+
input.stream(),
253+
this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat)),
254+
outputStream,
255+
],
256+
{ signal: this.shutdownController.signal }
257+
);
258+
pipeSuccessful = true;
259+
} catch (error) {
260+
// If the pipeline errors out then we might end up with
261+
// partial and incorrect export so we remove it entirely.
262+
await this.silentlyRemoveExport(
263+
inProgressExport.exportPath,
264+
LogId.exportCreationCleanupError,
265+
`Error when removing incomplete export ${inProgressExport.exportName}`
266+
);
267+
delete this.storedExports[inProgressExport.exportName];
268+
throw error;
269+
} finally {
270+
if (pipeSuccessful) {
271+
this.storedExports[inProgressExport.exportName] = {
272+
...inProgressExport,
273+
exportCreatedAt: Date.now(),
274+
exportStatus: "ready",
275+
};
276+
this.emit("export-available", inProgressExport.exportURI);
277+
}
278+
void input.close();
279+
}
264280
}
265-
void input.close();
266-
}
267-
});
281+
);
282+
} catch (error) {
283+
this.logger.error({
284+
id: LogId.exportCreationError,
285+
context: `Error when generating JSON export for ${inProgressExport.exportName}`,
286+
message: error instanceof Error ? error.message : String(error),
287+
});
288+
}
268289
}
269290

270291
private getEJSONOptionsForFormat(format: JSONExportFormat): EJSONOptions | undefined {
@@ -321,15 +342,23 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
321342
await Promise.allSettled(
322343
exportsForCleanup.map(async ({ exportPath, exportCreatedAt, exportURI, exportName }) => {
323344
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
324-
await this.withLock(exportName, "write", true, async (): Promise<void> => {
325-
delete this.storedExports[exportName];
326-
await this.silentlyRemoveExport(
327-
exportPath,
328-
LogId.exportCleanupError,
329-
`Considerable error when removing export ${exportName}`
330-
);
331-
this.emit("export-expired", exportURI);
332-
});
345+
await this.withLock(
346+
{
347+
exportName,
348+
mode: "write",
349+
finalize: true,
350+
callbackName: "cleanupExpiredExport",
351+
},
352+
async (): Promise<void> => {
353+
delete this.storedExports[exportName];
354+
await this.silentlyRemoveExport(
355+
exportPath,
356+
LogId.exportCleanupError,
357+
`Considerable error when removing export ${exportName}`
358+
);
359+
this.emit("export-expired", exportURI);
360+
}
361+
);
333362
}
334363
})
335364
);
@@ -367,62 +396,67 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
367396
}
368397
}
369398

370-
private async withLock<T>(
371-
exportName: string,
372-
mode: "read" | "write",
373-
finalize: boolean,
374-
fn: () => T | Promise<T>
375-
): Promise<T> {
399+
private async withLock<CallbackResult extends Promise<unknown>>(
400+
lockConfig: {
401+
exportName: string;
402+
mode: "read" | "write";
403+
finalize?: boolean;
404+
callbackName?: string;
405+
},
406+
callback: () => CallbackResult
407+
): Promise<Awaited<CallbackResult>> {
408+
const { exportName, mode, finalize = false, callbackName } = lockConfig;
409+
const operationName = callbackName ? `${callbackName} - ${exportName}` : exportName;
376410
let lock = this.exportLocks.get(exportName);
377411
if (!lock) {
378412
lock = new RWLock();
379413
this.exportLocks.set(exportName, lock);
380414
}
381415

382-
try {
416+
let lockAcquired: boolean = false;
417+
const acquireLock = async (): Promise<void> => {
383418
if (mode === "read") {
384419
await lock.readLock(this.readTimeoutMs);
385420
} else {
386421
await lock.writeLock(this.writeTimeoutMs);
387422
}
388-
return await fn();
423+
lockAcquired = true;
424+
};
425+
426+
try {
427+
await Promise.race([
428+
this.operationAbortedPromise(`Acquire ${mode} lock for ${operationName}`),
429+
acquireLock(),
430+
]);
431+
return await Promise.race([this.operationAbortedPromise(operationName), callback()]);
389432
} finally {
390-
lock.unlock();
433+
if (lockAcquired) {
434+
lock.unlock();
435+
}
391436
if (finalize) {
392437
this.exportLocks.delete(exportName);
393438
}
394439
}
395440
}
396441

397-
private async trackOperation<T>(promise: Promise<T>): Promise<T> {
398-
this.activeOperations.add(promise);
399-
try {
400-
return await promise;
401-
} finally {
402-
this.activeOperations.delete(promise);
403-
}
404-
}
442+
private operationAbortedPromise(operationName?: string): Promise<never> {
443+
return new Promise((_, reject) => {
444+
const rejectIfAborted = (): void => {
445+
if (this.shutdownController.signal.aborted) {
446+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
447+
const abortReason = this.shutdownController.signal.reason;
448+
const abortMessage =
449+
typeof abortReason === "string"
450+
? abortReason
451+
: `${operationName ?? "Operation"} aborted - ExportsManager shutting down!`;
452+
reject(new OperationAbortedError(abortMessage));
453+
this.shutdownController.signal.removeEventListener("abort", rejectIfAborted);
454+
}
455+
};
405456

406-
private async waitForActiveOperationsToSettle(timeoutMs: number): Promise<void> {
407-
const pendingPromises = Array.from(this.activeOperations);
408-
if (pendingPromises.length === 0) {
409-
return;
410-
}
411-
let timedOut = false;
412-
const timeoutPromise = new Promise<void>((resolve) =>
413-
setTimeout(() => {
414-
timedOut = true;
415-
resolve();
416-
}, timeoutMs)
417-
);
418-
await Promise.race([Promise.allSettled(pendingPromises), timeoutPromise]);
419-
if (timedOut && this.activeOperations.size > 0) {
420-
this.logger.error({
421-
id: LogId.exportCloseError,
422-
context: `Close timed out waiting for ${this.activeOperations.size} operation(s) to settle`,
423-
message: "Proceeding to force cleanup after timeout",
424-
});
425-
}
457+
rejectIfAborted();
458+
this.shutdownController.signal.addEventListener("abort", rejectIfAborted);
459+
});
426460
}
427461

428462
static init(

src/common/logger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export const LogId = {
5757
exportCloseError: mongoLogId(1_007_005),
5858
exportedDataListError: mongoLogId(1_007_006),
5959
exportedDataAutoCompleteError: mongoLogId(1_007_007),
60+
exportLockError: mongoLogId(1_007_008),
6061
} as const;
6162

6263
interface LogPayload {

0 commit comments

Comments
 (0)