@@ -56,25 +56,32 @@ type StoredExport = ReadyExport | InProgressExport;
56
56
* JIRA: https://jira.mongodb.org/browse/MCP-104 */
57
57
type AvailableExport = Pick < StoredExport , "exportName" | "exportTitle" | "exportURI" | "exportPath" > ;
58
58
59
- export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > ;
59
+ export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > & {
60
+ // The maximum number of milliseconds to wait for in-flight operations to
61
+ // settle before shutting down ExportsManager.
62
+ activeOpsDrainTimeoutMs ?: number ;
63
+ } ;
60
64
61
65
type ExportsManagerEvents = {
62
66
"export-expired" : [ string ] ;
63
67
"export-available" : [ string ] ;
64
68
} ;
65
69
66
70
export class ExportsManager extends EventEmitter < ExportsManagerEvents > {
67
- private isShuttingDown : boolean = false ;
68
71
private storedExports : Record < StoredExport [ "exportName" ] , StoredExport > = { } ;
69
72
private exportsCleanupInProgress : boolean = false ;
70
73
private exportsCleanupInterval ?: NodeJS . Timeout ;
74
+ private readonly shutdownController : AbortController = new AbortController ( ) ;
75
+ private readonly activeOperations : Set < Promise < unknown > > = new Set ( ) ;
76
+ private readonly activeOpsDrainTimeoutMs : number ;
71
77
72
78
private constructor (
73
79
private readonly exportsDirectoryPath : string ,
74
80
private readonly config : ExportsManagerConfig ,
75
81
private readonly logger : LoggerBase
76
82
) {
77
83
super ( ) ;
84
+ this . activeOpsDrainTimeoutMs = this . config . activeOpsDrainTimeoutMs ?? 10_000 ;
78
85
}
79
86
80
87
public get availableExports ( ) : AvailableExport [ ] {
@@ -97,20 +104,19 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
97
104
protected init ( ) : void {
98
105
if ( ! this . exportsCleanupInterval ) {
99
106
this . exportsCleanupInterval = setInterval (
100
- ( ) => void this . cleanupExpiredExports ( ) ,
107
+ ( ) => void this . trackOperation ( this . cleanupExpiredExports ( ) ) ,
101
108
this . config . exportCleanupIntervalMs
102
109
) ;
103
110
}
104
111
}
105
-
106
112
public async close ( ) : Promise < void > {
107
- if ( this . isShuttingDown ) {
113
+ if ( this . shutdownController . signal . aborted ) {
108
114
return ;
109
115
}
110
-
111
- this . isShuttingDown = true ;
112
116
try {
113
117
clearInterval ( this . exportsCleanupInterval ) ;
118
+ this . shutdownController . abort ( ) ;
119
+ await this . waitForActiveOperationsToSettle ( this . activeOpsDrainTimeoutMs ) ;
114
120
await fs . rm ( this . exportsDirectoryPath , { force : true , recursive : true } ) ;
115
121
} catch ( error ) {
116
122
this . logger . error ( {
@@ -140,7 +146,9 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
140
146
throw new Error ( "Requested export has expired!" ) ;
141
147
}
142
148
143
- return await fs . readFile ( exportPath , "utf8" ) ;
149
+ return await this . trackOperation (
150
+ fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } )
151
+ ) ;
144
152
} catch ( error ) {
145
153
this . logger . error ( {
146
154
id : LogId . exportReadError ,
@@ -181,7 +189,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
181
189
exportStatus : "in-progress" ,
182
190
} ) ;
183
191
184
- void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
192
+ void this . trackOperation ( this . startExport ( { input, jsonExportFormat, inProgressExport } ) ) ;
185
193
return inProgressExport ;
186
194
} catch ( error ) {
187
195
this . logger . error ( {
@@ -206,11 +214,10 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
206
214
try {
207
215
await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
208
216
const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
209
- await pipeline ( [
210
- input . stream ( ) ,
211
- this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
212
- outputStream ,
213
- ] ) ;
217
+ await pipeline (
218
+ [ input . stream ( ) , this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) , outputStream ] ,
219
+ { signal : this . shutdownController . signal }
220
+ ) ;
214
221
pipeSuccessful = true ;
215
222
} catch ( error ) {
216
223
this . logger . error ( {
@@ -281,7 +288,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
281
288
}
282
289
283
290
private async cleanupExpiredExports ( ) : Promise < void > {
284
- if ( this . exportsCleanupInProgress || this . isShuttingDown ) {
291
+ if ( this . exportsCleanupInProgress ) {
285
292
return ;
286
293
}
287
294
@@ -291,6 +298,9 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
291
298
) ;
292
299
try {
293
300
for ( const { exportPath, exportCreatedAt, exportURI, exportName } of exportsForCleanup ) {
301
+ if ( this . shutdownController . signal . aborted ) {
302
+ break ;
303
+ }
294
304
if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
295
305
delete this . storedExports [ exportName ] ;
296
306
await this . silentlyRemoveExport (
@@ -330,11 +340,42 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
330
340
}
331
341
332
342
private assertIsNotShuttingDown ( ) : void {
333
- if ( this . isShuttingDown ) {
343
+ if ( this . shutdownController . signal . aborted ) {
334
344
throw new Error ( "ExportsManager is shutting down." ) ;
335
345
}
336
346
}
337
347
348
+ private async trackOperation < T > ( promise : Promise < T > ) : Promise < T > {
349
+ this . activeOperations . add ( promise ) ;
350
+ try {
351
+ return await promise ;
352
+ } finally {
353
+ this . activeOperations . delete ( promise ) ;
354
+ }
355
+ }
356
+
357
+ private async waitForActiveOperationsToSettle ( timeoutMs : number ) : Promise < void > {
358
+ const pendingPromises = Array . from ( this . activeOperations ) ;
359
+ if ( pendingPromises . length === 0 ) {
360
+ return ;
361
+ }
362
+ let timedOut = false ;
363
+ const timeoutPromise = new Promise < void > ( ( resolve ) =>
364
+ setTimeout ( ( ) => {
365
+ timedOut = true ;
366
+ resolve ( ) ;
367
+ } , timeoutMs )
368
+ ) ;
369
+ await Promise . race ( [ Promise . allSettled ( pendingPromises ) , timeoutPromise ] ) ;
370
+ if ( timedOut && this . activeOperations . size > 0 ) {
371
+ this . logger . error ( {
372
+ id : LogId . exportCloseError ,
373
+ context : `Close timed out waiting for ${ this . activeOperations . size } operation(s) to settle` ,
374
+ message : "Proceeding to force cleanup after timeout" ,
375
+ } ) ;
376
+ }
377
+ }
378
+
338
379
static init (
339
380
config : ExportsManagerConfig ,
340
381
logger : LoggerBase ,
0 commit comments