@@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync {
3535 final InternalConnector connector;
3636 final ResolvedSyncOptions options;
3737
38- final Logger logger = isolateLogger ;
38+ final Logger logger;
3939
4040 final Stream <void > crudUpdateTriggerStream;
4141
@@ -68,14 +68,16 @@ class StreamingSyncImplementation implements StreamingSync {
6868 required http.Client client,
6969 Mutex ? syncMutex,
7070 Mutex ? crudMutex,
71+ Logger ? logger,
7172
7273 /// A unique identifier for this streaming sync implementation
7374 /// A good value is typically the DB file path which it will mutate when syncing.
7475 String ? identifier = "unknown" ,
7576 }) : _client = client,
7677 syncMutex = syncMutex ?? Mutex (identifier: "sync-$identifier " ),
7778 crudMutex = crudMutex ?? Mutex (identifier: "crud-$identifier " ),
78- _userAgentHeaders = userAgentHeaders ();
79+ _userAgentHeaders = userAgentHeaders (),
80+ logger = logger ?? isolateLogger;
7981
8082 Duration get _retryDelay => options.retryDelay;
8183
@@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync {
122124 @override
123125 Future <void > streamingSync () async {
124126 try {
127+ assert (_abort == null );
125128 _abort = AbortController ();
126129 clientId = await adapter.getClientId ();
127130 _crudLoop ();
@@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync {
310313 var merged = addBroadcast (requestStream, _nonLineSyncEvents.stream);
311314
312315 Future <void >? credentialsInvalidation;
313- bool haveInvalidated = false ;
316+ bool shouldStopIteration = false ;
314317
315318 // Trigger a CRUD upload on reconnect
316319 _internalCrudTriggerController.add (null );
@@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync {
336339 case StreamingSyncCheckpointComplete ():
337340 final result = await _applyCheckpoint (targetCheckpoint! , _abort);
338341 if (result.abort) {
342+ shouldStopIteration = true ;
339343 return ;
340344 }
341345 case StreamingSyncCheckpointPartiallyComplete (: final bucketPriority):
@@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync {
345349 // This means checksums failed. Start again with a new checkpoint.
346350 // TODO: better back-off
347351 // await new Promise((resolve) => setTimeout(resolve, 50));
352+ shouldStopIteration = true ;
348353 return ;
349354 } else if (! result.ready) {
350355 // If we have pending uploads, we can't complete new checkpoints
@@ -404,7 +409,7 @@ class StreamingSyncImplementation implements StreamingSync {
404409 credentialsInvalidation ?? =
405410 connector.prefetchCredentials ().then ((_) {
406411 // Token has been refreshed - we should restart the connection.
407- haveInvalidated = true ;
412+ shouldStopIteration = true ;
408413 // trigger next loop iteration ASAP, don't wait for another
409414 // message from the server.
410415 if (! aborted) {
@@ -421,7 +426,7 @@ class StreamingSyncImplementation implements StreamingSync {
421426 }
422427
423428 await for (var line in merged) {
424- if (aborted || haveInvalidated ) {
429+ if (aborted || shouldStopIteration ) {
425430 break ;
426431 }
427432
@@ -434,10 +439,10 @@ class StreamingSyncImplementation implements StreamingSync {
434439 break ;
435440 case TokenRefreshComplete ():
436441 // We have a new token, so stop the iteration.
437- haveInvalidated = true ;
442+ shouldStopIteration = true ;
438443 }
439444
440- if (haveInvalidated ) {
445+ if (shouldStopIteration ) {
441446 // Stop this connection, so that a new one will be started
442447 break ;
443448 }
0 commit comments