@@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync {
3535 final InternalConnector connector;
3636 final ResolvedSyncOptions options;
3737
38- final Logger logger = isolateLogger ;
38+ final Logger logger;
3939
4040 final Stream <void > crudUpdateTriggerStream;
4141
@@ -68,14 +68,16 @@ class StreamingSyncImplementation implements StreamingSync {
6868 required http.Client client,
6969 Mutex ? syncMutex,
7070 Mutex ? crudMutex,
71+ Logger ? logger,
7172
7273 /// A unique identifier for this streaming sync implementation
7374 /// A good value is typically the DB file path which it will mutate when syncing.
7475 String ? identifier = "unknown" ,
7576 }) : _client = client,
7677 syncMutex = syncMutex ?? Mutex (identifier: "sync-$identifier " ),
7778 crudMutex = crudMutex ?? Mutex (identifier: "crud-$identifier " ),
78- _userAgentHeaders = userAgentHeaders ();
79+ _userAgentHeaders = userAgentHeaders (),
80+ logger = logger ?? isolateLogger;
7981
8082 Duration get _retryDelay => options.retryDelay;
8183
@@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync {
122124 @override
123125 Future <void > streamingSync () async {
124126 try {
127+ assert (_abort == null );
125128 _abort = AbortController ();
126129 clientId = await adapter.getClientId ();
127130 _crudLoop ();
@@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync {
310313 var merged = addBroadcast (requestStream, _nonLineSyncEvents.stream);
311314
312315 Future <void >? credentialsInvalidation;
313- bool haveInvalidated = false ;
316+ bool shouldStopIteration = false ;
314317
315318 // Trigger a CRUD upload on reconnect
316319 _internalCrudTriggerController.add (null );
@@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync {
336339 case StreamingSyncCheckpointComplete ():
337340 final result = await _applyCheckpoint (targetCheckpoint! , _abort);
338341 if (result.abort) {
342+ shouldStopIteration = true ;
339343 return ;
340344 }
341345 case StreamingSyncCheckpointPartiallyComplete (: final bucketPriority):
@@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync {
345349 // This means checksums failed. Start again with a new checkpoint.
346350 // TODO: better back-off
347351 // await new Promise((resolve) => setTimeout(resolve, 50));
352+ shouldStopIteration = true ;
348353 return ;
349354 } else if (! result.ready) {
350355 // If we have pending uploads, we can't complete new checkpoints
@@ -398,13 +403,14 @@ class StreamingSyncImplementation implements StreamingSync {
398403 if (tokenExpiresIn == 0 ) {
399404 // Token expired already - stop the connection immediately
400405 connector.prefetchCredentials (invalidate: true ).ignore ();
406+ shouldStopIteration = true ;
401407 break ;
402408 } else if (tokenExpiresIn <= 30 ) {
403409 // Token expires soon - refresh it in the background
404410 credentialsInvalidation ?? =
405411 connector.prefetchCredentials ().then ((_) {
406412 // Token has been refreshed - we should restart the connection.
407- haveInvalidated = true ;
413+ shouldStopIteration = true ;
408414 // trigger next loop iteration ASAP, don't wait for another
409415 // message from the server.
410416 if (! aborted) {
@@ -421,7 +427,7 @@ class StreamingSyncImplementation implements StreamingSync {
421427 }
422428
423429 await for (var line in merged) {
424- if (aborted || haveInvalidated ) {
430+ if (aborted || shouldStopIteration ) {
425431 break ;
426432 }
427433
@@ -434,10 +440,10 @@ class StreamingSyncImplementation implements StreamingSync {
434440 break ;
435441 case TokenRefreshComplete ():
436442 // We have a new token, so stop the iteration.
437- haveInvalidated = true ;
443+ shouldStopIteration = true ;
438444 }
439445
440- if (haveInvalidated ) {
446+ if (shouldStopIteration ) {
441447 // Stop this connection, so that a new one will be started
442448 break ;
443449 }
0 commit comments