@@ -6,15 +6,8 @@ import { Connection, ConnectionOptions } from '../cmap/connection';
6
6
import { LEGACY_HELLO_COMMAND } from '../constants' ;
7
7
import { MongoError , MongoErrorLabel } from '../error' ;
8
8
import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
9
- import type { Callback , InterruptibleAsyncInterval } from '../utils' ;
10
- import {
11
- calculateDurationInMs ,
12
- EventEmitterWithState ,
13
- makeInterruptibleAsyncInterval ,
14
- makeStateMachine ,
15
- now ,
16
- ns
17
- } from '../utils' ;
9
+ import type { Callback } from '../utils' ;
10
+ import { calculateDurationInMs , EventEmitterWithState , makeStateMachine , now , ns } from '../utils' ;
18
11
import { ServerType , STATE_CLOSED , STATE_CLOSING } from './common' ;
19
12
import {
20
13
ServerHeartbeatFailedEvent ,
@@ -87,7 +80,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
87
80
[ kConnection ] ?: Connection ;
88
81
[ kCancellationToken ] : CancellationToken ;
89
82
/** @internal */
90
- [ kMonitorId ] ?: InterruptibleAsyncInterval ;
83
+ [ kMonitorId ] ?: MonitorInterval ;
91
84
[ kRTTPinger ] ?: RTTPinger ;
92
85
93
86
get connection ( ) : Connection | undefined {
@@ -150,9 +143,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
150
143
// start
151
144
const heartbeatFrequencyMS = this . options . heartbeatFrequencyMS ;
152
145
const minHeartbeatFrequencyMS = this . options . minHeartbeatFrequencyMS ;
153
- this [ kMonitorId ] = makeInterruptibleAsyncInterval ( monitorServer ( this ) , {
154
- interval : heartbeatFrequencyMS ,
155
- minInterval : minHeartbeatFrequencyMS ,
146
+ this [ kMonitorId ] = new MonitorInterval ( monitorServer ( this ) , {
147
+ heartbeatFrequencyMS : heartbeatFrequencyMS ,
148
+ minHeartbeatFrequencyMS : minHeartbeatFrequencyMS ,
156
149
immediate : true
157
150
} ) ;
158
151
}
@@ -180,9 +173,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
180
173
// restart monitoring
181
174
const heartbeatFrequencyMS = this . options . heartbeatFrequencyMS ;
182
175
const minHeartbeatFrequencyMS = this . options . minHeartbeatFrequencyMS ;
183
- this [ kMonitorId ] = makeInterruptibleAsyncInterval ( monitorServer ( this ) , {
184
- interval : heartbeatFrequencyMS ,
185
- minInterval : minHeartbeatFrequencyMS
176
+ this [ kMonitorId ] = new MonitorInterval ( monitorServer ( this ) , {
177
+ heartbeatFrequencyMS : heartbeatFrequencyMS ,
178
+ minHeartbeatFrequencyMS : minHeartbeatFrequencyMS
186
179
} ) ;
187
180
}
188
181
@@ -466,3 +459,130 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
466
459
measureAndReschedule ( ) ;
467
460
} ) ;
468
461
}
462
+
463
+ /**
464
+ * @internal
465
+ */
466
+ export interface MonitorIntervalOptions {
467
+ /** The interval to execute a method on */
468
+ heartbeatFrequencyMS : number ;
469
+ /** A minimum interval that must elapse before the method is called */
470
+ minHeartbeatFrequencyMS : number ;
471
+ /** Whether the method should be called immediately when the interval is started */
472
+ immediate : boolean ;
473
+
474
+ /**
475
+ * Only used for testing unreliable timer environments
476
+ * @internal
477
+ */
478
+ clock : ( ) => number ;
479
+ }
480
+
481
+ /**
482
+ * @internal
483
+ */
484
+ export class MonitorInterval {
485
+ fn : ( callback : Callback ) => void ;
486
+ timerId : NodeJS . Timeout | undefined ;
487
+ lastCallTime : number ;
488
+ isExpeditedCheckScheduled = false ;
489
+ stopped = false ;
490
+
491
+ heartbeatFrequencyMS : number ;
492
+ minHeartbeatFrequencyMS : number ;
493
+ clock : ( ) => number ;
494
+
495
+ constructor ( fn : ( callback : Callback ) => void , options : Partial < MonitorIntervalOptions > = { } ) {
496
+ this . fn = fn ;
497
+ this . lastCallTime = 0 ;
498
+
499
+ this . heartbeatFrequencyMS = options . heartbeatFrequencyMS ?? 1000 ;
500
+ this . minHeartbeatFrequencyMS = options . minHeartbeatFrequencyMS ?? 500 ;
501
+ this . clock = typeof options . clock === 'function' ? options . clock : now ;
502
+
503
+ if ( options . immediate ) {
504
+ this . _executeAndReschedule ( ) ;
505
+ } else {
506
+ this . lastCallTime = this . clock ( ) ;
507
+ this . _reschedule ( undefined ) ;
508
+ }
509
+ }
510
+
511
+ wake ( ) {
512
+ const currentTime = this . clock ( ) ;
513
+ const nextScheduledCallTime = this . lastCallTime + this . heartbeatFrequencyMS ;
514
+ const timeUntilNextCall = nextScheduledCallTime - currentTime ;
515
+
516
+ // For the streaming protocol: there is nothing obviously stopping this
517
+ // interval from being woken up again while we are waiting "infinitely"
518
+ // for `fn` to be called again`. Since the function effectively
519
+ // never completes, the `timeUntilNextCall` will continue to grow
520
+ // negatively unbounded, so it will never trigger a reschedule here.
521
+
522
+ // This is possible in virtualized environments like AWS Lambda where our
523
+ // clock is unreliable. In these cases the timer is "running" but never
524
+ // actually completes, so we want to execute immediately and then attempt
525
+ // to reschedule.
526
+ if ( timeUntilNextCall < 0 ) {
527
+ this . _executeAndReschedule ( ) ;
528
+ return ;
529
+ }
530
+
531
+ // debounce multiple calls to wake within the `minInterval`
532
+ if ( this . isExpeditedCheckScheduled ) {
533
+ return ;
534
+ }
535
+
536
+ // reschedule a call as soon as possible, ensuring the call never happens
537
+ // faster than the `minInterval`
538
+ if ( timeUntilNextCall > this . minHeartbeatFrequencyMS ) {
539
+ this . _reschedule ( this . minHeartbeatFrequencyMS ) ;
540
+ this . isExpeditedCheckScheduled = true ;
541
+ }
542
+ }
543
+
544
+ stop ( ) {
545
+ this . stopped = true ;
546
+ if ( this . timerId ) {
547
+ clearTimeout ( this . timerId ) ;
548
+ this . timerId = undefined ;
549
+ }
550
+
551
+ this . lastCallTime = 0 ;
552
+ this . isExpeditedCheckScheduled = false ;
553
+ }
554
+
555
+ toString ( ) {
556
+ return JSON . stringify ( this ) ;
557
+ }
558
+
559
+ toJSON ( ) {
560
+ return {
561
+ timerId : this . timerId != null ? 'set' : 'cleared' ,
562
+ lastCallTime : this . lastCallTime ,
563
+ isExpeditedCheckScheduled : this . isExpeditedCheckScheduled ,
564
+ stopped : this . stopped ,
565
+ heartbeatFrequencyMS : this . heartbeatFrequencyMS ,
566
+ minHeartbeatFrequencyMS : this . minHeartbeatFrequencyMS
567
+ } ;
568
+ }
569
+
570
+ private _reschedule ( ms ?: number ) {
571
+ if ( this . stopped ) return ;
572
+ if ( this . timerId ) {
573
+ clearTimeout ( this . timerId ) ;
574
+ }
575
+
576
+ this . timerId = setTimeout ( this . _executeAndReschedule , ms || this . heartbeatFrequencyMS ) ;
577
+ }
578
+
579
+ private _executeAndReschedule = ( ) => {
580
+ this . isExpeditedCheckScheduled = false ;
581
+ this . lastCallTime = this . clock ( ) ;
582
+
583
+ this . fn ( err => {
584
+ if ( err ) throw err ;
585
+ this . _reschedule ( this . heartbeatFrequencyMS ) ;
586
+ } ) ;
587
+ } ;
588
+ }
0 commit comments