@@ -19,6 +19,7 @@ class PubSubInstance {
1919 this . recoveryAttempts = 0 ;
2020 this . lastRecoveryAttempt = 0 ;
2121 this . _hadPubSubFlag = true ;
22+ this . _loopErrorLogged = false ;
2223
2324 this . checkSchemaVersions ( )
2425 . catch ( err => {
@@ -37,19 +38,23 @@ class PubSubInstance {
3738 }
3839 this . run ( )
3940 . then ( ( ) => {
41+ this . _loopErrorLogged = false ;
4042 this . startLoop ( ) ;
4143 } )
4244 . catch ( err => {
43- logger . error ( { msg : 'Failed to process subscription loop' , app : this . app , err } ) ;
44-
45- this . _hadPubSubFlag = true ;
46- oauth2Apps . setMeta ( this . app , {
47- pubSubFlag : {
48- message : `Failed to process subscription loop` ,
49- description : [ err . message , err . reason , err . code ] . filter ( val => val ) . join ( '; ' )
50- }
51- } ) ;
52- setTimeout ( ( ) => this . startLoop ( ) , 3000 ) ;
45+ if ( ! this . _loopErrorLogged ) {
46+ logger . error ( { msg : 'Failed to process subscription loop' , app : this . app , err } ) ;
47+ this . _loopErrorLogged = true ;
48+
49+ this . _hadPubSubFlag = true ;
50+ oauth2Apps . setMeta ( this . app , {
51+ pubSubFlag : {
52+ message : `Failed to process subscription loop` ,
53+ description : [ err . message , err . reason , err . code ] . filter ( val => val ) . join ( '; ' )
54+ }
55+ } ) ;
56+ }
57+ setTimeout ( ( ) => this . startLoop ( ) , err . retryDelay || 3000 ) ;
5358 } ) ;
5459 }
5560
@@ -251,14 +256,16 @@ class PubSubInstance {
251256 let now = Date . now ( ) ;
252257 let backoffMs = Math . min ( 3000 * Math . pow ( 2 , Math . min ( this . recoveryAttempts , 20 ) ) , 5 * 60 * 1000 ) ;
253258 if ( now - this . lastRecoveryAttempt < backoffMs ) {
254- logger . warn ( { msg : 'Skipping subscription recovery (backoff)' , app : this . app , reason, recoveryAttempts : this . recoveryAttempts , backoffMs } ) ;
255- throw new Error ( reason ) ;
259+ let remainingMs = backoffMs - ( now - this . lastRecoveryAttempt ) ;
260+ let err = new Error ( reason ) ;
261+ err . retryDelay = remainingMs ;
262+ throw err ;
256263 }
257264
258265 this . lastRecoveryAttempt = now ;
259266 this . recoveryAttempts ++ ;
260267
261- logger . warn ( { msg : 'Attempting subscription recovery' , app : this . app , reason, recoveryAttempts : this . recoveryAttempts } ) ;
268+ logger . info ( { msg : 'Attempting subscription recovery' , app : this . app , reason, recoveryAttempts : this . recoveryAttempts } ) ;
262269
263270 try {
264271 await this . getApp ( true ) ;
@@ -269,7 +276,9 @@ class PubSubInstance {
269276 this . recoveryAttempts = 0 ;
270277 logger . info ( { msg : 'Successfully recovered Pub/Sub subscription' , app : this . app , reason } ) ;
271278 } catch ( recoveryErr ) {
272- logger . error ( { msg : 'Failed to recover Pub/Sub subscription' , app : this . app , reason, err : recoveryErr } ) ;
279+ let nextBackoffMs = Math . min ( 3000 * Math . pow ( 2 , Math . min ( this . recoveryAttempts , 20 ) ) , 5 * 60 * 1000 ) ;
280+ logger . warn ( { msg : 'Subscription recovery failed' , app : this . app , reason, err : recoveryErr , nextRetryMs : nextBackoffMs } ) ;
281+ recoveryErr . retryDelay = nextBackoffMs ;
273282 throw recoveryErr ;
274283 }
275284 }
0 commit comments