@@ -23,7 +23,7 @@ import {
23
23
CONNECTION_INIT_TIMEOUT ,
24
24
CONNECTION_STATE_CHANGE ,
25
25
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT ,
26
- DEFAULT_KEEP_ALIVE_TIMEOUT ,
26
+ DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT ,
27
27
MAX_DELAY_MS ,
28
28
MESSAGE_TYPES ,
29
29
NON_RETRYABLE_CODES ,
@@ -83,9 +83,8 @@ export abstract class AWSWebSocketProvider {
83
83
84
84
protected awsRealTimeSocket ?: WebSocket ;
85
85
private socketStatus : SOCKET_STATUS = SOCKET_STATUS . CLOSED ;
86
- private keepAliveTimeoutId ?: ReturnType < typeof setTimeout > ;
87
- private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT ;
88
- private keepAliveAlertTimeoutId ?: ReturnType < typeof setTimeout > ;
86
+ private keepAliveTimestamp : number = Date . now ( ) ;
87
+ private keepAliveHeartbeatIntervalId ?: ReturnType < typeof setInterval > ;
89
88
private promiseArray : { res ( ) : void ; rej ( reason ?: any ) : void } [ ] = [ ] ;
90
89
private connectionState : ConnectionState | undefined ;
91
90
private readonly connectionStateMonitor = new ConnectionStateMonitor ( ) ;
@@ -119,6 +118,7 @@ export abstract class AWSWebSocketProvider {
119
118
return new Promise < void > ( ( resolve , reject ) => {
120
119
if ( this . awsRealTimeSocket ) {
121
120
this . awsRealTimeSocket . onclose = ( _ : CloseEvent ) => {
121
+ this . _closeSocket ( ) ;
122
122
this . subscriptionObserverMap = new Map ( ) ;
123
123
this . awsRealTimeSocket = undefined ;
124
124
resolve ( ) ;
@@ -171,7 +171,7 @@ export abstract class AWSWebSocketProvider {
171
171
this . logger . debug (
172
172
`${ CONTROL_MSG . REALTIME_SUBSCRIPTION_INIT_ERROR } : ${ err } ` ,
173
173
) ;
174
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
174
+ this . _closeSocket ( ) ;
175
175
} )
176
176
. finally ( ( ) => {
177
177
subscriptionStartInProgress = false ;
@@ -435,7 +435,7 @@ export abstract class AWSWebSocketProvider {
435
435
this . logger . debug ( { err } ) ;
436
436
const message = String ( err . message ?? '' ) ;
437
437
// Resolving to give the state observer time to propogate the update
438
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
438
+ this . _closeSocket ( ) ;
439
439
440
440
// Capture the error only when the network didn't cause disruption
441
441
if (
@@ -544,20 +544,15 @@ export abstract class AWSWebSocketProvider {
544
544
setTimeout ( this . _closeSocketIfRequired . bind ( this ) , 1000 ) ;
545
545
} else {
546
546
this . logger . debug ( 'closing WebSocket...' ) ;
547
- if ( this . keepAliveTimeoutId ) {
548
- clearTimeout ( this . keepAliveTimeoutId ) ;
549
- }
550
- if ( this . keepAliveAlertTimeoutId ) {
551
- clearTimeout ( this . keepAliveAlertTimeoutId ) ;
552
- }
547
+
553
548
const tempSocket = this . awsRealTimeSocket ;
554
549
// Cleaning callbacks to avoid race condition, socket still exists
555
550
tempSocket . onclose = null ;
556
551
tempSocket . onerror = null ;
557
552
tempSocket . close ( 1000 ) ;
558
553
this . awsRealTimeSocket = undefined ;
559
554
this . socketStatus = SOCKET_STATUS . CLOSED ;
560
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
555
+ this . _closeSocket ( ) ;
561
556
}
562
557
}
563
558
@@ -577,13 +572,40 @@ export abstract class AWSWebSocketProvider {
577
572
errorType : string ;
578
573
} ;
579
574
575
+ private maintainKeepAlive ( ) {
576
+ this . keepAliveTimestamp = Date . now ( ) ;
577
+ }
578
+
579
+ private keepAliveHeartbeat ( connectionTimeoutMs : number ) {
580
+ const currentTime = Date . now ( ) ;
581
+
582
+ // Check for missed KA message
583
+ if (
584
+ currentTime - this . keepAliveTimestamp >
585
+ DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT
586
+ ) {
587
+ this . connectionStateMonitor . record ( CONNECTION_CHANGE . KEEP_ALIVE_MISSED ) ;
588
+ } else {
589
+ this . connectionStateMonitor . record ( CONNECTION_CHANGE . KEEP_ALIVE ) ;
590
+ }
591
+
592
+ // Recognize we are disconnected if we haven't seen messages in the keep alive timeout period
593
+ if ( currentTime - this . keepAliveTimestamp > connectionTimeoutMs ) {
594
+ this . _errorDisconnect ( CONTROL_MSG . TIMEOUT_DISCONNECT ) ;
595
+ }
596
+ }
597
+
580
598
private _handleIncomingSubscriptionMessage ( message : MessageEvent ) {
581
599
if ( typeof message . data !== 'string' ) {
582
600
return ;
583
601
}
584
602
585
603
const [ isData , data ] = this . _handleSubscriptionData ( message ) ;
586
- if ( isData ) return ;
604
+ if ( isData ) {
605
+ this . maintainKeepAlive ( ) ;
606
+
607
+ return ;
608
+ }
587
609
588
610
const { type, id, payload } = data ;
589
611
@@ -632,16 +654,7 @@ export abstract class AWSWebSocketProvider {
632
654
}
633
655
634
656
if ( type === MESSAGE_TYPES . GQL_CONNECTION_KEEP_ALIVE ) {
635
- if ( this . keepAliveTimeoutId ) clearTimeout ( this . keepAliveTimeoutId ) ;
636
- if ( this . keepAliveAlertTimeoutId )
637
- clearTimeout ( this . keepAliveAlertTimeoutId ) ;
638
- this . keepAliveTimeoutId = setTimeout ( ( ) => {
639
- this . _errorDisconnect ( CONTROL_MSG . TIMEOUT_DISCONNECT ) ;
640
- } , this . keepAliveTimeout ) ;
641
- this . keepAliveAlertTimeoutId = setTimeout ( ( ) => {
642
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . KEEP_ALIVE_MISSED ) ;
643
- } , DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT ) ;
644
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . KEEP_ALIVE ) ;
657
+ this . maintainKeepAlive ( ) ;
645
658
646
659
return ;
647
660
}
@@ -686,13 +699,21 @@ export abstract class AWSWebSocketProvider {
686
699
this . logger . debug ( `Disconnect error: ${ msg } ` ) ;
687
700
688
701
if ( this . awsRealTimeSocket ) {
689
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
702
+ this . _closeSocket ( ) ;
690
703
this . awsRealTimeSocket . close ( ) ;
691
704
}
692
705
693
706
this . socketStatus = SOCKET_STATUS . CLOSED ;
694
707
}
695
708
709
+ private _closeSocket ( ) {
710
+ if ( this . keepAliveHeartbeatIntervalId ) {
711
+ clearInterval ( this . keepAliveHeartbeatIntervalId ) ;
712
+ this . keepAliveHeartbeatIntervalId = undefined ;
713
+ }
714
+ this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
715
+ }
716
+
696
717
private _timeoutStartSubscriptionAck ( subscriptionId : string ) {
697
718
const subscriptionObserver =
698
719
this . subscriptionObserverMap . get ( subscriptionId ) ;
@@ -708,7 +729,7 @@ export abstract class AWSWebSocketProvider {
708
729
subscriptionState : SUBSCRIPTION_STATUS . FAILED ,
709
730
} ) ;
710
731
711
- this . connectionStateMonitor . record ( CONNECTION_CHANGE . CLOSED ) ;
732
+ this . _closeSocket ( ) ;
712
733
this . logger . debug (
713
734
'timeoutStartSubscription' ,
714
735
JSON . stringify ( { query, variables } ) ,
@@ -820,6 +841,7 @@ export abstract class AWSWebSocketProvider {
820
841
this . logger . debug ( `WebSocket connection error` ) ;
821
842
} ;
822
843
newSocket . onclose = ( ) => {
844
+ this . _closeSocket ( ) ;
823
845
reject ( new Error ( 'Connection handshake error' ) ) ;
824
846
} ;
825
847
newSocket . onopen = ( ) => {
@@ -849,6 +871,7 @@ export abstract class AWSWebSocketProvider {
849
871
850
872
this . awsRealTimeSocket . onclose = event => {
851
873
this . logger . debug ( `WebSocket closed ${ event . reason } ` ) ;
874
+ this . _closeSocket ( ) ;
852
875
reject ( new Error ( JSON . stringify ( event ) ) ) ;
853
876
} ;
854
877
@@ -912,7 +935,11 @@ export abstract class AWSWebSocketProvider {
912
935
return ;
913
936
}
914
937
915
- this . keepAliveTimeout = connectionTimeoutMs ;
938
+ // Set up a keep alive heartbeat for this connection
939
+ this . keepAliveHeartbeatIntervalId = setInterval ( ( ) => {
940
+ this . keepAliveHeartbeat ( connectionTimeoutMs ) ;
941
+ } , DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT ) ;
942
+
916
943
this . awsRealTimeSocket . onmessage =
917
944
this . _handleIncomingSubscriptionMessage . bind ( this ) ;
918
945
@@ -923,6 +950,7 @@ export abstract class AWSWebSocketProvider {
923
950
924
951
this . awsRealTimeSocket . onclose = event => {
925
952
this . logger . debug ( `WebSocket closed ${ event . reason } ` ) ;
953
+ this . _closeSocket ( ) ;
926
954
this . _errorDisconnect ( CONTROL_MSG . CONNECTION_CLOSED ) ;
927
955
} ;
928
956
}
0 commit comments