Skip to content

Commit 0fbc7e4

Browse files
committed
Introduced PowerSyncControlCommand constants.
1 parent b6599c9 commit 0fbc7e4

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket;
88
using PowerSync.Common.Utils;
99
using Newtonsoft.Json;
1010

11+
public static class PowerSyncControlCommand
12+
{
13+
public const string PROCESS_TEXT_LINE = "line_text";
14+
public const string PROCESS_BSON_LINE = "line_binary";
15+
public const string STOP = "stop";
16+
public const string START = "start";
17+
public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token";
18+
public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload";
19+
public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions";
20+
}
21+
1122
public class Checkpoint
1223
{
1324
[JsonProperty("last_op_id")]

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -301,15 +301,15 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
301301
while (true)
302302
{
303303
UpdateSyncStatus(new SyncStatusOptions { Connecting = true });
304-
304+
var iterationResult = (StreamingSyncIterationResult?)null;
305305
try
306306
{
307307
if (signal.Value.IsCancellationRequested)
308308
{
309309
break;
310310
}
311-
var iterationResult = await StreamingSyncIteration(nestedCts.Token, options);
312-
if (!iterationResult.Retry)
311+
iterationResult = await StreamingSyncIteration(nestedCts.Token, options);
312+
if (!iterationResult.Retry.GetValueOrDefault(false))
313313
{
314314

315315
// A sync error ocurred that we cannot recover from here.
@@ -345,12 +345,10 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
345345
DownloadError = ex
346346
}
347347
});
348-
349-
// On error, wait a little before retrying
350-
await DelayRetry();
351348
}
352349
finally
353350
{
351+
notifyCompletedUploads = null;
354352

355353
if (!signal.Value.IsCancellationRequested)
356354
{
@@ -359,11 +357,18 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
359357
nestedCts = new CancellationTokenSource();
360358
}
361359

362-
UpdateSyncStatus(new SyncStatusOptions
360+
361+
if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false))
363362
{
364-
Connected = false,
365-
Connecting = true // May be unnecessary
366-
});
363+
UpdateSyncStatus(new SyncStatusOptions
364+
{
365+
Connected = false,
366+
Connecting = true // May be unnecessary
367+
});
368+
369+
// On error, wait a little before retrying
370+
await DelayRetry();
371+
}
367372
}
368373
}
369374

@@ -377,7 +382,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
377382

378383
protected record StreamingSyncIterationResult
379384
{
380-
public bool Retry { get; init; }
385+
public bool? Retry { get; init; }
381386

382387
public bool? ImmediateRestart { get; init; }
383388
}
@@ -396,35 +401,24 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
396401
Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params,
397402
ClientImplementation = options?.ClientImplementation ?? DEFAULT_STREAM_CONNECTION_OPTIONS.ClientImplementation
398403
};
399-
Console.WriteLine("Using sync client implementation: " + resolvedOptions.ClientImplementation);
400404

401405
if (resolvedOptions.ClientImplementation == SyncClientImplementation.RUST)
402406
{
403-
// Use Rust-based sync implementation
404-
// var rustImpl = new RustStreamingSyncImplementation(Options);
405-
// return await rustImpl.RustSyncIteration(signal, resolvedOptions);
406-
407407
return await RustStreamingSyncIteration(signal, resolvedOptions);
408408
}
409409
else
410410
{
411-
// Use legacy C#-based sync implementation
412411
return await LegacyStreamingSyncIteration(signal, resolvedOptions);
413412
}
414413
}
415414
});
416415
}
417416

418-
// protected async Task<StreamingSyncIterationResult> RustStreamingSyncIteration(StreamingSyncImplementationOptions options, CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions)
419-
// {
420-
// // var rustImpl = new RustStreamingSyncImplementation(options);
421-
// // return await rustImpl.RustSyncIteration(signal, resolvedOptions);
422-
// throw new NotImplementedException("Rust streaming sync implementation is not yet implemented.");
423-
// }
424-
425417
protected async Task<StreamingSyncIterationResult> RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions)
426418
{
427419
Task? receivingLines = null;
420+
var hideDisconnectOnRestart = false;
421+
428422

429423
var nestedCts = new CancellationTokenSource();
430424
signal?.Register(() => { nestedCts.Cancel(); });
@@ -451,18 +445,18 @@ async Task Connect(EstablishSyncStream instruction)
451445
while ((line = await reader.ReadLineAsync()) != null)
452446
{
453447
logger.LogDebug("Parsing line for rust sync stream {message}", "xx");
454-
await Control("line_text", line);
448+
await Control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line);
455449
}
456450
}
457451

458452
async Task Stop()
459453
{
460-
await Control("stop");
454+
await Control(PowerSyncControlCommand.STOP);
461455
}
462456

463457
async Task Control(string op, object? payload = null)
464458
{
465-
logger.LogDebug("Control call {message}", op);
459+
logger.LogTrace("Control call {message}", op);
466460

467461
var rawResponse = await Options.Adapter.Control(op, payload);
468462
HandleInstructions(Instruction.ParseInstructions(rawResponse));
@@ -531,6 +525,7 @@ void HandleInstruction(Instruction instruction)
531525
break;
532526
case CloseSyncStream:
533527
nestedCts.Cancel();
528+
hideDisconnectOnRestart = true;
534529
logger.LogWarning("Closing stream");
535530
break;
536531
case FlushFileSystem:
@@ -546,8 +541,9 @@ void HandleInstruction(Instruction instruction)
546541

547542
try
548543
{
549-
notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); };
550-
await Control("start", JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params }));
544+
await Control(PowerSyncControlCommand.START, JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params }));
545+
notifyCompletedUploads = () => { Task.Run(async () => await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED)); };
546+
551547
if (receivingLines != null)
552548
{
553549
await receivingLines;
@@ -559,7 +555,7 @@ void HandleInstruction(Instruction instruction)
559555
await Stop();
560556
}
561557

562-
return new StreamingSyncIterationResult { Retry = true };
558+
return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart };
563559
}
564560

565561
protected async Task<StreamingSyncIterationResult> LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions)

0 commit comments

Comments
 (0)