1- import { HeartbeatLogs , HeartbeatMetrics , RequestMetadata , Watchdogs , logger } from "./LoggerWrapper.js" ;
1+ import { Events , HeartbeatLogs , HeartbeatMetrics , RequestMetadata , Watchdogs , logger } from "./LoggerWrapper.js" ;
22import * as k8s from "@kubernetes/client-node" ;
33import { InstrumentationCR , ListResponse } from "./RequestDefinition.js"
44import { InstrumentationCRsCollection } from "./InstrumentationCRsCollection.js" ;
@@ -11,7 +11,7 @@ export class K8sWatcher {
1111
1212 private static lastSuccessfulListTimestamp : Date = new Date ( ) ;
1313
14- public static async StartWatchingCRs ( crs : InstrumentationCRsCollection , onNewCR : ( cr : InstrumentationCR , isRemoved : boolean ) => void , onResetCRs : ( crs : InstrumentationCR [ ] ) => void , operationId : string ) : Promise < void > {
14+ public static async StartWatchingCRs ( crs : InstrumentationCRsCollection , onNewCR : ( cr : InstrumentationCR , isRemoved : boolean ) => void , onResetCRs : ( crs : InstrumentationCR [ ] ) => void , operationId : string , clusterArmId : string , clusterArmRegion : string ) : Promise < void > {
1515 const kc = new k8s . KubeConfig ( ) ;
1616 kc . loadFromDefault ( ) ;
1717
@@ -23,7 +23,7 @@ export class K8sWatcher {
2323 let latestResourceVersion : string = null ;
2424 while ( true ) { // eslint-disable-line
2525 try {
26- latestResourceVersion = await K8sWatcher . WatchCRs ( k8sApi , watch , latestResourceVersion , crs , operationId , onNewCR , onResetCRs ) ;
26+ latestResourceVersion = await K8sWatcher . WatchCRs ( k8sApi , watch , latestResourceVersion , crs , operationId , onNewCR , onResetCRs , clusterArmId , clusterArmRegion ) ;
2727 } catch ( e ) {
2828 // either the list call or the watch call failed
2929 const ex = logger . sanitizeException ( e ) ;
@@ -40,12 +40,14 @@ export class K8sWatcher {
4040 }
4141
4242 // pause for a bit to avoid generating too much load in case of cascading failures
43- await new Promise ( r => setTimeout ( r , 5000 ) ) ;
43+ // randomize the wait time to avoid all pods of the deployment hitting the API server at the same time
44+ // if this is caused by networking or node issues and happens for all pods at the same time
45+ await new Promise ( r => setTimeout ( r , Math . random ( ) * 5000.0 ) ) ;
4446 }
4547 }
4648 }
4749
48- private static async WatchCRs ( k8sApi : k8s . CustomObjectsApi , watch : k8s . Watch , latestResourceVersion : string , crs : InstrumentationCRsCollection , operationId : string , onNewCR : ( cr : InstrumentationCR , isRemoved : boolean ) => void , onResetCRs : ( crs : InstrumentationCR [ ] ) => void ) : Promise < string > {
50+ private static async WatchCRs ( k8sApi : k8s . CustomObjectsApi , watch : k8s . Watch , latestResourceVersion : string , crs : InstrumentationCRsCollection , operationId : string , onNewCR : ( cr : InstrumentationCR , isRemoved : boolean ) => void , onResetCRs : ( crs : InstrumentationCR [ ] ) => void , clusterArmId : string , clusterArmRegion : string ) : Promise < string > {
4951 let requestMetadata = new RequestMetadata ( "CR watcher" , crs ) ;
5052
5153 logger . info ( `Listing CRs, resourceVersion=${ latestResourceVersion } ...` , operationId , requestMetadata ) ;
@@ -56,7 +58,8 @@ export class K8sWatcher {
5658 group : K8sWatcher . crdApiGroup ,
5759 version : K8sWatcher . crdApiVersion ,
5860 plural : K8sWatcher . crdNamePlural ,
59- resourceVersion : latestResourceVersion ?? undefined
61+ resourceVersion : latestResourceVersion ?? undefined ,
62+ timeoutSeconds : 30
6063 } ) ;
6164
6265 logger . addHeartbeatMetric ( HeartbeatMetrics . CRsListCallSucceededCount , 1 , "200" ) ;
@@ -79,6 +82,9 @@ export class K8sWatcher {
7982
8083 // watch() doesn't block (it starts the loop and returns immediately), so we can't just return the promise it returns to our caller
8184 // we must instead create our own promise and resolve it manually when the watch informs us that it stopped via a callback
85+ const watchTimeoutSeconds = 240 ;
86+ let doneCallbackInvoked = false ;
87+ let watchAbortController : AbortController = null ;
8288 const watchIsDonePromise : Promise < string > = new Promise ( ( resolve , reject ) => {
8389 try {
8490 // /api/v1/namespaces
@@ -87,7 +93,7 @@ export class K8sWatcher {
8793 {
8894 allowWatchBookmarks : true ,
8995 resourceVersion : latestResourceVersion ,
90- timeoutSeconds : 240 // watch will be stopped after at most this many seconds
96+ timeoutSeconds : watchTimeoutSeconds // watch will be stopped after at most this many seconds
9197 //fieldSelector: fieldSelector
9298 } ,
9399 ( type , apiObj ) => {
@@ -115,7 +121,9 @@ export class K8sWatcher {
115121 }
116122 } ,
117123 err => { // watch is done callback
118- logger . info ( "Watch has completed" , operationId , requestMetadata ) ;
124+ doneCallbackInvoked = true ;
125+
126+ logger . info ( "Watch has completed via the done callback" , operationId , requestMetadata ) ;
119127 if ( err != null ) {
120128 // this indicates an issue with the watch encountered once the stream is opened
121129 // we want to handle it in the same way as an exception (which is triggered during opening of the stream)
@@ -131,7 +139,11 @@ export class K8sWatcher {
131139 logger . addHeartbeatMetric ( HeartbeatMetrics . CRsWatchCallSucceededCount , 1 , "200" ) ;
132140
133141 resolve ( latestResourceVersion ) ;
142+ } ) . then ( abortController => {
143+ watchAbortController = abortController ;
134144 } ) ;
145+
146+ logger . info ( "Watch call returned" , operationId , requestMetadata ) ;
135147 } catch ( e ) {
136148 logger . error ( `Watch.watch() threw: ${ JSON . stringify ( e ) } ` , operationId , requestMetadata ) ;
137149 logger . addHeartbeatMetric ( HeartbeatMetrics . CRsWatchCallFailedCount , 1 , e ?. statusCode ?? 0 ) ;
@@ -141,7 +153,34 @@ export class K8sWatcher {
141153 }
142154 } ) ;
143155
144- return watchIsDonePromise ;
156+ // the watch has a bug where it may not invoke the callback upon connection being lost, and it will ignore the timeout in that case
157+ // we have to handle that with a manual timeout, and log if the manual timeout occures below the watch reports completion via the callback
158+ const timeoutPromise = new Promise < string > ( ( resolve , reject ) => {
159+ setTimeout ( ( ) =>
160+ {
161+ if ( doneCallbackInvoked ) {
162+ // the watch completed normally, we don't need to do anything
163+ } else {
164+ // the watch hasn't invoked the callback within the timeout, so need to step in
165+ logger . error ( `Watch hung, manual timeout is used` , operationId , requestMetadata ) ;
166+ // no await, no hurry
167+ logger . SendEvent ( Events [ Events . WatchHung ] , operationId , null , clusterArmId , clusterArmRegion , true ) ;
168+
169+ // try to abort the watch, not sure if it's possible, best effort
170+ try {
171+ watchAbortController ?. abort ( "Watch hung, manual timeout is used" ) ;
172+ } catch ( e ) {
173+ // swallow, the watch is already in a bad state, we don't care what happens here
174+ logger . error ( `Failed to abort the watch: ${ e } ` , operationId , requestMetadata ) ;
175+ }
176+
177+ reject ( new Error ( `Watch hung, manual timeout is used` ) ) ;
178+ }
179+ } ,
180+ 3 * watchTimeoutSeconds * 1000 ) ;
181+ } ) ;
182+
183+ return Promise . race ( [ watchIsDonePromise , timeoutPromise ] ) ;
145184 }
146185
147186 private static IsExpectedIntermittentException ( e : any ) {
0 commit comments