1
1
const { randomUUID} = require ( 'crypto' ) ;
2
2
const { EventEmitter, once } = require ( 'events' ) ;
3
3
const { setTimeout } = require ( 'timers/promises' ) ;
4
+ const { setTimeout : setTimeoutCb } = require ( 'timers' ) ;
4
5
const WebSocket = require ( 'ws' ) ;
5
6
const { ExponentialStrategy } = require ( 'backoff' ) ;
6
7
const { CONNECTING , OPEN , CLOSING , CLOSED } = WebSocket ;
@@ -30,7 +31,6 @@ class RPCClient extends EventEmitter {
30
31
this . _keepAliveAbortController = undefined ;
31
32
this . _pendingPingResponse = false ;
32
33
this . _lastPingTime = 0 ;
33
- this . _skipNextPing = false ;
34
34
this . _closePromise = undefined ;
35
35
this . _protocolOptions = [ ] ;
36
36
this . _protocol = undefined ;
@@ -398,12 +398,12 @@ class RPCClient extends EventEmitter {
398
398
ws . on ( 'error' , err => this . emit ( 'socketError' , err ) ) ;
399
399
ws . on ( 'ping' , ( ) => {
400
400
if ( this . _options . deferPingsOnActivity ) {
401
- this . _skipNextPing = true ;
401
+ this . _deferNextPing ( ) ;
402
402
}
403
403
} ) ;
404
404
ws . on ( 'pong' , ( ) => {
405
405
if ( this . _options . deferPingsOnActivity ) {
406
- this . _skipNextPing = true ;
406
+ this . _deferNextPing ( ) ;
407
407
}
408
408
this . _pendingPingResponse = false ;
409
409
const rtt = Date . now ( ) - this . _lastPingTime ;
@@ -557,10 +557,24 @@ class RPCClient extends EventEmitter {
557
557
return this . _connectPromise ;
558
558
}
559
559
560
+ _deferNextPing ( ) {
561
+ if ( ! this . _nextPingTimeout ) {
562
+ return ;
563
+ }
564
+
565
+ this . _nextPingTimeout . refresh ( ) ;
566
+ }
567
+
560
568
async _keepAlive ( ) {
561
569
// abort any previously running keepAlive
562
570
this . _keepAliveAbortController ?. abort ( ) ;
563
571
572
+ const timerEmitter = new EventEmitter ( ) ;
573
+ const nextPingTimeout = setTimeoutCb ( ( ) => {
574
+ timerEmitter . emit ( 'next' )
575
+ } , this . _options . pingIntervalMs ) ;
576
+ this . _nextPingTimeout = nextPingTimeout ;
577
+
564
578
try {
565
579
if ( this . _state !== OPEN ) {
566
580
// don't start pinging if connection not open
@@ -574,20 +588,16 @@ class RPCClient extends EventEmitter {
574
588
575
589
// setup new abort controller
576
590
this . _keepAliveAbortController = new AbortController ( ) ;
577
-
591
+
578
592
while ( true ) {
579
- await setTimeout ( this . _options . pingIntervalMs , undefined , { signal : this . _keepAliveAbortController . signal } ) ;
580
-
593
+ await once ( timerEmitter , 'next' , { signal : this . _keepAliveAbortController . signal } ) ,
594
+ this . _keepAliveAbortController . signal . throwIfAborted ( ) ;
595
+
581
596
if ( this . _state !== OPEN ) {
582
597
// keepalive no longer required
583
598
break ;
584
599
}
585
600
586
- if ( this . _skipNextPing ) {
587
- this . _skipNextPing = false ;
588
- continue ;
589
- }
590
-
591
601
if ( this . _pendingPingResponse ) {
592
602
// we didn't get a response to our last ping
593
603
throw Error ( "Ping timeout" ) ;
@@ -596,13 +606,17 @@ class RPCClient extends EventEmitter {
596
606
this . _lastPingTime = Date . now ( ) ;
597
607
this . _pendingPingResponse = true ;
598
608
this . _ws . ping ( ) ;
609
+ nextPingTimeout . refresh ( ) ;
599
610
}
600
611
601
612
} catch ( err ) {
613
+ // console.log('keepalive failed', err);
602
614
if ( err . name !== 'AbortError' ) {
603
615
// throws on ws.ping() error
604
616
this . _ws . terminate ( ) ;
605
617
}
618
+ } finally {
619
+ clearTimeout ( nextPingTimeout ) ;
606
620
}
607
621
}
608
622
@@ -648,7 +662,7 @@ class RPCClient extends EventEmitter {
648
662
649
663
_onMessage ( buffer ) {
650
664
if ( this . _options . deferPingsOnActivity ) {
651
- this . _skipNextPing = true ;
665
+ this . _deferNextPing ( ) ;
652
666
}
653
667
654
668
const message = buffer . toString ( 'utf8' ) ;
0 commit comments