17
17
18
18
import { _FirebaseInstallationsInternal } from '@firebase/installations' ;
19
19
import { Logger } from '@firebase/logger' ;
20
- import { ConfigUpdateObserver } from '../public_types' ;
20
+ import {
21
+ ConfigUpdate ,
22
+ ConfigUpdateObserver ,
23
+ FetchResponse ,
24
+ FirebaseRemoteConfigObject
25
+ } from '../public_types' ;
21
26
import { calculateBackoffMillis , FirebaseError } from '@firebase/util' ;
22
27
import { ERROR_FACTORY , ErrorCode } from '../errors' ;
23
28
import { Storage } from '../storage/storage' ;
24
29
import { VisibilityMonitor } from './visibility_monitor' ;
30
+ import { StorageCache } from '../storage/storage_cache' ;
31
+ import {
32
+ FetchRequest ,
33
+ RemoteConfigAbortSignal
34
+ } from './remote_config_fetch_client' ;
35
+ import { RestClient } from './rest_client' ;
25
36
26
37
const API_KEY_HEADER = 'X-Goog-Api-Key' ;
27
38
const INSTALLATIONS_AUTH_TOKEN_HEADER = 'X-Goog-Firebase-Installations-Auth' ;
28
39
const ORIGINAL_RETRIES = 8 ;
40
+ const MAXIMUM_FETCH_ATTEMPTS = 3 ;
29
41
const NO_BACKOFF_TIME_IN_MILLIS = - 1 ;
30
42
const NO_FAILED_REALTIME_STREAMS = 0 ;
43
+ const REALTIME_DISABLED_KEY = 'featureDisabled' ;
44
+ const REALTIME_RETRY_INTERVAL = 'retryIntervalSeconds' ;
45
+ const TEMPLATE_VERSION_KEY = 'latestTemplateVersionNumber' ;
31
46
32
47
export class RealtimeHandler {
33
48
constructor (
@@ -38,7 +53,9 @@ export class RealtimeHandler {
38
53
private readonly projectId : string ,
39
54
private readonly apiKey : string ,
40
55
private readonly appId : string ,
41
- private readonly logger : Logger
56
+ private readonly logger : Logger ,
57
+ private readonly storageCache : StorageCache ,
58
+ private readonly restClient : RestClient
42
59
) {
43
60
void this . setRetriesRemaining ( ) ;
44
61
void VisibilityMonitor . getInstance ( ) . on (
@@ -56,6 +73,7 @@ export class RealtimeHandler {
56
73
private reader : ReadableStreamDefaultReader | undefined ;
57
74
private httpRetriesRemaining : number = ORIGINAL_RETRIES ;
58
75
private isInBackground : boolean = false ;
76
+ private readonly decoder = new TextDecoder ( 'utf-8' ) ;
59
77
60
78
private async setRetriesRemaining ( ) : Promise < void > {
61
79
// Retrieve number of remaining retries from last session. The minimum retry count being one.
@@ -90,6 +108,21 @@ export class RealtimeHandler {
90
108
} ) ;
91
109
}
92
110
111
+ private async updateBackoffMetadataWithRetryInterval (
112
+ retryIntervalSeconds : number
113
+ ) : Promise < void > {
114
+ const currentTime = Date . now ( ) ;
115
+ const backoffDurationInMillis = retryIntervalSeconds * 1000 ;
116
+ const backoffEndTime = new Date ( currentTime + backoffDurationInMillis ) ;
117
+ const numFailedStreams =
118
+ ( await this . storage . getRealtimeBackoffMetadata ( ) ) ?. numFailedStreams || 0 ;
119
+ await this . storage . setRealtimeBackoffMetadata ( {
120
+ backoffEndTimeMillis : backoffEndTime ,
121
+ numFailedStreams
122
+ } ) ;
123
+ this . retryHttpConnectionWhenBackoffEnds ( ) ;
124
+ }
125
+
93
126
/**
94
127
* HTTP status code that the Realtime client should retry on.
95
128
*/
@@ -229,6 +262,276 @@ export class RealtimeHandler {
229
262
return canMakeConnection ;
230
263
}
231
264
265
+ private fetchResponseIsUpToDate (
266
+ fetchResponse : FetchResponse ,
267
+ lastKnownVersion : number
268
+ ) : boolean {
269
+ if ( fetchResponse . config != null && fetchResponse . templateVersion ) {
270
+ return fetchResponse . templateVersion >= lastKnownVersion ;
271
+ }
272
+ return false ;
273
+ }
274
+
275
+ private parseAndValidateConfigUpdateMessage ( message : string ) : string {
276
+ const left = message . indexOf ( '{' ) ;
277
+ const right = message . indexOf ( '}' , left ) ;
278
+
279
+ if ( left < 0 || right < 0 ) {
280
+ return '' ;
281
+ }
282
+ return left >= right ? '' : message . substring ( left , right + 1 ) ;
283
+ }
284
+
285
+ private isEventListenersEmpty ( ) : boolean {
286
+ return this . observers . size === 0 ;
287
+ }
288
+
289
+ private getRandomInt ( max : number ) : number {
290
+ return Math . floor ( Math . random ( ) * max ) ;
291
+ }
292
+
293
+ private executeAllListenerCallbacks ( configUpdate : ConfigUpdate ) : void {
294
+ this . observers . forEach ( observer => observer . next ( configUpdate ) ) ;
295
+ }
296
+
297
+ private getChangedParams (
298
+ newConfig : FirebaseRemoteConfigObject ,
299
+ oldConfig : FirebaseRemoteConfigObject
300
+ ) : Set < string > {
301
+ const changed = new Set < string > ( ) ;
302
+ const newKeys = new Set ( Object . keys ( newConfig || { } ) ) ;
303
+ const oldKeys = new Set ( Object . keys ( oldConfig || { } ) ) ;
304
+
305
+ for ( const key of newKeys ) {
306
+ if ( ! oldKeys . has ( key ) ) {
307
+ changed . add ( key ) ;
308
+ continue ;
309
+ }
310
+ if (
311
+ JSON . stringify ( ( newConfig as any ) [ key ] ) !==
312
+ JSON . stringify ( ( oldConfig as any ) [ key ] )
313
+ ) {
314
+ changed . add ( key ) ;
315
+ continue ;
316
+ }
317
+ }
318
+
319
+ for ( const key of oldKeys ) {
320
+ if ( ! newKeys . has ( key ) ) {
321
+ changed . add ( key ) ;
322
+ }
323
+ }
324
+ return changed ;
325
+ }
326
+
327
+ private async fetchLatestConfig (
328
+ remainingAttempts : number ,
329
+ targetVersion : number
330
+ ) : Promise < void > {
331
+ const remainingAttemptsAfterFetch = remainingAttempts - 1 ;
332
+ const currentAttempt = MAXIMUM_FETCH_ATTEMPTS - remainingAttemptsAfterFetch ;
333
+ const customSignals = this . storageCache . getCustomSignals ( ) ;
334
+ if ( customSignals ) {
335
+ this . logger . debug (
336
+ `Fetching config with custom signals: ${ JSON . stringify ( customSignals ) } `
337
+ ) ;
338
+ }
339
+ try {
340
+ const fetchRequest : FetchRequest = {
341
+ cacheMaxAgeMillis : 0 ,
342
+ signal : new RemoteConfigAbortSignal ( ) ,
343
+ customSignals : customSignals ,
344
+ fetchType : 'REALTIME' ,
345
+ fetchAttempt : currentAttempt
346
+ } ;
347
+
348
+ const fetchResponse : FetchResponse = await this . restClient . fetch (
349
+ fetchRequest
350
+ ) ;
351
+ let activatedConfigs = await this . storage . getActiveConfig ( ) ;
352
+
353
+ if ( ! this . fetchResponseIsUpToDate ( fetchResponse , targetVersion ) ) {
354
+ this . logger . debug (
355
+ "Fetched template version is the same as SDK's current version." +
356
+ ' Retrying fetch.'
357
+ ) ;
358
+ // Continue fetching until template version number is greater than current.
359
+ await this . autoFetch ( remainingAttemptsAfterFetch , targetVersion ) ;
360
+ return ;
361
+ }
362
+
363
+ if ( fetchResponse . config == null ) {
364
+ this . logger . debug (
365
+ 'The fetch succeeded, but the backend had no updates.'
366
+ ) ;
367
+ return ;
368
+ }
369
+
370
+ if ( activatedConfigs == null ) {
371
+ activatedConfigs = { } ;
372
+ }
373
+
374
+ const updatedKeys = this . getChangedParams (
375
+ fetchResponse . config ,
376
+ activatedConfigs
377
+ ) ;
378
+
379
+ if ( updatedKeys . size === 0 ) {
380
+ this . logger . debug ( 'Config was fetched, but no params changed.' ) ;
381
+ return ;
382
+ }
383
+
384
+ const configUpdate : ConfigUpdate = {
385
+ getUpdatedKeys ( ) : Set < string > {
386
+ return new Set ( updatedKeys ) ;
387
+ }
388
+ } ;
389
+ this . executeAllListenerCallbacks ( configUpdate ) ;
390
+ } catch ( e : unknown ) {
391
+ const errorMessage = e instanceof Error ? e . message : String ( e ) ;
392
+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
393
+ originalErrorMessage : `Failed to auto-fetch config update: ${ errorMessage } `
394
+ } ) ;
395
+ this . propagateError ( error ) ;
396
+ }
397
+ }
398
+
399
+ private async autoFetch (
400
+ remainingAttempts : number ,
401
+ targetVersion : number
402
+ ) : Promise < void > {
403
+ if ( remainingAttempts === 0 ) {
404
+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
405
+ originalErrorMessage :
406
+ 'Unable to fetch the latest version of the template.'
407
+ } ) ;
408
+ this . propagateError ( error ) ;
409
+ return ;
410
+ }
411
+
412
+ const timeTillFetch = this . getRandomInt ( 4 ) ;
413
+ setTimeout ( async ( ) => {
414
+ await this . fetchLatestConfig ( remainingAttempts , targetVersion ) ;
415
+ } , timeTillFetch ) ;
416
+ }
417
+
418
+ private async handleNotifications (
419
+ reader : ReadableStreamDefaultReader
420
+ ) : Promise < void > {
421
+ if ( reader == null ) {
422
+ return ;
423
+ }
424
+
425
+ let partialConfigUpdateMessage : string ;
426
+ let currentConfigUpdateMessage = '' ;
427
+
428
+ while ( true ) {
429
+ const { done, value } = await reader . read ( ) ;
430
+ if ( done ) {
431
+ break ;
432
+ }
433
+
434
+ partialConfigUpdateMessage = this . decoder . decode ( value , { stream : true } ) ;
435
+ currentConfigUpdateMessage += partialConfigUpdateMessage ;
436
+
437
+ if ( partialConfigUpdateMessage . includes ( '}' ) ) {
438
+ currentConfigUpdateMessage = this . parseAndValidateConfigUpdateMessage (
439
+ currentConfigUpdateMessage
440
+ ) ;
441
+
442
+ if ( currentConfigUpdateMessage . length === 0 ) {
443
+ continue ;
444
+ }
445
+
446
+ try {
447
+ const jsonObject = JSON . parse ( currentConfigUpdateMessage ) ;
448
+
449
+ if ( this . isEventListenersEmpty ( ) ) {
450
+ break ;
451
+ }
452
+
453
+ if (
454
+ REALTIME_DISABLED_KEY in jsonObject &&
455
+ jsonObject [ REALTIME_DISABLED_KEY ] === true
456
+ ) {
457
+ const error = ERROR_FACTORY . create (
458
+ ErrorCode . CONFIG_UPDATE_UNAVAILABLE ,
459
+ {
460
+ originalErrorMessage :
461
+ 'The server is temporarily unavailable. Try again in a few minutes.'
462
+ }
463
+ ) ;
464
+ this . propagateError ( error ) ;
465
+ break ;
466
+ }
467
+
468
+ if ( TEMPLATE_VERSION_KEY in jsonObject ) {
469
+ const oldTemplateVersion =
470
+ await this . storage . getLastKnownTemplateVersion ( ) ;
471
+ let targetTemplateVersion = Number (
472
+ jsonObject [ TEMPLATE_VERSION_KEY ]
473
+ ) ;
474
+ if (
475
+ oldTemplateVersion &&
476
+ targetTemplateVersion > oldTemplateVersion
477
+ ) {
478
+ await this . autoFetch (
479
+ MAXIMUM_FETCH_ATTEMPTS ,
480
+ targetTemplateVersion
481
+ ) ;
482
+ }
483
+ }
484
+
485
+ // This field in the response indicates that the realtime request should retry after the
486
+ // specified interval to establish a long-lived connection. This interval extends the
487
+ // backoff duration without affecting the number of retries, so it will not enter an
488
+ // exponential backoff state.
489
+ if ( REALTIME_RETRY_INTERVAL in jsonObject ) {
490
+ const retryIntervalSeconds = Number (
491
+ jsonObject [ REALTIME_RETRY_INTERVAL ]
492
+ ) ;
493
+ await this . updateBackoffMetadataWithRetryInterval (
494
+ retryIntervalSeconds
495
+ ) ;
496
+ }
497
+ } catch ( e : any ) {
498
+ this . logger . error ( 'Unable to parse latest config update message.' , e ) ;
499
+ this . propagateError (
500
+ ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_MESSAGE_INVALID , {
501
+ originalErrorMessage : e
502
+ } )
503
+ ) ;
504
+ }
505
+ currentConfigUpdateMessage = '' ;
506
+ }
507
+ }
508
+ }
509
+
510
+ public async listenForNotifications (
511
+ reader : ReadableStreamDefaultReader
512
+ ) : Promise < void > {
513
+ try {
514
+ await this . handleNotifications ( reader ) ;
515
+ } catch ( e ) {
516
+ // If the real-time connection is at an unexpected lifecycle state when the app is
517
+ // backgrounded, it's expected closing the connection and will throw an exception.
518
+ if ( ! this . isInBackground ) {
519
+ // Otherwise, the real-time server connection was closed due to a transient issue.
520
+ this . logger . debug (
521
+ 'Real-time connection was closed due to an exception.' ,
522
+ e
523
+ ) ;
524
+ }
525
+ } finally {
526
+ // Only need to close the reader, beginRealtimeHttpStream will disconnect
527
+ // the connection
528
+ if ( this . reader ) {
529
+ this . reader . cancel ( ) ;
530
+ this . reader = undefined ;
531
+ }
532
+ }
533
+ }
534
+
232
535
/**
233
536
* Open the real-time connection, begin listening for updates, and auto-fetch when an update is
234
537
* received.
@@ -263,8 +566,9 @@ export class RealtimeHandler {
263
566
if ( response . ok && response . body ) {
264
567
this . resetRetryCount ( ) ;
265
568
await this . resetRealtimeBackoff ( ) ;
266
- //const configAutoFetch = this.startAutoFetch(reader);
267
- //await configAutoFetch.listenForNotifications();
569
+ const reader = response . body . getReader ( ) ;
570
+ // Start listening for realtime notifications.
571
+ await this . listenForNotifications ( reader ) ;
268
572
}
269
573
} catch ( error ) {
270
574
if ( this . isInBackground ) {
0 commit comments