Skip to content

Commit a310bac

Browse files
committed
Cleanup. Final iteration result handling that works for both Rust and C_SHARP sync implementations.
1 parent 1037191 commit a310bac

File tree

1 file changed

+30
-42
lines changed

1 file changed

+30
-42
lines changed

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -300,31 +300,17 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
300300
// - Close any sync stream ReadableStreams (which will also close any established network requests)
301301
while (true)
302302
{
303-
Console.WriteLine(1);
304303
UpdateSyncStatus(new SyncStatusOptions { Connecting = true });
305-
Console.WriteLine(2);
306304
var iterationResult = (StreamingSyncIterationResult?)null;
305+
var shouldDelayRetry = true;
306+
307307
try
308308
{
309-
Console.WriteLine(3);
310309
if (signal.Value.IsCancellationRequested)
311310
{
312-
Console.WriteLine("BREAKINGNNNNG");
313311
break;
314312
}
315-
Console.WriteLine(4);
316313
iterationResult = await StreamingSyncIteration(nestedCts.Token, options);
317-
if (!iterationResult.Retry)
318-
{
319-
Console.WriteLine(5);
320-
321-
// A sync error ocurred that we cannot recover from here.
322-
// This loop must terminate.
323-
// The nestedCts will close any open network requests and streams below.
324-
break;
325-
}
326-
// Continue immediately
327-
Console.WriteLine(6);
328314
}
329315
catch (Exception ex)
330316
{
@@ -334,15 +320,20 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
334320
exMessage = "Stream closed or timed out -" + ex.InnerException.Message;
335321
}
336322

337-
Console.WriteLine(7);
338-
logger.LogError("Caught exception in streaming sync: {message}", exMessage);
339-
Console.WriteLine(8);
340323
// Either:
341324
// - A network request failed with a failed connection or not OKAY response code.
342325
// - There was a sync processing error.
343326
// This loop will retry.
344327
// The nested abort controller will cleanup any open network requests and streams.
345-
// The WebRemote should only abort pending fetch requests or close active Readable streams.
328+
if (nestedCts.IsCancellationRequested)
329+
{
330+
logger.LogWarning("Caught exception in streaming sync: {message}", exMessage);
331+
shouldDelayRetry = false;
332+
}
333+
else
334+
{
335+
logger.LogError("Caught exception in streaming sync: {message}", exMessage);
336+
}
346337

347338
UpdateSyncStatus(new SyncStatusOptions
348339
{
@@ -352,14 +343,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
352343
DownloadError = ex
353344
}
354345
});
355-
356-
Console.WriteLine(9);
357-
await DelayRetry();
358-
Console.WriteLine(10);
359346
}
360347
finally
361348
{
362-
Console.WriteLine(11);
363349
notifyCompletedUploads = null;
364350

365351
if (!signal.Value.IsCancellationRequested)
@@ -369,19 +355,21 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
369355
nestedCts = new CancellationTokenSource();
370356
}
371357

372-
373-
// if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false))
374-
// {
375-
Console.WriteLine(12);
376-
UpdateSyncStatus(new SyncStatusOptions
358+
if (iterationResult != null && (iterationResult.ImmediateRestart != true && iterationResult.LegacyRetry != true))
377359
{
378-
Connected = false,
379-
Connecting = true // May be unnecessary
380-
});
381360

382-
// On error, wait a little before retrying
361+
UpdateSyncStatus(new SyncStatusOptions
362+
{
363+
Connected = false,
364+
Connecting = true
365+
});
383366

384-
// }
367+
// On error, wait a little before retrying
368+
if (shouldDelayRetry)
369+
{
370+
await DelayRetry();
371+
}
372+
}
385373
}
386374
}
387375

@@ -395,7 +383,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
395383

396384
protected record StreamingSyncIterationResult
397385
{
398-
public bool Retry { get; init; }
386+
public bool? LegacyRetry { get; init; }
399387

400388
public bool? ImmediateRestart { get; init; }
401389
}
@@ -631,7 +619,7 @@ protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(
631619
{
632620
logger.LogDebug("Stream has closed while waiting");
633621
// The stream has closed while waiting
634-
return new StreamingSyncIterationResult { Retry = true };
622+
return new StreamingSyncIterationResult { LegacyRetry = true };
635623
}
636624

637625
// A connection is active and messages are being received
@@ -678,7 +666,7 @@ protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(
678666
// This means checksums failed. Start again with a new checkpoint.
679667
// TODO: better back-off
680668
await Task.Delay(50);
681-
return new StreamingSyncIterationResult { Retry = true };
669+
return new StreamingSyncIterationResult { LegacyRetry = true };
682670
}
683671
else if (!result.Ready)
684672
{
@@ -780,14 +768,14 @@ protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(
780768
// (uses the same one), this should have some delay.
781769
//
782770
await DelayRetry();
783-
return new StreamingSyncIterationResult { Retry = true };
771+
return new StreamingSyncIterationResult { LegacyRetry = true };
784772
}
785773
else if (remainingSeconds < 30)
786774
{
787775
logger.LogDebug("Token will expire soon; reconnect");
788776
// Pre-emptively refresh the token
789777
Options.Remote.InvalidateCredentials();
790-
return new StreamingSyncIterationResult { Retry = true };
778+
return new StreamingSyncIterationResult { LegacyRetry = true };
791779
}
792780
TriggerCrudUpload();
793781
}
@@ -816,7 +804,7 @@ protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(
816804
// This means checksums failed. Start again with a new checkpoint.
817805
// TODO: better back-off
818806
await Task.Delay(50);
819-
return new StreamingSyncIterationResult { Retry = false };
807+
return new StreamingSyncIterationResult { LegacyRetry = false };
820808
}
821809
else if (!result.Ready)
822810
{
@@ -846,7 +834,7 @@ protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(
846834

847835
logger.LogDebug("Stream input empty");
848836
// Connection closed. Likely due to auth issue.
849-
return new StreamingSyncIterationResult { Retry = true };
837+
return new StreamingSyncIterationResult { LegacyRetry = true };
850838
}
851839

852840
public new void Close()

0 commit comments

Comments
 (0)