17
17
18
18
import { _FirebaseInstallationsInternal } from '@firebase/installations' ;
19
19
import { Logger } from '@firebase/logger' ;
20
- import { ConfigUpdateObserver } from '../public_types' ;
20
+ import {
21
+ ConfigUpdate ,
22
+ ConfigUpdateObserver ,
23
+ FetchResponse ,
24
+ FirebaseRemoteConfigObject
25
+ } from '../public_types' ;
21
26
import { calculateBackoffMillis , FirebaseError } from '@firebase/util' ;
22
27
import { ERROR_FACTORY , ErrorCode } from '../errors' ;
23
28
import { Storage } from '../storage/storage' ;
24
29
import { VisibilityMonitor } from './visibility_monitor' ;
30
+ import { StorageCache } from '../storage/storage_cache' ;
31
+ import {
32
+ FetchRequest ,
33
+ RemoteConfigAbortSignal
34
+ } from './remote_config_fetch_client' ;
35
+ import { RestClient } from './rest_client' ;
25
36
26
37
const API_KEY_HEADER = 'X-Goog-Api-Key' ;
27
38
const INSTALLATIONS_AUTH_TOKEN_HEADER = 'X-Goog-Firebase-Installations-Auth' ;
28
39
const ORIGINAL_RETRIES = 8 ;
40
+ const MAXIMUM_FETCH_ATTEMPTS = 3 ;
29
41
const NO_BACKOFF_TIME_IN_MILLIS = - 1 ;
30
42
const NO_FAILED_REALTIME_STREAMS = 0 ;
43
+ const TEMPLATE_VERSION_KEY = 'latestTemplateVersionNumber' ;
31
44
32
45
export class RealtimeHandler {
33
46
constructor (
@@ -38,7 +51,9 @@ export class RealtimeHandler {
38
51
private readonly projectId : string ,
39
52
private readonly apiKey : string ,
40
53
private readonly appId : string ,
41
- private readonly logger : Logger
54
+ private readonly logger : Logger ,
55
+ private readonly storageCache : StorageCache ,
56
+ private readonly restClient : RestClient
42
57
) {
43
58
void this . setRetriesRemaining ( ) ;
44
59
void VisibilityMonitor . getInstance ( ) . on (
@@ -56,6 +71,7 @@ export class RealtimeHandler {
56
71
private reader : ReadableStreamDefaultReader | undefined ;
57
72
private httpRetriesRemaining : number = ORIGINAL_RETRIES ;
58
73
private isInBackground : boolean = false ;
74
+ private readonly decoder = new TextDecoder ( 'utf-8' ) ;
59
75
60
76
private async setRetriesRemaining ( ) : Promise < void > {
61
77
// Retrieve number of remaining retries from last session. The minimum retry count being one.
@@ -229,6 +245,239 @@ export class RealtimeHandler {
229
245
return canMakeConnection ;
230
246
}
231
247
248
+ private fetchResponseIsUpToDate (
249
+ fetchResponse : FetchResponse ,
250
+ lastKnownVersion : number
251
+ ) : boolean {
252
+ if ( fetchResponse . config != null && fetchResponse . templateVersionNumber ) {
253
+ return fetchResponse . templateVersionNumber >= lastKnownVersion ;
254
+ }
255
+ return false ;
256
+ }
257
+
258
+ private parseAndValidateConfigUpdateMessage ( message : string ) : string {
259
+ const left = message . indexOf ( '{' ) ;
260
+ const right = message . indexOf ( '}' , left ) ;
261
+
262
+ if ( left < 0 || right < 0 ) {
263
+ return '' ;
264
+ }
265
+ return left >= right ? '' : message . substring ( left , right + 1 ) ;
266
+ }
267
+
268
+ private isEventListenersEmpty ( ) : boolean {
269
+ return this . observers . size === 0 ;
270
+ }
271
+
272
+ private getRandomInt ( max : number ) : number {
273
+ return Math . floor ( Math . random ( ) * max ) ;
274
+ }
275
+
276
+ private executeAllListenerCallbacks ( configUpdate : ConfigUpdate ) : void {
277
+ this . observers . forEach ( observer => observer . next ( configUpdate ) ) ;
278
+ }
279
+
280
+ private getChangedParams (
281
+ newConfig : FirebaseRemoteConfigObject ,
282
+ oldConfig : FirebaseRemoteConfigObject
283
+ ) : Set < string > {
284
+ const changed = new Set < string > ( ) ;
285
+ const newKeys = new Set ( Object . keys ( newConfig || { } ) ) ;
286
+ const oldKeys = new Set ( Object . keys ( oldConfig || { } ) ) ;
287
+
288
+ for ( const key of newKeys ) {
289
+ if ( ! oldKeys . has ( key ) ) {
290
+ changed . add ( key ) ;
291
+ continue ;
292
+ }
293
+ if (
294
+ JSON . stringify ( ( newConfig as any ) [ key ] ) !==
295
+ JSON . stringify ( ( oldConfig as any ) [ key ] )
296
+ ) {
297
+ changed . add ( key ) ;
298
+ continue ;
299
+ }
300
+ }
301
+
302
+ for ( const key of oldKeys ) {
303
+ if ( ! newKeys . has ( key ) ) {
304
+ changed . add ( key ) ;
305
+ }
306
+ }
307
+ return changed ;
308
+ }
309
+
310
+ private async fetchLatestConfig (
311
+ remainingAttempts : number ,
312
+ targetVersion : number
313
+ ) : Promise < void > {
314
+ const remainingAttemptsAfterFetch = remainingAttempts - 1 ;
315
+ const currentAttempt = MAXIMUM_FETCH_ATTEMPTS - remainingAttemptsAfterFetch ;
316
+ const customSignals = this . storageCache . getCustomSignals ( ) ;
317
+ if ( customSignals ) {
318
+ this . logger . debug (
319
+ `Fetching config with custom signals: ${ JSON . stringify ( customSignals ) } `
320
+ ) ;
321
+ }
322
+ try {
323
+ const fetchRequest : FetchRequest = {
324
+ cacheMaxAgeMillis : 0 ,
325
+ signal : new RemoteConfigAbortSignal ( ) ,
326
+ customSignals : customSignals ,
327
+ fetchType : 'REALTIME' ,
328
+ fetchAttempt : currentAttempt
329
+ } ;
330
+
331
+ const fetchResponse : FetchResponse = await this . restClient . fetch (
332
+ fetchRequest
333
+ ) ;
334
+ let activatedConfigs = await this . storage . getActiveConfig ( ) ;
335
+
336
+ if ( ! this . fetchResponseIsUpToDate ( fetchResponse , targetVersion ) ) {
337
+ this . logger . debug (
338
+ "Fetched template version is the same as SDK's current version." +
339
+ ' Retrying fetch.'
340
+ ) ;
341
+ // Continue fetching until template version number is greater than current.
342
+ await this . autoFetch ( remainingAttemptsAfterFetch , targetVersion ) ;
343
+ return ;
344
+ }
345
+
346
+ if ( fetchResponse . config == null ) {
347
+ this . logger . debug (
348
+ 'The fetch succeeded, but the backend had no updates.'
349
+ ) ;
350
+ return ;
351
+ }
352
+
353
+ if ( activatedConfigs == null ) {
354
+ activatedConfigs = { } ;
355
+ }
356
+
357
+ const updatedKeys = this . getChangedParams (
358
+ fetchResponse . config ,
359
+ activatedConfigs
360
+ ) ;
361
+
362
+ if ( updatedKeys . size === 0 ) {
363
+ this . logger . debug ( 'Config was fetched, but no params changed.' ) ;
364
+ return ;
365
+ }
366
+
367
+ const configUpdate : ConfigUpdate = {
368
+ getUpdatedKeys ( ) : Set < string > {
369
+ return new Set ( updatedKeys ) ;
370
+ }
371
+ } ;
372
+ this . executeAllListenerCallbacks ( configUpdate ) ;
373
+ } catch ( e : unknown ) {
374
+ const errorMessage = e instanceof Error ? e . message : String ( e ) ;
375
+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
376
+ originalErrorMessage : `Failed to auto-fetch config update: ${ errorMessage } `
377
+ } ) ;
378
+ this . propagateError ( error ) ;
379
+ }
380
+ }
381
+
382
+ private async autoFetch (
383
+ remainingAttempts : number ,
384
+ targetVersion : number
385
+ ) : Promise < void > {
386
+ if ( remainingAttempts === 0 ) {
387
+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
388
+ originalErrorMessage :
389
+ 'Unable to fetch the latest version of the template.'
390
+ } ) ;
391
+ this . propagateError ( error ) ;
392
+ return ;
393
+ }
394
+
395
+ const timeTillFetch = this . getRandomInt ( 4 ) ;
396
+ setTimeout ( async ( ) => {
397
+ await this . fetchLatestConfig ( remainingAttempts , targetVersion ) ;
398
+ } , timeTillFetch ) ;
399
+ }
400
+
401
+ private async handleNotifications (
402
+ reader : ReadableStreamDefaultReader
403
+ ) : Promise < void > {
404
+ let partialConfigUpdateMessage : string ;
405
+ let currentConfigUpdateMessage = '' ;
406
+
407
+ while ( true ) {
408
+ const { done, value } = await reader . read ( ) ;
409
+ if ( done ) {
410
+ break ;
411
+ }
412
+
413
+ partialConfigUpdateMessage = this . decoder . decode ( value , { stream : true } ) ;
414
+ currentConfigUpdateMessage += partialConfigUpdateMessage ;
415
+
416
+ if ( partialConfigUpdateMessage . includes ( '}' ) ) {
417
+ currentConfigUpdateMessage = this . parseAndValidateConfigUpdateMessage (
418
+ currentConfigUpdateMessage
419
+ ) ;
420
+
421
+ if ( currentConfigUpdateMessage . length === 0 ) {
422
+ continue ;
423
+ }
424
+ try {
425
+ const jsonObject = JSON . parse ( currentConfigUpdateMessage ) ;
426
+
427
+ if ( this . isEventListenersEmpty ( ) ) {
428
+ break ;
429
+ }
430
+
431
+ if ( TEMPLATE_VERSION_KEY in jsonObject ) {
432
+ const oldTemplateVersion =
433
+ await this . storage . getLastKnownTemplateVersion ( ) ;
434
+ let targetTemplateVersion = Number (
435
+ jsonObject [ TEMPLATE_VERSION_KEY ]
436
+ ) ;
437
+
438
+ if (
439
+ oldTemplateVersion &&
440
+ targetTemplateVersion > oldTemplateVersion
441
+ ) {
442
+ await this . autoFetch (
443
+ MAXIMUM_FETCH_ATTEMPTS ,
444
+ targetTemplateVersion
445
+ ) ;
446
+ }
447
+ }
448
+ } catch ( e : any ) {
449
+ this . logger . error ( 'Unable to parse latest config update message.' , e ) ;
450
+ this . propagateError (
451
+ ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_MESSAGE_INVALID , {
452
+ originalErrorMessage : e
453
+ } )
454
+ ) ;
455
+ }
456
+ currentConfigUpdateMessage = '' ;
457
+ }
458
+ }
459
+ }
460
+
461
+ public async listenForNotifications (
462
+ reader : ReadableStreamDefaultReader
463
+ ) : Promise < void > {
464
+ try {
465
+ await this . handleNotifications ( reader ) ;
466
+ } catch ( e ) {
467
+ if ( ! this . isInBackground ) {
468
+ this . logger . debug (
469
+ 'Real-time connection was closed due to an exception.' ,
470
+ e
471
+ ) ;
472
+ }
473
+ } finally {
474
+ if ( this . reader ) {
475
+ this . reader . cancel ( ) ;
476
+ this . reader = undefined ;
477
+ }
478
+ }
479
+ }
480
+
232
481
/**
233
482
* Open the real-time connection, begin listening for updates, and auto-fetch when an update is
234
483
* received.
@@ -263,8 +512,8 @@ export class RealtimeHandler {
263
512
if ( response . ok && response . body ) {
264
513
this . resetRetryCount ( ) ;
265
514
await this . resetRealtimeBackoff ( ) ;
266
- // const configAutoFetch = this.startAutoFetch(reader );
267
- //await configAutoFetch .listenForNotifications();
515
+ const redaer = response . body . getReader ( ) ;
516
+ this . listenForNotifications ( redaer ) ;
268
517
}
269
518
} catch ( error ) {
270
519
if ( this . isInBackground ) {
0 commit comments