@@ -116,8 +116,8 @@ export class RemoteStore {
116
116
117
117
private accumulatedWatchChanges : WatchChange [ ] = [ ] ;
118
118
119
- private watchStream : PersistentListenStream ;
120
- private writeStream : PersistentWriteStream ;
119
+ private watchStream : PersistentListenStream = null ;
120
+ private writeStream : PersistentWriteStream = null ;
121
121
122
122
/**
123
123
* The online state of the watch stream. The state is set to healthy if and
@@ -149,10 +149,7 @@ export class RemoteStore {
149
149
* LocalStore, etc.
150
150
*/
151
151
start ( ) : Promise < void > {
152
- return this . setupStreams ( ) . then ( ( ) => {
153
- // Resume any writes
154
- return this . fillWritePipeline ( ) ;
155
- } ) ;
152
+ return this . enableNetwork ( ) ;
156
153
}
157
154
158
155
private setOnlineStateToHealthy ( ) : void {
@@ -192,7 +189,26 @@ export class RemoteStore {
192
189
}
193
190
}
194
191
195
- private setupStreams ( ) : Promise < void > {
192
+ private isNetworkEnabled ( ) : boolean {
193
+ assert (
194
+ ( this . watchStream == null ) == ( this . writeStream == null ) ,
195
+ 'WatchStream and WriteStream should both be null or non-null'
196
+ ) ;
197
+ return this . watchStream != null ;
198
+ }
199
+
200
+ /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
201
+ enableNetwork ( ) : Promise < void > {
202
+ assert (
203
+ this . watchStream == null ,
204
+ 'enableNetwork() called with non-null watchStream.'
205
+ ) ;
206
+ assert (
207
+ this . writeStream == null ,
208
+ 'enableNetwork() called with non-null writeStream.'
209
+ ) ;
210
+
211
+ // Create new streams (but note they're not started yet).
196
212
this . watchStream = this . datastore . newPersistentWatchStream ( {
197
213
onOpen : this . onWatchStreamOpen . bind ( this ) ,
198
214
onClose : this . onWatchStreamClose . bind ( this ) ,
@@ -208,15 +224,38 @@ export class RemoteStore {
208
224
// Load any saved stream token from persistent storage
209
225
return this . localStore . getLastStreamToken ( ) . then ( token => {
210
226
this . writeStream . lastStreamToken = token ;
227
+
228
+ if ( this . shouldStartWatchStream ( ) ) {
229
+ this . startWatchStream ( ) ;
230
+ }
231
+
232
+ this . updateAndBroadcastOnlineState ( OnlineState . Unknown ) ;
233
+
234
+ return this . fillWritePipeline ( ) ; // This may start the writeStream.
211
235
} ) ;
212
236
}
213
237
214
- shutdown ( ) : Promise < void > {
215
- log . debug ( LOG_TAG , 'RemoteStore shutting down.' ) ;
216
- this . cleanupWatchStreamState ( ) ;
217
- this . writeStream . stop ( ) ;
238
+ /** Temporarily disables the network. The network can be re-enabled using enableNetwork(). */
239
+ disableNetwork ( ) : Promise < void > {
240
+ this . updateAndBroadcastOnlineState ( OnlineState . Failed ) ;
241
+
242
+ // NOTE: We're guaranteed not to get any further events from these streams (not even a close
243
+ // event).
218
244
this . watchStream . stop ( ) ;
245
+ this . writeStream . stop ( ) ;
246
+
247
+ this . cleanUpWatchStreamState ( ) ;
248
+ this . cleanUpWriteStreamState ( ) ;
219
249
250
+ this . writeStream = null ;
251
+ this . watchStream = null ;
252
+
253
+ return Promise . resolve ( ) ;
254
+ }
255
+
256
+ shutdown ( ) : Promise < void > {
257
+ log . debug ( LOG_TAG , 'RemoteStore shutting down.' ) ;
258
+ this . disableNetwork ( ) ;
220
259
return Promise . resolve ( undefined ) ;
221
260
}
222
261
@@ -228,11 +267,12 @@ export class RemoteStore {
228
267
) ;
229
268
// Mark this as something the client is currently listening for.
230
269
this . listenTargets [ queryData . targetId ] = queryData ;
231
- if ( this . watchStream . isOpen ( ) ) {
232
- this . sendWatchRequest ( queryData ) ;
233
- } else if ( ! this . watchStream . isStarted ( ) ) {
270
+
271
+ if ( this . shouldStartWatchStream ( ) ) {
234
272
// The listen will be sent in onWatchStreamOpen
235
273
this . startWatchStream ( ) ;
274
+ } else if ( this . isNetworkEnabled ( ) && this . watchStream . isOpen ( ) ) {
275
+ this . sendWatchRequest ( queryData ) ;
236
276
}
237
277
}
238
278
@@ -244,7 +284,7 @@ export class RemoteStore {
244
284
) ;
245
285
const queryData = this . listenTargets [ targetId ] ;
246
286
delete this . listenTargets [ targetId ] ;
247
- if ( this . watchStream . isOpen ( ) ) {
287
+ if ( this . isNetworkEnabled ( ) && this . watchStream . isOpen ( ) ) {
248
288
this . sendUnwatchRequest ( targetId ) ;
249
289
}
250
290
}
@@ -279,10 +319,9 @@ export class RemoteStore {
279
319
}
280
320
281
321
private startWatchStream ( ) : void {
282
- assert ( ! this . watchStream . isStarted ( ) , "Can't restart started watch stream" ) ;
283
322
assert (
284
323
this . shouldStartWatchStream ( ) ,
285
- 'Tried to start watch stream even though it should not be started '
324
+ 'startWriteStream() called when shouldStartWatchStream() is false. '
286
325
) ;
287
326
this . watchStream . start ( ) ;
288
327
}
@@ -292,10 +331,14 @@ export class RemoteStore {
292
331
* active targets trying to be listened too
293
332
*/
294
333
private shouldStartWatchStream ( ) : boolean {
295
- return ! objUtils . isEmpty ( this . listenTargets ) ;
334
+ return (
335
+ this . isNetworkEnabled ( ) &&
336
+ ! this . watchStream . isStarted ( ) &&
337
+ ! objUtils . isEmpty ( this . listenTargets )
338
+ ) ;
296
339
}
297
340
298
- private cleanupWatchStreamState ( ) : void {
341
+ private cleanUpWatchStreamState ( ) : void {
299
342
// If the connection is closed then we'll never get a snapshot version for
300
343
// the accumulated changes and so we'll never be able to complete the batch.
301
344
// When we start up again the server is going to resend these changes
@@ -314,7 +357,12 @@ export class RemoteStore {
314
357
}
315
358
316
359
private onWatchStreamClose ( error : FirestoreError | null ) : Promise < void > {
317
- this . cleanupWatchStreamState ( ) ;
360
+ assert (
361
+ this . isNetworkEnabled ( ) ,
362
+ 'onWatchStreamClose() should only be called when the network is enabled'
363
+ ) ;
364
+
365
+ this . cleanUpWatchStreamState ( ) ;
318
366
319
367
// If there was an error, retry the connection.
320
368
if ( this . shouldStartWatchStream ( ) ) {
@@ -510,6 +558,11 @@ export class RemoteStore {
510
558
return promiseChain ;
511
559
}
512
560
561
+ cleanUpWriteStreamState ( ) {
562
+ this . lastBatchSeen = BATCHID_UNKNOWN ;
563
+ this . pendingWrites = [ ] ;
564
+ }
565
+
513
566
/**
514
567
* Notifies that there are new mutations to process in the queue. This is
515
568
* typically called by SyncEngine after it has sent mutations to LocalStore.
@@ -543,7 +596,9 @@ export class RemoteStore {
543
596
* writes complete the backend will be able to accept more.
544
597
*/
545
598
canWriteMutations ( ) : boolean {
546
- return this . pendingWrites . length < MAX_PENDING_WRITES ;
599
+ return (
600
+ this . isNetworkEnabled ( ) && this . pendingWrites . length < MAX_PENDING_WRITES
601
+ ) ;
547
602
}
548
603
549
604
// For testing
@@ -565,15 +620,26 @@ export class RemoteStore {
565
620
566
621
this . pendingWrites . push ( batch ) ;
567
622
568
- if ( ! this . writeStream . isStarted ( ) ) {
623
+ if ( this . shouldStartWriteStream ( ) ) {
569
624
this . startWriteStream ( ) ;
570
- } else if ( this . writeStream . handshakeComplete ) {
625
+ } else if ( this . isNetworkEnabled ( ) && this . writeStream . handshakeComplete ) {
571
626
this . writeStream . writeMutations ( batch . mutations ) ;
572
627
}
573
628
}
574
629
630
+ private shouldStartWriteStream ( ) : boolean {
631
+ return (
632
+ this . isNetworkEnabled ( ) &&
633
+ ! this . writeStream . isStarted ( ) &&
634
+ this . pendingWrites . length > 0
635
+ ) ;
636
+ }
637
+
575
638
private startWriteStream ( ) : void {
576
- assert ( ! this . writeStream . isStarted ( ) , "Can't restart started write stream" ) ;
639
+ assert (
640
+ this . shouldStartWriteStream ( ) ,
641
+ 'startWriteStream() called when shouldStartWriteStream() is false.'
642
+ ) ;
577
643
this . writeStream . start ( ) ;
578
644
}
579
645
@@ -632,6 +698,11 @@ export class RemoteStore {
632
698
}
633
699
634
700
private onWriteStreamClose ( error ?: FirestoreError ) : Promise < void > {
701
+ assert (
702
+ this . isNetworkEnabled ( ) ,
703
+ 'onWriteStreamClose() should only be called when the network is enabled'
704
+ ) ;
705
+
635
706
// Ignore close if there are no pending writes.
636
707
if ( this . pendingWrites . length > 0 ) {
637
708
assert (
@@ -653,7 +724,7 @@ export class RemoteStore {
653
724
return errorHandling . then ( ( ) => {
654
725
// The write stream might have been started by refilling the write
655
726
// pipeline for failed writes
656
- if ( this . pendingWrites . length > 0 && ! this . writeStream . isStarted ( ) ) {
727
+ if ( this . shouldStartWriteStream ( ) ) {
657
728
this . startWriteStream ( ) ;
658
729
}
659
730
} ) ;
@@ -713,33 +784,10 @@ export class RemoteStore {
713
784
handleUserChange ( user : User ) : Promise < void > {
714
785
log . debug ( LOG_TAG , 'RemoteStore changing users: uid=' , user . uid ) ;
715
786
716
- // Clear pending writes because those are per-user. Watched targets
717
- // persist across users so don't clear those.
718
- this . lastBatchSeen = BATCHID_UNKNOWN ;
719
- this . pendingWrites = [ ] ;
720
-
721
- // Stop the streams. They promise not to call us back.
722
- this . watchStream . stop ( ) ;
723
- this . writeStream . stop ( ) ;
724
-
725
- this . cleanupWatchStreamState ( ) ;
726
-
727
- // Create new streams (but note they're not started yet).
728
- return this . setupStreams ( )
729
- . then ( ( ) => {
730
- // If there are any watchedTargets, properly handle the stream
731
- // restart now that RemoteStore is ready to handle them.
732
- if ( this . shouldStartWatchStream ( ) ) {
733
- this . startWatchStream ( ) ;
734
- }
735
-
736
- // Resume any writes
737
- return this . fillWritePipeline ( ) ;
738
- } )
739
- . then ( ( ) => {
740
- // User change moves us back to the unknown state because we might
741
- // not want to re-open the stream
742
- this . setOnlineStateToUnknown ( ) ;
743
- } ) ;
787
+ // Tear down and re-create our network streams. This will ensure we get a fresh auth token
788
+ // for the new user and re-fill the write pipeline with new mutations from the LocalStore
789
+ // (since mutations are per-user).
790
+ this . disableNetwork ( ) ;
791
+ return this . enableNetwork ( ) ;
744
792
}
745
793
}
0 commit comments