@@ -28,14 +28,15 @@ const exportsManagerConfig: ExportsManagerConfig = {
28
28
29
29
function getExportNameAndPath (
30
30
sessionId : string ,
31
- timestamp : number
31
+ timestamp : number = Date . now ( ) ,
32
+ objectId : string = new ObjectId ( ) . toString ( )
32
33
) : {
33
34
sessionExportsPath : string ;
34
35
exportName : string ;
35
36
exportPath : string ;
36
37
exportURI : string ;
37
38
} {
38
- const exportName = `foo.bar.${ timestamp } .json` ;
39
+ const exportName = `foo.bar.${ timestamp } .${ objectId } . json` ;
39
40
const sessionExportsPath = path . join ( exportsPath , sessionId ) ;
40
41
const exportPath = path . join ( sessionExportsPath , exportName ) ;
41
42
return {
@@ -48,22 +49,21 @@ function getExportNameAndPath(
48
49
49
50
function createDummyFindCursor (
50
51
dataArray : unknown [ ] ,
51
- chunkPushTimeoutMs ?: number
52
+ beforeEachChunk ?: ( chunkIndex : number ) => void | Promise < void >
52
53
) : { cursor : FindCursor ; cursorCloseNotification : Promise < void > } {
53
54
let index = 0 ;
54
55
const readable = new Readable ( {
55
56
objectMode : true ,
56
57
async read ( ) : Promise < void > {
57
- if ( index < dataArray . length ) {
58
- if ( chunkPushTimeoutMs ) {
59
- await timeout ( chunkPushTimeoutMs ) ;
58
+ try {
59
+ await beforeEachChunk ?.( index ) ;
60
+ if ( index < dataArray . length ) {
61
+ this . push ( dataArray [ index ++ ] ) ;
62
+ } else {
63
+ this . push ( null ) ;
60
64
}
61
- this . push ( dataArray [ index ++ ] ) ;
62
- } else {
63
- if ( chunkPushTimeoutMs ) {
64
- await timeout ( chunkPushTimeoutMs ) ;
65
- }
66
- this . push ( null ) ;
65
+ } catch ( error ) {
66
+ this . destroy ( error as Error ) ;
67
67
}
68
68
} ,
69
69
} ) ;
@@ -90,6 +90,13 @@ function createDummyFindCursor(
90
90
} ;
91
91
}
92
92
93
+ function createDummyFindCursorWithDelay (
94
+ dataArray : unknown [ ] ,
95
+ delayMs : number
96
+ ) : { cursor : FindCursor ; cursorCloseNotification : Promise < void > } {
97
+ return createDummyFindCursor ( dataArray , ( ) => timeout ( delayMs ) ) ;
98
+ }
99
+
93
100
async function fileExists ( filePath : string ) : Promise < boolean > {
94
101
try {
95
102
await fs . access ( filePath ) ;
@@ -125,15 +132,15 @@ describe("ExportsManager unit test", () => {
125
132
describe ( "#availableExport" , ( ) => {
126
133
it ( "should list only the exports that are in ready state" , async ( ) => {
127
134
// This export will finish in at-least 1 second
128
- const { exportName : exportName1 } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
135
+ const { exportName : exportName1 } = getExportNameAndPath ( session . sessionId ) ;
129
136
manager . createJSONExport ( {
130
- input : createDummyFindCursor ( [ { name : "Test1" } ] , 1000 ) . cursor ,
137
+ input : createDummyFindCursorWithDelay ( [ { name : "Test1" } ] , 1000 ) . cursor ,
131
138
exportName : exportName1 ,
132
139
jsonExportFormat : "relaxed" ,
133
140
} ) ;
134
141
135
142
// This export will finish way sooner than the first one
136
- const { exportName : exportName2 } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
143
+ const { exportName : exportName2 } = getExportNameAndPath ( session . sessionId ) ;
137
144
const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ { name : "Test1" } ] ) ;
138
145
manager . createJSONExport ( {
139
146
input : cursor ,
@@ -154,8 +161,8 @@ describe("ExportsManager unit test", () => {
154
161
} ) ;
155
162
156
163
it ( "should throw if the resource is still being generated" , async ( ) => {
157
- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
158
- const { cursor } = createDummyFindCursor ( [ { name : "Test1" } ] , 100 ) ;
164
+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
165
+ const { cursor } = createDummyFindCursorWithDelay ( [ { name : "Test1" } ] , 100 ) ;
159
166
manager . createJSONExport ( {
160
167
input : cursor ,
161
168
exportName,
@@ -168,7 +175,7 @@ describe("ExportsManager unit test", () => {
168
175
} ) ;
169
176
170
177
it ( "should return the resource content if the resource is ready to be consumed" , async ( ) => {
171
- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
178
+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
172
179
const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ ] ) ;
173
180
manager . createJSONExport ( {
174
181
input : cursor ,
@@ -198,7 +205,7 @@ describe("ExportsManager unit test", () => {
198
205
longNumber : Long . fromNumber ( 123456 ) ,
199
206
} ,
200
207
] ) ) ;
201
- ( { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ) ;
208
+ ( { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId ) ) ;
202
209
} ) ;
203
210
204
211
describe ( "when cursor is empty" , ( ) => {
@@ -304,31 +311,37 @@ describe("ExportsManager unit test", () => {
304
311
} ) ;
305
312
} ) ;
306
313
307
- describe ( "when there is an error in export generation " , ( ) => {
314
+ describe ( "when there is an error during stream transform " , ( ) => {
308
315
it ( "should remove the partial export and never make it available" , async ( ) => {
309
316
const emitSpy = vi . spyOn ( manager , "emit" ) ;
310
317
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
311
318
( manager as any ) . docToEJSONStream = function ( ejsonOptions : EJSONOptions | undefined ) : Transform {
312
319
let docsTransformed = 0 ;
313
320
return new Transform ( {
314
321
objectMode : true ,
315
- transform : function ( chunk : unknown , encoding , callback ) : void {
316
- ++ docsTransformed ;
322
+ transform ( chunk : unknown , encoding , callback ) : void {
317
323
try {
318
- if ( docsTransformed === 1 ) {
324
+ const doc = EJSON . stringify ( chunk , undefined , undefined , ejsonOptions ) ;
325
+ if ( docsTransformed === 0 ) {
326
+ this . push ( "[" + doc ) ;
327
+ } else if ( docsTransformed === 1 ) {
319
328
throw new Error ( "Could not transform the chunk!" ) ;
329
+ } else {
330
+ this . push ( ",\n" + doc ) ;
320
331
}
321
- const doc : string = EJSON . stringify ( chunk , undefined , 2 , ejsonOptions ) ;
322
- const line = `${ docsTransformed > 1 ? ",\n" : "" } ${ doc } ` ;
323
-
324
- callback ( null , line ) ;
325
- } catch ( err : unknown ) {
332
+ docsTransformed ++ ;
333
+ callback ( ) ;
334
+ } catch ( err ) {
326
335
callback ( err as Error ) ;
327
336
}
328
337
} ,
329
- final : function ( callback ) : void {
330
- this . push ( "]" ) ;
331
- callback ( null ) ;
338
+ flush ( this : Transform , cb ) : void {
339
+ if ( docsTransformed === 0 ) {
340
+ this . push ( "[]" ) ;
341
+ } else {
342
+ this . push ( "]" ) ;
343
+ }
344
+ cb ( ) ;
332
345
} ,
333
346
} ) ;
334
347
} ;
@@ -348,6 +361,33 @@ describe("ExportsManager unit test", () => {
348
361
expect ( await fileExists ( exportPath ) ) . toEqual ( false ) ;
349
362
} ) ;
350
363
} ) ;
364
+
365
+ describe ( "when there is an error on read stream" , ( ) => {
366
+ it ( "should remove the partial export and never make it available" , async ( ) => {
367
+ const emitSpy = vi . spyOn ( manager , "emit" ) ;
368
+ // A cursor that will make the read stream fail after the first chunk
369
+ const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ { name : "Test1" } ] , ( chunkIndex ) => {
370
+ if ( chunkIndex > 0 ) {
371
+ return Promise . reject ( new Error ( "Connection timedout!" ) ) ;
372
+ }
373
+ return Promise . resolve ( ) ;
374
+ } ) ;
375
+ manager . createJSONExport ( {
376
+ input : cursor ,
377
+ exportName,
378
+ jsonExportFormat : "relaxed" ,
379
+ } ) ;
380
+ await cursorCloseNotification ;
381
+
382
+ // Because the export was never populated in the available exports.
383
+ await expect ( ( ) => manager . readExport ( exportName ) ) . rejects . toThrow (
384
+ "Requested export has either expired or does not exist!"
385
+ ) ;
386
+ expect ( emitSpy ) . not . toHaveBeenCalled ( ) ;
387
+ expect ( manager . availableExports ) . toEqual ( [ ] ) ;
388
+ expect ( await fileExists ( exportPath ) ) . toEqual ( false ) ;
389
+ } ) ;
390
+ } ) ;
351
391
} ) ;
352
392
353
393
describe ( "#cleanupExpiredExports" , ( ) => {
@@ -368,7 +408,7 @@ describe("ExportsManager unit test", () => {
368
408
} ) ;
369
409
370
410
it ( "should not clean up in-progress exports" , async ( ) => {
371
- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
411
+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
372
412
const manager = ExportsManager . init (
373
413
session . sessionId ,
374
414
{
@@ -378,7 +418,7 @@ describe("ExportsManager unit test", () => {
378
418
} ,
379
419
new CompositeLogger ( )
380
420
) ;
381
- const { cursor } = createDummyFindCursor ( [ { name : "Test" } ] , 2000 ) ;
421
+ const { cursor } = createDummyFindCursorWithDelay ( [ { name : "Test" } ] , 2000 ) ;
382
422
manager . createJSONExport ( {
383
423
input : cursor ,
384
424
exportName,
@@ -395,7 +435,7 @@ describe("ExportsManager unit test", () => {
395
435
} ) ;
396
436
397
437
it ( "should cleanup expired exports" , async ( ) => {
398
- const { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
438
+ const { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId ) ;
399
439
const manager = ExportsManager . init (
400
440
session . sessionId ,
401
441
{
0 commit comments