Skip to content

Commit a6b8b1f

Browse files
committed
Using event stream to interface with central control call.
1 parent 8bdff42 commit a6b8b1f

File tree

2 files changed

+59
-22
lines changed

2 files changed

+59
-22
lines changed

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,12 @@ protected record StreamingSyncIterationResult
396396
public bool? ImmediateRestart { get; init; }
397397
}
398398

399+
protected record EnqueuedCommand
400+
{
401+
public string Command { get; init; } = null!;
402+
public object? Payload { get; init; }
403+
}
404+
399405

400406
protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(CancellationToken signal, PowerSyncConnectionOptions? options)
401407
{
@@ -423,12 +429,14 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
423429
});
424430
}
425431

432+
426433
protected async Task<StreamingSyncIterationResult> RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions)
427434
{
428435
Task? receivingLines = null;
429436
bool hadSyncLine = false;
430437
bool hideDisconnectOnRestart = false;
431438

439+
var controlInvocations = (EventStream<EnqueuedCommand>)null!;
432440

433441
var nestedCts = new CancellationTokenSource();
434442
signal?.Register(() => { nestedCts.Cancel(); });
@@ -442,32 +450,52 @@ async Task Connect(EstablishSyncStream instruction)
442450
Data = instruction.Request
443451
};
444452

445-
var stream = await Options.Remote.PostStreamRaw(syncOptions);
446-
using var reader = new StreamReader(stream, Encoding.UTF8);
447-
448-
syncOptions.CancellationToken.Register(() =>
453+
controlInvocations = new EventStream<EnqueuedCommand>();
454+
try
449455
{
450-
try { stream?.Close(); } catch { }
451-
});
456+
controlInvocations?.RunListenerAsync(async (line) =>
457+
{
458+
await Control(line.Command, line.Payload);
452459

453-
UpdateSyncStatus(new SyncStatusOptions
454-
{
455-
Connected = true
456-
});
460+
// Triggers a local CRUD upload when the first sync line has been received.
461+
// This allows uploading local changes that have been made while offline or disconnected.
462+
if (!hadSyncLine)
463+
{
464+
TriggerCrudUpload();
465+
hadSyncLine = true;
466+
}
467+
});
457468

458-
string? line;
459-
while ((line = await reader.ReadLineAsync()) != null)
460-
{
461-
await Control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line);
469+
var stream = await Options.Remote.PostStreamRaw(syncOptions);
470+
using var reader = new StreamReader(stream, Encoding.UTF8);
462471

463-
// Triggers a local CRUD upload when the first sync line has been received.
464-
// This allows uploading local changes that have been made while offline or disconnected.
465-
if (!hadSyncLine)
472+
syncOptions.CancellationToken.Register(() =>
466473
{
467-
TriggerCrudUpload();
468-
hadSyncLine = true;
474+
try { stream?.Close(); } catch { }
475+
});
476+
477+
UpdateSyncStatus(new SyncStatusOptions
478+
{
479+
Connected = true
480+
});
481+
482+
string? line;
483+
while ((line = await reader.ReadLineAsync()) != null)
484+
{
485+
controlInvocations?.Emit(new EnqueuedCommand
486+
{
487+
Command = PowerSyncControlCommand.PROCESS_TEXT_LINE,
488+
Payload = line
489+
});
490+
469491
}
470492
}
493+
finally
494+
{
495+
var activeInstructions = controlInvocations;
496+
controlInvocations = null;
497+
activeInstructions?.Close();
498+
}
471499
}
472500

473501
async Task Stop()
@@ -553,7 +581,10 @@ async Task HandleInstruction(Instruction instruction)
553581
try
554582
{
555583
await Options.Remote.FetchCredentials();
556-
await Control(PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED);
584+
controlInvocations?.Emit(new EnqueuedCommand
585+
{
586+
Command = PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED
587+
});
557588
}
558589
catch (Exception err)
559590
{
@@ -586,9 +617,12 @@ async Task HandleInstruction(Instruction instruction)
586617
{
587618
Task.Run(async () =>
588619
{
589-
if (!nestedCts.IsCancellationRequested)
620+
if (controlInvocations != null && !controlInvocations.Closed)
590621
{
591-
await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED);
622+
controlInvocations?.Emit(new EnqueuedCommand
623+
{
624+
Command = PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED
625+
});
592626
}
593627
});
594628
};

PowerSync/PowerSync.Common/Utils/EventStream.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ CancellationTokenSource RunListenerAsync(
2525
public class EventStream<T> : IEventStream<T>
2626
{
2727

28+
public bool Closed = false;
29+
2830
// Closest implementation to a ConcurrentSet<T> in .Net
2931
private readonly ConcurrentDictionary<Channel<T>, byte> subscribers = new();
3032

@@ -158,6 +160,7 @@ public void Close()
158160
subscriber.Writer.TryComplete();
159161
RemoveSubscriber(subscriber);
160162
}
163+
Closed = true;
161164
}
162165

163166
private void RemoveSubscriber(Channel<T> channel)

0 commit comments

Comments
 (0)