1
1
const { randomUUID} = require ( 'crypto' ) ;
2
2
const { EventEmitter, once } = require ( 'events' ) ;
3
3
const { setTimeout } = require ( 'timers/promises' ) ;
4
+ const { setTimeout : setTimeoutCb } = require ( 'timers' ) ;
4
5
const WebSocket = require ( 'ws' ) ;
5
6
const { ExponentialStrategy } = require ( 'backoff' ) ;
6
7
const { CONNECTING , OPEN , CLOSING , CLOSED } = WebSocket ;
@@ -30,7 +31,6 @@ class RPCClient extends EventEmitter {
30
31
this . _keepAliveAbortController = undefined ;
31
32
this . _pendingPingResponse = false ;
32
33
this . _lastPingTime = 0 ;
33
- this . _skipNextPing = false ;
34
34
this . _closePromise = undefined ;
35
35
this . _protocolOptions = [ ] ;
36
36
this . _protocol = undefined ;
@@ -407,12 +407,12 @@ class RPCClient extends EventEmitter {
407
407
ws . on ( 'error' , err => this . emit ( 'socketError' , err ) ) ;
408
408
ws . on ( 'ping' , ( ) => {
409
409
if ( this . _options . deferPingsOnActivity ) {
410
- this . _skipNextPing = true ;
410
+ this . _deferNextPing ( ) ;
411
411
}
412
412
} ) ;
413
413
ws . on ( 'pong' , ( ) => {
414
414
if ( this . _options . deferPingsOnActivity ) {
415
- this . _skipNextPing = true ;
415
+ this . _deferNextPing ( ) ;
416
416
}
417
417
this . _pendingPingResponse = false ;
418
418
const rtt = Date . now ( ) - this . _lastPingTime ;
@@ -566,10 +566,24 @@ class RPCClient extends EventEmitter {
566
566
return this . _connectPromise ;
567
567
}
568
568
569
+ _deferNextPing ( ) {
570
+ if ( ! this . _nextPingTimeout ) {
571
+ return ;
572
+ }
573
+
574
+ this . _nextPingTimeout . refresh ( ) ;
575
+ }
576
+
569
577
async _keepAlive ( ) {
570
578
// abort any previously running keepAlive
571
579
this . _keepAliveAbortController ?. abort ( ) ;
572
580
581
+ const timerEmitter = new EventEmitter ( ) ;
582
+ const nextPingTimeout = setTimeoutCb ( ( ) => {
583
+ timerEmitter . emit ( 'next' )
584
+ } , this . _options . pingIntervalMs ) ;
585
+ this . _nextPingTimeout = nextPingTimeout ;
586
+
573
587
try {
574
588
if ( this . _state !== OPEN ) {
575
589
// don't start pinging if connection not open
@@ -583,20 +597,16 @@ class RPCClient extends EventEmitter {
583
597
584
598
// setup new abort controller
585
599
this . _keepAliveAbortController = new AbortController ( ) ;
586
-
600
+
587
601
while ( true ) {
588
- await setTimeout ( this . _options . pingIntervalMs , undefined , { signal : this . _keepAliveAbortController . signal } ) ;
589
-
602
+ await once ( timerEmitter , 'next' , { signal : this . _keepAliveAbortController . signal } ) ,
603
+ this . _keepAliveAbortController . signal . throwIfAborted ( ) ;
604
+
590
605
if ( this . _state !== OPEN ) {
591
606
// keepalive no longer required
592
607
break ;
593
608
}
594
609
595
- if ( this . _skipNextPing ) {
596
- this . _skipNextPing = false ;
597
- continue ;
598
- }
599
-
600
610
if ( this . _pendingPingResponse ) {
601
611
// we didn't get a response to our last ping
602
612
throw Error ( "Ping timeout" ) ;
@@ -605,13 +615,17 @@ class RPCClient extends EventEmitter {
605
615
this . _lastPingTime = Date . now ( ) ;
606
616
this . _pendingPingResponse = true ;
607
617
this . _ws . ping ( ) ;
618
+ nextPingTimeout . refresh ( ) ;
608
619
}
609
620
610
621
} catch ( err ) {
622
+ // console.log('keepalive failed', err);
611
623
if ( err . name !== 'AbortError' ) {
612
624
// throws on ws.ping() error
613
625
this . _ws . terminate ( ) ;
614
626
}
627
+ } finally {
628
+ clearTimeout ( nextPingTimeout ) ;
615
629
}
616
630
}
617
631
@@ -657,7 +671,7 @@ class RPCClient extends EventEmitter {
657
671
658
672
_onMessage ( buffer ) {
659
673
if ( this . _options . deferPingsOnActivity ) {
660
- this . _skipNextPing = true ;
674
+ this . _deferNextPing ( ) ;
661
675
}
662
676
663
677
const message = buffer . toString ( 'utf8' ) ;
0 commit comments