@@ -2,6 +2,7 @@ namespace PowerSync.Common.Client.Sync.Stream;
22
33using System . Net . Sockets ;
44using System . Text ;
5+ using System . Threading . Tasks ;
56using Microsoft . Extensions . Logging ;
67using Microsoft . Extensions . Logging . Abstractions ;
78
@@ -425,7 +426,8 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
425426 protected async Task < StreamingSyncIterationResult > RustStreamingSyncIteration ( CancellationToken ? signal , RequiredPowerSyncConnectionOptions resolvedOptions )
426427 {
427428 Task ? receivingLines = null ;
428- var hideDisconnectOnRestart = false ;
429+ bool hadSyncLine = false ;
430+ bool hideDisconnectOnRestart = false ;
429431
430432
431433 var nestedCts = new CancellationTokenSource ( ) ;
@@ -448,12 +450,23 @@ async Task Connect(EstablishSyncStream instruction)
448450 try { stream ? . Close ( ) ; } catch { }
449451 } ) ;
450452
451- string ? line ;
453+ UpdateSyncStatus ( new SyncStatusOptions
454+ {
455+ Connected = true
456+ } ) ;
452457
458+ string ? line ;
453459 while ( ( line = await reader . ReadLineAsync ( ) ) != null )
454460 {
455- logger . LogDebug ( "Parsing line for rust sync stream {message}" , "xx" ) ;
456461 await Control ( PowerSyncControlCommand . PROCESS_TEXT_LINE , line ) ;
462+
463+ // Triggers a local CRUD upload when the first sync line has been received.
464+ // This allows uploading local changes that have been made while offline or disconnected.
465+ if ( ! hadSyncLine )
466+ {
467+ TriggerCrudUpload ( ) ;
468+ hadSyncLine = true ;
469+ }
457470 }
458471 }
459472
@@ -464,21 +477,20 @@ async Task Stop()
464477
465478 async Task Control ( string op , object ? payload = null )
466479 {
467- logger . LogTrace ( "Control call {message}" , op ) ;
468-
469480 var rawResponse = await Options . Adapter . Control ( op , payload ) ;
481+ logger . LogTrace ( "powersync_control {op}, {payload}, {rawResponse}" , op , payload , rawResponse ) ;
470482 HandleInstructions ( Instruction . ParseInstructions ( rawResponse ) ) ;
471483 }
472484
473- void HandleInstructions ( Instruction [ ] instructions )
485+ async void HandleInstructions ( Instruction [ ] instructions )
474486 {
475487 foreach ( var instruction in instructions )
476488 {
477- HandleInstruction ( instruction ) ;
489+ await HandleInstruction ( instruction ) ;
478490 }
479491 }
480492
481- void HandleInstruction ( Instruction instruction )
493+ async Task HandleInstruction ( Instruction instruction )
482494 {
483495 switch ( instruction )
484496 {
@@ -529,7 +541,26 @@ void HandleInstruction(Instruction instruction)
529541 receivingLines = Connect ( establishSyncStream ) ;
530542 break ;
531543 case FetchCredentials fetchCredentials :
532- Options . Remote . InvalidateCredentials ( ) ;
544+ if ( fetchCredentials . DidExpire )
545+ {
546+ Options . Remote . InvalidateCredentials ( ) ;
547+ }
548+ else
549+ {
550+ Options . Remote . InvalidateCredentials ( ) ;
551+
552+ // Restart iteration after the credentials have been refreshed.
553+ try
554+ {
555+ await Options . Remote . FetchCredentials ( ) ;
556+ await Control ( PowerSyncControlCommand . NOTIFY_TOKEN_REFRESHED ) ;
557+ }
558+ catch ( Exception err )
559+ {
560+ logger . LogWarning ( "Could not prefetch credentials: {message}" , err . Message ) ;
561+ }
562+
563+ }
533564 break ;
534565 case CloseSyncStream :
535566 nestedCts . Cancel ( ) ;
@@ -550,7 +581,17 @@ void HandleInstruction(Instruction instruction)
550581 try
551582 {
552583 await Control ( PowerSyncControlCommand . START , JsonConvert . SerializeObject ( new { parameters = resolvedOptions . Params } ) ) ;
553- notifyCompletedUploads = ( ) => { Task . Run ( async ( ) => await Control ( PowerSyncControlCommand . NOTIFY_CRUD_UPLOAD_COMPLETED ) ) ; } ;
584+
585+ notifyCompletedUploads = ( ) =>
586+ {
587+ Task . Run ( async ( ) =>
588+ {
589+ if ( ! nestedCts . IsCancellationRequested )
590+ {
591+ await Control ( PowerSyncControlCommand . NOTIFY_CRUD_UPLOAD_COMPLETED ) ;
592+ }
593+ } ) ;
594+ } ;
554595
555596 if ( receivingLines != null )
556597 {
0 commit comments