@@ -75,6 +75,8 @@ const kHello = Symbol('hello');
75
75
const kAutoEncrypter = Symbol ( 'autoEncrypter' ) ;
76
76
/** @internal */
77
77
const kFullResult = Symbol ( 'fullResult' ) ;
78
+ /** @internal */
79
+ const kDelayedTimeoutId = Symbol ( 'delayedTimeoutId' ) ;
78
80
79
81
/** @internal */
80
82
export interface QueryOptions extends BSONSerializeOptions {
@@ -199,6 +201,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
199
201
lastHelloMS ?: number ;
200
202
serverApi ?: ServerApi ;
201
203
helloOk ?: boolean ;
204
+
205
+ /**@internal */
206
+ [ kDelayedTimeoutId ] : NodeJS . Timeout | null ;
202
207
/** @internal */
203
208
[ kDescription ] : StreamDescription ;
204
209
/** @internal */
@@ -253,19 +258,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
253
258
...options ,
254
259
maxBsonMessageSize : this . hello ?. maxBsonMessageSize
255
260
} ) ;
256
- this [ kMessageStream ] . on ( 'message' , messageHandler ( this ) ) ;
257
261
this [ kStream ] = stream ;
258
- stream . on ( 'error' , ( ) => {
262
+
263
+ this [ kDelayedTimeoutId ] = null ;
264
+
265
+ this [ kMessageStream ] . on ( 'message' , message => this . onMessage ( message ) ) ;
266
+ this [ kMessageStream ] . on ( 'error' , error => this . onError ( error ) ) ;
267
+ this [ kStream ] . on ( 'close' , ( ) => this . onClose ( ) ) ;
268
+ this [ kStream ] . on ( 'timeout' , ( ) => this . onTimeout ( ) ) ;
269
+ this [ kStream ] . on ( 'error' , ( ) => {
259
270
/* ignore errors, listen to `close` instead */
260
271
} ) ;
261
272
262
- this [ kMessageStream ] . on ( 'error' , error => this . handleIssue ( { destroy : error } ) ) ;
263
- stream . on ( 'close' , ( ) => this . handleIssue ( { isClose : true } ) ) ;
264
- stream . on ( 'timeout' , ( ) => this . handleIssue ( { isTimeout : true , destroy : true } ) ) ;
265
-
266
273
// hook the message stream up to the passed in stream
267
- stream . pipe ( this [ kMessageStream ] ) ;
268
- this [ kMessageStream ] . pipe ( stream ) ;
274
+ this [ kStream ] . pipe ( this [ kMessageStream ] ) ;
275
+ this [ kMessageStream ] . pipe ( this [ kStream ] ) ;
269
276
}
270
277
271
278
get description ( ) : StreamDescription {
@@ -317,40 +324,133 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
317
324
this [ kLastUseTime ] = now ( ) ;
318
325
}
319
326
320
- handleIssue ( issue : { isTimeout ?: boolean ; isClose ?: boolean ; destroy ?: boolean | Error } ) : void {
327
+ onError ( error : Error ) {
321
328
if ( this . closed ) {
322
329
return ;
323
330
}
324
331
325
- if ( issue . destroy ) {
326
- this [ kStream ] . destroy ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
332
+ this [ kStream ] . destroy ( error ) ;
333
+
334
+ this . closed = true ;
335
+
336
+ for ( const op of this [ kQueue ] . values ( ) ) {
337
+ op . cb ( error ) ;
338
+ }
339
+
340
+ this [ kQueue ] . clear ( ) ;
341
+ this . emit ( Connection . CLOSE ) ;
342
+ }
343
+
344
+ onClose ( ) {
345
+ if ( this . closed ) {
346
+ return ;
327
347
}
328
348
329
349
this . closed = true ;
330
350
331
- for ( const [ , op ] of this [ kQueue ] ) {
332
- if ( issue . isTimeout ) {
333
- op . cb (
334
- new MongoNetworkTimeoutError ( `connection ${ this . id } to ${ this . address } timed out` , {
335
- beforeHandshake : this . hello == null
336
- } )
337
- ) ;
338
- } else if ( issue . isClose ) {
339
- op . cb ( new MongoNetworkError ( `connection ${ this . id } to ${ this . address } closed` ) ) ;
340
- } else {
341
- op . cb ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
342
- }
351
+ const message = `connection ${ this . id } to ${ this . address } closed` ;
352
+ for ( const op of this [ kQueue ] . values ( ) ) {
353
+ op . cb ( new MongoNetworkError ( message ) ) ;
343
354
}
344
355
345
356
this [ kQueue ] . clear ( ) ;
346
357
this . emit ( Connection . CLOSE ) ;
347
358
}
348
359
349
- destroy ( ) : void ;
350
- destroy ( callback : Callback ) : void ;
351
- destroy ( options : DestroyOptions ) : void ;
352
- destroy ( options : DestroyOptions , callback : Callback ) : void ;
353
- destroy ( options ?: DestroyOptions | Callback , callback ?: Callback ) : void {
360
+ onTimeout ( ) {
361
+ if ( this . closed ) {
362
+ return ;
363
+ }
364
+
365
+ this [ kDelayedTimeoutId ] = setTimeout ( ( ) => {
366
+ this [ kStream ] . destroy ( ) ;
367
+
368
+ this . closed = true ;
369
+
370
+ const message = `connection ${ this . id } to ${ this . address } timed out` ;
371
+ const beforeHandshake = this . hello == null ;
372
+ for ( const op of this [ kQueue ] . values ( ) ) {
373
+ op . cb ( new MongoNetworkTimeoutError ( message , { beforeHandshake } ) ) ;
374
+ }
375
+
376
+ this [ kQueue ] . clear ( ) ;
377
+ this . emit ( Connection . CLOSE ) ;
378
+ } , 1 ) . unref ( ) ; // No need for this timer to hold the event loop open
379
+ }
380
+
381
+ onMessage ( message : BinMsg | Response ) {
382
+ const delayedTimeoutId = this [ kDelayedTimeoutId ] ;
383
+ if ( delayedTimeoutId != null ) {
384
+ clearTimeout ( delayedTimeoutId ) ;
385
+ this [ kDelayedTimeoutId ] = null ;
386
+ }
387
+
388
+ // always emit the message, in case we are streaming
389
+ this . emit ( 'message' , message ) ;
390
+ const operationDescription = this [ kQueue ] . get ( message . responseTo ) ;
391
+ if ( ! operationDescription ) {
392
+ return ;
393
+ }
394
+
395
+ const callback = operationDescription . cb ;
396
+
397
+ // SERVER-45775: For exhaust responses we should be able to use the same requestId to
398
+ // track response, however the server currently synthetically produces remote requests
399
+ // making the `responseTo` change on each response
400
+ this [ kQueue ] . delete ( message . responseTo ) ;
401
+ if ( 'moreToCome' in message && message . moreToCome ) {
402
+ // requeue the callback for next synthetic request
403
+ this [ kQueue ] . set ( message . requestId , operationDescription ) ;
404
+ } else if ( operationDescription . socketTimeoutOverride ) {
405
+ this [ kStream ] . setTimeout ( this . socketTimeoutMS ) ;
406
+ }
407
+
408
+ try {
409
+ // Pass in the entire description because it has BSON parsing options
410
+ message . parse ( operationDescription ) ;
411
+ } catch ( err ) {
412
+ // If this error is generated by our own code, it will already have the correct class applied
413
+ // if it is not, then it is coming from a catastrophic data parse failure or the BSON library
414
+ // in either case, it should not be wrapped
415
+ callback ( err ) ;
416
+ return ;
417
+ }
418
+
419
+ if ( message . documents [ 0 ] ) {
420
+ const document : Document = message . documents [ 0 ] ;
421
+ const session = operationDescription . session ;
422
+ if ( session ) {
423
+ updateSessionFromResponse ( session , document ) ;
424
+ }
425
+
426
+ if ( document . $clusterTime ) {
427
+ this [ kClusterTime ] = document . $clusterTime ;
428
+ this . emit ( Connection . CLUSTER_TIME_RECEIVED , document . $clusterTime ) ;
429
+ }
430
+
431
+ if ( operationDescription . command ) {
432
+ if ( document . writeConcernError ) {
433
+ callback ( new MongoWriteConcernError ( document . writeConcernError , document ) ) ;
434
+ return ;
435
+ }
436
+
437
+ if ( document . ok === 0 || document . $err || document . errmsg || document . code ) {
438
+ callback ( new MongoServerError ( document ) ) ;
439
+ return ;
440
+ }
441
+ } else {
442
+ // Pre 3.2 support
443
+ if ( document . ok === 0 || document . $err || document . errmsg ) {
444
+ callback ( new MongoServerError ( document ) ) ;
445
+ return ;
446
+ }
447
+ }
448
+ }
449
+
450
+ callback ( undefined , operationDescription . fullResult ? message : message . documents [ 0 ] ) ;
451
+ }
452
+
453
+ destroy ( options ?: DestroyOptions , callback ?: Callback ) : void {
354
454
if ( typeof options === 'function' ) {
355
455
callback = options ;
356
456
options = { force : false } ;
@@ -387,7 +487,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
387
487
} ) ;
388
488
}
389
489
390
- /** @internal */
391
490
command (
392
491
ns : MongoDBNamespace ,
393
492
cmd : Document ,
@@ -464,7 +563,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
464
563
}
465
564
}
466
565
467
- /** @internal */
468
566
query ( ns : MongoDBNamespace , cmd : Document , options : QueryOptions , callback : Callback ) : void {
469
567
const isExplain = cmd . $explain != null ;
470
568
const readPreference = options . readPreference ?? ReadPreference . primary ;
@@ -537,7 +635,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
537
635
) ;
538
636
}
539
637
540
- /** @internal */
541
638
getMore (
542
639
ns : MongoDBNamespace ,
543
640
cursorId : Long ,
@@ -599,7 +696,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
599
696
this . command ( ns , getMoreCmd , commandOptions , callback ) ;
600
697
}
601
698
602
- /** @internal */
603
699
killCursors (
604
700
ns : MongoDBNamespace ,
605
701
cursorIds : Long [ ] ,
@@ -719,74 +815,6 @@ function supportsOpMsg(conn: Connection) {
719
815
return maxWireVersion ( conn ) >= 6 && ! description . __nodejs_mock_server__ ;
720
816
}
721
817
722
- function messageHandler ( conn : Connection ) {
723
- return function messageHandler ( message : BinMsg | Response ) {
724
- // always emit the message, in case we are streaming
725
- conn . emit ( 'message' , message ) ;
726
- const operationDescription = conn [ kQueue ] . get ( message . responseTo ) ;
727
- if ( ! operationDescription ) {
728
- return ;
729
- }
730
-
731
- const callback = operationDescription . cb ;
732
-
733
- // SERVER-45775: For exhaust responses we should be able to use the same requestId to
734
- // track response, however the server currently synthetically produces remote requests
735
- // making the `responseTo` change on each response
736
- conn [ kQueue ] . delete ( message . responseTo ) ;
737
- if ( 'moreToCome' in message && message . moreToCome ) {
738
- // requeue the callback for next synthetic request
739
- conn [ kQueue ] . set ( message . requestId , operationDescription ) ;
740
- } else if ( operationDescription . socketTimeoutOverride ) {
741
- conn [ kStream ] . setTimeout ( conn . socketTimeoutMS ) ;
742
- }
743
-
744
- try {
745
- // Pass in the entire description because it has BSON parsing options
746
- message . parse ( operationDescription ) ;
747
- } catch ( err ) {
748
- // If this error is generated by our own code, it will already have the correct class applied
749
- // if it is not, then it is coming from a catastrophic data parse failure or the BSON library
750
- // in either case, it should not be wrapped
751
- callback ( err ) ;
752
- return ;
753
- }
754
-
755
- if ( message . documents [ 0 ] ) {
756
- const document : Document = message . documents [ 0 ] ;
757
- const session = operationDescription . session ;
758
- if ( session ) {
759
- updateSessionFromResponse ( session , document ) ;
760
- }
761
-
762
- if ( document . $clusterTime ) {
763
- conn [ kClusterTime ] = document . $clusterTime ;
764
- conn . emit ( Connection . CLUSTER_TIME_RECEIVED , document . $clusterTime ) ;
765
- }
766
-
767
- if ( operationDescription . command ) {
768
- if ( document . writeConcernError ) {
769
- callback ( new MongoWriteConcernError ( document . writeConcernError , document ) ) ;
770
- return ;
771
- }
772
-
773
- if ( document . ok === 0 || document . $err || document . errmsg || document . code ) {
774
- callback ( new MongoServerError ( document ) ) ;
775
- return ;
776
- }
777
- } else {
778
- // Pre 3.2 support
779
- if ( document . ok === 0 || document . $err || document . errmsg ) {
780
- callback ( new MongoServerError ( document ) ) ;
781
- return ;
782
- }
783
- }
784
- }
785
-
786
- callback ( undefined , operationDescription . fullResult ? message : message . documents [ 0 ] ) ;
787
- } ;
788
- }
789
-
790
818
function streamIdentifier ( stream : Stream , options : ConnectionOptions ) : string {
791
819
if ( options . proxyHost ) {
792
820
// If proxy options are specified, the properties of `stream` itself
0 commit comments