@@ -119,6 +119,7 @@ class StreamingSyncImplementation implements StreamingSync {
119119
120120 // Now close the client in all cases not covered above
121121 _client.close ();
122+ _statusStreamController.close ();
122123 }
123124
124125 bool get aborted {
@@ -281,7 +282,7 @@ class StreamingSyncImplementation implements StreamingSync {
281282 for (final (i, priority) in existingPriorityState.indexed) {
282283 switch (
283284 BucketPriority .comparator (priority.priority, completed.priority)) {
284- case < 0 :
285+ case > 0 :
285286 // Entries from here on have a higher priority than the one that was
286287 // just completed
287288 final copy = existingPriorityState.toList ();
@@ -293,7 +294,7 @@ class StreamingSyncImplementation implements StreamingSync {
293294 copy[i] = completed;
294295 _updateStatus (statusInPriority: copy);
295296 return ;
296- case > 0 :
297+ case < 0 :
297298 continue ;
298299 }
299300 }
@@ -537,49 +538,55 @@ class StreamingSyncImplementation implements StreamingSync {
537538 return true ;
538539 }
539540
540- Stream <StreamingSyncLine ?> streamingSyncRequest (
541- StreamingSyncRequest data ) async * {
542- final credentials = await credentialsCallback ();
543- if (credentials == null ) {
544- throw CredentialsException ('Not logged in' );
545- }
546- final uri = credentials.endpointUri ('sync/stream' );
547-
548- final request = http.Request ('POST' , uri);
549- request.headers['Content-Type' ] = 'application/json' ;
550- request.headers['Authorization' ] = "Token ${credentials .token }" ;
551- request.headers.addAll (_userAgentHeaders);
552-
553- request.body = convert.jsonEncode (data);
554-
555- http.StreamedResponse res;
556- try {
557- // Do not close the client during the request phase - this causes uncaught errors.
558- _safeToClose = false ;
559- res = await _client.send (request);
560- } finally {
561- _safeToClose = true ;
562- }
563- if (aborted) {
564- return ;
565- }
541+ Stream <StreamingSyncLine ?> streamingSyncRequest (StreamingSyncRequest data) {
542+ Future <http. ByteStream ?> setup ( ) async {
543+ final credentials = await credentialsCallback ();
544+ if (credentials == null ) {
545+ throw CredentialsException ('Not logged in' );
546+ }
547+ final uri = credentials.endpointUri ('sync/stream' );
548+
549+ final request = http.Request ('POST' , uri);
550+ request.headers['Content-Type' ] = 'application/json' ;
551+ request.headers['Authorization' ] = "Token ${credentials .token }" ;
552+ request.headers.addAll (_userAgentHeaders);
553+
554+ request.body = convert.jsonEncode (data);
555+
556+ http.StreamedResponse res;
557+ try {
558+ // Do not close the client during the request phase - this causes uncaught errors.
559+ _safeToClose = false ;
560+ res = await _client.send (request);
561+ } finally {
562+ _safeToClose = true ;
563+ }
564+ if (aborted) {
565+ return null ;
566+ }
566567
567- if (res.statusCode == 401 ) {
568- if (invalidCredentialsCallback != null ) {
569- await invalidCredentialsCallback !();
568+ if (res.statusCode == 401 ) {
569+ if (invalidCredentialsCallback != null ) {
570+ await invalidCredentialsCallback !();
571+ }
570572 }
571- }
572- if (res.statusCode != 200 ) {
573- throw await SyncResponseException .fromStreamedResponse (res);
573+ if (res.statusCode != 200 ) {
574+ throw await SyncResponseException .fromStreamedResponse (res);
575+ }
576+
577+ return res.stream;
574578 }
575579
576- // Note: The response stream is automatically closed when this loop errors
577- await for (var line in ndjson (res.stream)) {
578- if (aborted) {
579- break ;
580+ return Stream .fromFuture (setup ()).asyncExpand ((stream) {
581+ if (stream == null || aborted) {
582+ return const Stream .empty ();
583+ } else {
584+ return ndjson (stream)
585+ .map ((line) =>
586+ StreamingSyncLine .fromJson (line as Map <String , dynamic >))
587+ .takeWhile ((_) => ! aborted);
580588 }
581- yield StreamingSyncLine .fromJson (line as Map <String , dynamic >);
582- }
589+ });
583590 }
584591
585592 /// Delays the standard `retryDelay` Duration, but exits early if
0 commit comments