Skip to content

Commit 9883993

Browse files
refactor(NODE-4637): clean up async interval (mongodb#3411)
1 parent 64b3ee9 commit 9883993

File tree

7 files changed

+704
-689
lines changed

7 files changed

+704
-689
lines changed

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,8 @@ export type { ClusterTime, TimerQueue } from './sdam/common';
431431
export type {
432432
Monitor,
433433
MonitorEvents,
434+
MonitorInterval,
435+
MonitorIntervalOptions,
434436
MonitorOptions,
435437
MonitorPrivate,
436438
RTTPinger,
@@ -475,7 +477,6 @@ export type {
475477
ClientMetadataOptions,
476478
EventEmitterWithState,
477479
HostAddress,
478-
InterruptibleAsyncInterval,
479480
MongoDBNamespace
480481
} from './utils';
481482
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';

src/sdam/monitor.ts

Lines changed: 136 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,8 @@ import { Connection, ConnectionOptions } from '../cmap/connection';
66
import { LEGACY_HELLO_COMMAND } from '../constants';
77
import { MongoError, MongoErrorLabel } from '../error';
88
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
9-
import type { Callback, InterruptibleAsyncInterval } from '../utils';
10-
import {
11-
calculateDurationInMs,
12-
EventEmitterWithState,
13-
makeInterruptibleAsyncInterval,
14-
makeStateMachine,
15-
now,
16-
ns
17-
} from '../utils';
9+
import type { Callback } from '../utils';
10+
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
1811
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';
1912
import {
2013
ServerHeartbeatFailedEvent,
@@ -87,7 +80,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
8780
[kConnection]?: Connection;
8881
[kCancellationToken]: CancellationToken;
8982
/** @internal */
90-
[kMonitorId]?: InterruptibleAsyncInterval;
83+
[kMonitorId]?: MonitorInterval;
9184
[kRTTPinger]?: RTTPinger;
9285

9386
get connection(): Connection | undefined {
@@ -150,9 +143,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
150143
// start
151144
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
152145
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
153-
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
154-
interval: heartbeatFrequencyMS,
155-
minInterval: minHeartbeatFrequencyMS,
146+
this[kMonitorId] = new MonitorInterval(monitorServer(this), {
147+
heartbeatFrequencyMS: heartbeatFrequencyMS,
148+
minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,
156149
immediate: true
157150
});
158151
}
@@ -180,9 +173,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
180173
// restart monitoring
181174
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
182175
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
183-
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
184-
interval: heartbeatFrequencyMS,
185-
minInterval: minHeartbeatFrequencyMS
176+
this[kMonitorId] = new MonitorInterval(monitorServer(this), {
177+
heartbeatFrequencyMS: heartbeatFrequencyMS,
178+
minHeartbeatFrequencyMS: minHeartbeatFrequencyMS
186179
});
187180
}
188181

@@ -466,3 +459,130 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
466459
measureAndReschedule();
467460
});
468461
}
462+
463+
/**
464+
* @internal
465+
*/
466+
export interface MonitorIntervalOptions {
467+
/** The interval to execute a method on */
468+
heartbeatFrequencyMS: number;
469+
/** A minimum interval that must elapse before the method is called */
470+
minHeartbeatFrequencyMS: number;
471+
/** Whether the method should be called immediately when the interval is started */
472+
immediate: boolean;
473+
474+
/**
475+
* Only used for testing unreliable timer environments
476+
* @internal
477+
*/
478+
clock: () => number;
479+
}
480+
481+
/**
482+
* @internal
483+
*/
484+
export class MonitorInterval {
485+
fn: (callback: Callback) => void;
486+
timerId: NodeJS.Timeout | undefined;
487+
lastCallTime: number;
488+
isExpeditedCheckScheduled = false;
489+
stopped = false;
490+
491+
heartbeatFrequencyMS: number;
492+
minHeartbeatFrequencyMS: number;
493+
clock: () => number;
494+
495+
constructor(fn: (callback: Callback) => void, options: Partial<MonitorIntervalOptions> = {}) {
496+
this.fn = fn;
497+
this.lastCallTime = 0;
498+
499+
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
500+
this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;
501+
this.clock = typeof options.clock === 'function' ? options.clock : now;
502+
503+
if (options.immediate) {
504+
this._executeAndReschedule();
505+
} else {
506+
this.lastCallTime = this.clock();
507+
this._reschedule(undefined);
508+
}
509+
}
510+
511+
wake() {
512+
const currentTime = this.clock();
513+
const nextScheduledCallTime = this.lastCallTime + this.heartbeatFrequencyMS;
514+
const timeUntilNextCall = nextScheduledCallTime - currentTime;
515+
516+
// For the streaming protocol: there is nothing obviously stopping this
517+
// interval from being woken up again while we are waiting "infinitely"
518+
// for `fn` to be called again`. Since the function effectively
519+
// never completes, the `timeUntilNextCall` will continue to grow
520+
// negatively unbounded, so it will never trigger a reschedule here.
521+
522+
// This is possible in virtualized environments like AWS Lambda where our
523+
// clock is unreliable. In these cases the timer is "running" but never
524+
// actually completes, so we want to execute immediately and then attempt
525+
// to reschedule.
526+
if (timeUntilNextCall < 0) {
527+
this._executeAndReschedule();
528+
return;
529+
}
530+
531+
// debounce multiple calls to wake within the `minInterval`
532+
if (this.isExpeditedCheckScheduled) {
533+
return;
534+
}
535+
536+
// reschedule a call as soon as possible, ensuring the call never happens
537+
// faster than the `minInterval`
538+
if (timeUntilNextCall > this.minHeartbeatFrequencyMS) {
539+
this._reschedule(this.minHeartbeatFrequencyMS);
540+
this.isExpeditedCheckScheduled = true;
541+
}
542+
}
543+
544+
stop() {
545+
this.stopped = true;
546+
if (this.timerId) {
547+
clearTimeout(this.timerId);
548+
this.timerId = undefined;
549+
}
550+
551+
this.lastCallTime = 0;
552+
this.isExpeditedCheckScheduled = false;
553+
}
554+
555+
toString() {
556+
return JSON.stringify(this);
557+
}
558+
559+
toJSON() {
560+
return {
561+
timerId: this.timerId != null ? 'set' : 'cleared',
562+
lastCallTime: this.lastCallTime,
563+
isExpeditedCheckScheduled: this.isExpeditedCheckScheduled,
564+
stopped: this.stopped,
565+
heartbeatFrequencyMS: this.heartbeatFrequencyMS,
566+
minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS
567+
};
568+
}
569+
570+
private _reschedule(ms?: number) {
571+
if (this.stopped) return;
572+
if (this.timerId) {
573+
clearTimeout(this.timerId);
574+
}
575+
576+
this.timerId = setTimeout(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);
577+
}
578+
579+
private _executeAndReschedule = () => {
580+
this.isExpeditedCheckScheduled = false;
581+
this.lastCallTime = this.clock();
582+
583+
this.fn(err => {
584+
if (err) throw err;
585+
this._reschedule(this.heartbeatFrequencyMS);
586+
});
587+
};
588+
}

src/sdam/topology.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
386386
}
387387

388388
/** Initiate server connect */
389-
connect(options?: ConnectOptions, callback?: Callback): void {
389+
connect(callback: Callback): void;
390+
connect(options: ConnectOptions, callback: Callback): void;
391+
connect(options?: ConnectOptions | Callback, callback?: Callback): void {
390392
if (typeof options === 'function') (callback = options), (options = {});
391393
options = options ?? {};
392394
if (this.s.state === STATE_CONNECTED) {
@@ -468,7 +470,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
468470
}
469471

470472
/** Close this topology */
471-
close(options?: CloseOptions, callback?: Callback): void {
473+
close(callback: Callback): void;
474+
close(options: CloseOptions): void;
475+
close(options: CloseOptions, callback: Callback): void;
476+
close(options?: CloseOptions | Callback, callback?: Callback): void {
472477
if (typeof options === 'function') {
473478
callback = options;
474479
options = {};
@@ -484,7 +489,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
484489
}
485490

486491
const destroyedServers = Array.from(this.s.servers.values(), server => {
487-
return promisify(destroyServer)(server, this, options);
492+
return promisify(destroyServer)(server, this, options as CloseOptions);
488493
});
489494

490495
Promise.all(destroyedServers)

src/utils.ts

Lines changed: 0 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as crypto from 'crypto';
22
import type { SrvRecord } from 'dns';
33
import * as os from 'os';
4-
import { clearTimeout, setTimeout } from 'timers';
54
import { URL } from 'url';
65

76
import { Document, ObjectId, resolveBSONOptions } from './bson';
@@ -774,124 +773,6 @@ export function calculateDurationInMs(started: number): number {
774773
return elapsed < 0 ? 0 : elapsed;
775774
}
776775

777-
export interface InterruptibleAsyncIntervalOptions {
778-
/** The interval to execute a method on */
779-
interval: number;
780-
/** A minimum interval that must elapse before the method is called */
781-
minInterval: number;
782-
/** Whether the method should be called immediately when the interval is started */
783-
immediate: boolean;
784-
785-
/**
786-
* Only used for testing unreliable timer environments
787-
* @internal
788-
*/
789-
clock: () => number;
790-
}
791-
792-
/** @internal */
793-
export interface InterruptibleAsyncInterval {
794-
wake(): void;
795-
stop(): void;
796-
}
797-
798-
/**
799-
* Creates an interval timer which is able to be woken up sooner than
800-
* the interval. The timer will also debounce multiple calls to wake
801-
* ensuring that the function is only ever called once within a minimum
802-
* interval window.
803-
* @internal
804-
*
805-
* @param fn - An async function to run on an interval, must accept a `callback` as its only parameter
806-
*/
807-
export function makeInterruptibleAsyncInterval(
808-
fn: (callback: Callback) => void,
809-
options?: Partial<InterruptibleAsyncIntervalOptions>
810-
): InterruptibleAsyncInterval {
811-
let timerId: NodeJS.Timeout | undefined;
812-
let lastCallTime: number;
813-
let cannotBeExpedited = false;
814-
let stopped = false;
815-
816-
options = options ?? {};
817-
const interval = options.interval || 1000;
818-
const minInterval = options.minInterval || 500;
819-
const immediate = typeof options.immediate === 'boolean' ? options.immediate : false;
820-
const clock = typeof options.clock === 'function' ? options.clock : now;
821-
822-
function wake() {
823-
const currentTime = clock();
824-
const nextScheduledCallTime = lastCallTime + interval;
825-
const timeUntilNextCall = nextScheduledCallTime - currentTime;
826-
827-
// For the streaming protocol: there is nothing obviously stopping this
828-
// interval from being woken up again while we are waiting "infinitely"
829-
// for `fn` to be called again`. Since the function effectively
830-
// never completes, the `timeUntilNextCall` will continue to grow
831-
// negatively unbounded, so it will never trigger a reschedule here.
832-
833-
// This is possible in virtualized environments like AWS Lambda where our
834-
// clock is unreliable. In these cases the timer is "running" but never
835-
// actually completes, so we want to execute immediately and then attempt
836-
// to reschedule.
837-
if (timeUntilNextCall < 0) {
838-
executeAndReschedule();
839-
return;
840-
}
841-
842-
// debounce multiple calls to wake within the `minInterval`
843-
if (cannotBeExpedited) {
844-
return;
845-
}
846-
847-
// reschedule a call as soon as possible, ensuring the call never happens
848-
// faster than the `minInterval`
849-
if (timeUntilNextCall > minInterval) {
850-
reschedule(minInterval);
851-
cannotBeExpedited = true;
852-
}
853-
}
854-
855-
function stop() {
856-
stopped = true;
857-
if (timerId) {
858-
clearTimeout(timerId);
859-
timerId = undefined;
860-
}
861-
862-
lastCallTime = 0;
863-
cannotBeExpedited = false;
864-
}
865-
866-
function reschedule(ms?: number) {
867-
if (stopped) return;
868-
if (timerId) {
869-
clearTimeout(timerId);
870-
}
871-
872-
timerId = setTimeout(executeAndReschedule, ms || interval);
873-
}
874-
875-
function executeAndReschedule() {
876-
cannotBeExpedited = false;
877-
lastCallTime = clock();
878-
879-
fn(err => {
880-
if (err) throw err;
881-
reschedule(interval);
882-
});
883-
}
884-
885-
if (immediate) {
886-
executeAndReschedule();
887-
} else {
888-
lastCallTime = clock();
889-
reschedule(undefined);
890-
}
891-
892-
return { wake, stop };
893-
}
894-
895776
/** @internal */
896777
export function hasAtomicOperators(doc: Document | Document[]): boolean {
897778
if (Array.isArray(doc)) {

0 commit comments

Comments
 (0)