diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 42f1e8f..7e70847 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,6 +17,9 @@ jobs: with: dotnet-version: '8.0' + - name: Install MAUI Workloads + run: dotnet workload restore + - name: Download PowerSync extension run: dotnet run --project Tools/Setup diff --git a/Directory.build.props b/Directory.build.props index 5b4d6fa..bcdc24a 100644 --- a/Directory.build.props +++ b/Directory.build.props @@ -1,5 +1,8 @@ + + $(MSBuildWarningsAsMessages);NETSDK1202 + diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 21f94ac..4fc0f47 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -1,5 +1,8 @@ # PowerSync.Common Changelog +## 0.0.5-alpha.1 +- Updated to the latest (0.4.9) version of the core extension, which introduces support for the Rust sync implementation and makes it the default. Users can still opt out and use the legacy C# sync implementation as an option when calling `connect()`. + ## 0.0.4-alpha.1 - Fixed MAUI issues related to extension loading when installing package outside of the monorepo. diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 835e75c..9f17882 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -60,11 +60,11 @@ public interface IPowerSyncDatabase : IEventStream Task Execute(string query, object[]? parameters = null); - Task GetAll(string sql, params object[]? parameters); + Task GetAll(string sql, object[]? parameters = null); - Task GetOptional(string sql, params object[]? parameters); + Task GetOptional(string sql, object[]? parameters = null); - Task Get(string sql, params object[]? parameters); + Task Get(string sql, object[]? parameters = null); Task ReadLock(Func> fn, DBLockOptions? options = null); @@ -88,7 +88,7 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30; private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled); - public bool Closed; + public new bool Closed; public bool Ready; protected Task isReadyTask; @@ -236,7 +236,7 @@ protected async Task UpdateHasSynced() ); DateTime?
lastCompleteSync = null; - + // TODO: Will be altered/extended when reporting individual sync priority statuses is supported foreach (var result in results) { diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index aa76d58..4611bb2 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.Utils; using Newtonsoft.Json; +public static class PowerSyncControlCommand +{ + public const string PROCESS_TEXT_LINE = "line_text"; + public const string PROCESS_BSON_LINE = "line_binary"; + public const string STOP = "stop"; + public const string START = "start"; + public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token"; + public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload"; + public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions"; +} + public class Checkpoint { [JsonProperty("last_op_id")] @@ -113,4 +124,9 @@ public interface IBucketStorageAdapter : IEventStream /// Get a unique client ID. /// Task GetClientId(); + + /// + /// Invokes the `powersync_control` function for the sync client. + /// + Task Control(string op, object? payload); } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 495f6e0..78c549d 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -111,7 +111,7 @@ await db.WriteTransaction(async tx => foreach (var b in batch.Buckets) { var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["save", JsonConvert.SerializeObject(new { buckets = new[] { b.ToJSON() } })]); + ["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]); logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result)); count += b.Data.Length; } @@ -314,7 +314,7 @@ public async Task UpdateLocalTarget(Func> callback) } var rs = await db.GetAll( - "SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'" + "SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'" ); if (rs.Length == 0) @@ -338,7 +338,7 @@ public async Task UpdateLocalTarget(Func> callback) } var rsAfter = await tx.GetAll( - "SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'" + "SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'" ); if (rsAfter.Length == 0) @@ -351,16 +351,18 @@ public async Task UpdateLocalTarget(Func> callback) if (seqAfter != seqBefore) { - logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, seqBefore); + logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, + seqBefore); return false; } var response = await tx.Execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [opId] - ); + "UPDATE ps_buckets SET target_op = CAST(? 
as INTEGER) WHERE name='$local'", + [opId] + ); - logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", JsonConvert.SerializeObject(response)); + logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", + JsonConvert.SerializeObject(response)); return true; }); } @@ -388,33 +390,33 @@ public async Task UpdateLocalTarget(Func> callback) var last = all[all.Length - 1]; return new CrudBatch( - Crud: all, - HaveMore: true, - CompleteCallback: async (string? writeCheckpoint) => - { - await db.WriteTransaction(async tx => + Crud: all, + HaveMore: true, + CompleteCallback: async (string? writeCheckpoint) => { - await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]); - - if (!string.IsNullOrEmpty(writeCheckpoint)) + await db.WriteTransaction(async tx => { - var crudResult = await tx.GetAll("SELECT 1 FROM ps_crud LIMIT 1"); - if (crudResult?.Length > 0) + await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]); + + if (!string.IsNullOrEmpty(writeCheckpoint)) + { + var crudResult = await tx.GetAll("SELECT 1 FROM ps_crud LIMIT 1"); + if (crudResult?.Length > 0) + { + await tx.Execute( + "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", + [writeCheckpoint]); + } + } + else { await tx.Execute( "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [writeCheckpoint]); + [GetMaxOpId()]); } - } - else - { - await tx.Execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [GetMaxOpId()]); - } - }); - } - ); + }); + } + ); } public async Task NextCrudItem() @@ -434,4 +436,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint) // No Op await Task.CompletedTask; } + + record ControlResult(string? r); + + public async Task Control(string op, object? payload = null) + { + return await db.WriteTransaction(async tx => + { + var result = await tx.Get("SELECT powersync_control(?, ?) AS r", [op, payload]); + return result.r!; + }); + } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs new file mode 100644 index 0000000..7bfef98 --- /dev/null +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -0,0 +1,129 @@ +using Newtonsoft.Json.Linq; +using Newtonsoft.Json; + +namespace PowerSync.Common.Client.Sync.Stream; + +/// +/// An internal instruction emitted by the sync client in the core extension in response to the +/// SDK passing sync data into the extension. +/// +public abstract class Instruction +{ + + public static Instruction[] ParseInstructions(string rawResponse) + { + var jsonArray = JArray.Parse(rawResponse); + List instructions = []; + + foreach (JObject item in jsonArray) + { + var instruction = ParseInstruction(item); + if (instruction == null) + { + throw new JsonSerializationException("Failed to parse instruction from JSON."); + } + instructions.Add(instruction); + } + + return instructions.ToArray(); + } + + public static Instruction? 
ParseInstruction(JObject json) + { + if (json.ContainsKey("LogLine")) + return json["LogLine"]!.ToObject(); + if (json.ContainsKey("UpdateSyncStatus")) + return json["UpdateSyncStatus"]!.ToObject(); + if (json.ContainsKey("EstablishSyncStream")) + return json["EstablishSyncStream"]!.ToObject(); + if (json.ContainsKey("FetchCredentials")) + return json["FetchCredentials"]!.ToObject(); + if (json.ContainsKey("CloseSyncStream")) + return new CloseSyncStream(); + if (json.ContainsKey("FlushFileSystem")) + return new FlushFileSystem(); + if (json.ContainsKey("DidCompleteSync")) + return new DidCompleteSync(); + + throw new JsonSerializationException("Unknown Instruction type."); + } +} + +public class LogLine : Instruction +{ + [JsonProperty("severity")] + public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING" + + [JsonProperty("line")] + public string Line { get; set; } = null!; +} + +public class EstablishSyncStream : Instruction +{ + [JsonProperty("request")] + public StreamingSyncRequest Request { get; set; } = null!; +} + +public class UpdateSyncStatus : Instruction +{ + [JsonProperty("status")] + public CoreSyncStatus Status { get; set; } = null!; +} + +public class CoreSyncStatus +{ + [JsonProperty("connected")] + public bool Connected { get; set; } + + [JsonProperty("connecting")] + public bool Connecting { get; set; } + + [JsonProperty("priority_status")] + public List PriorityStatus { get; set; } = []; + + [JsonProperty("downloading")] + public DownloadProgress? Downloading { get; set; } +} + +public class SyncPriorityStatus +{ + [JsonProperty("priority")] + public int Priority { get; set; } + + [JsonProperty("last_synced_at")] + public long LastSyncedAt { get; set; } + + [JsonProperty("has_synced")] + public bool? HasSynced { get; set; } +} + +public class DownloadProgress +{ + [JsonProperty("buckets")] + public Dictionary Buckets { get; set; } = null!; +} + +public class BucketProgress +{ + [JsonProperty("priority")] + public int Priority { get; set; } + + [JsonProperty("at_last")] + public int AtLast { get; set; } + + [JsonProperty("since_last")] + public int SinceLast { get; set; } + + [JsonProperty("target_count")] + public int TargetCount { get; set; } +} + +public class FetchCredentials : Instruction +{ + [JsonProperty("did_expire")] + public bool DidExpire { get; set; } +} + +public class CloseSyncStream : Instruction { } +public class FlushFileSystem : Instruction { } +public class DidCompleteSync : Instruction { } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs index f47228f..d26795f 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs @@ -124,6 +124,37 @@ public async Task Get(string path, Dictionary? headers = n return JsonConvert.DeserializeObject(responseData)!; } + /// + /// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line. 
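+ /// The returned stream is owned by the caller and should be disposed once reading is complete.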
+ /// + public async Task PostStreamRaw(SyncStreamOptions options) + { + var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers); + var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken); + + if (response.Content == null) + { + throw new HttpRequestException($"HTTP {response.StatusCode}: No content"); + } + + if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized) + { + InvalidateCredentials(); + } + + if (!response.IsSuccessStatusCode) + { + var errorText = await response.Content.ReadAsStringAsync(); + throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}"); + } + + return await response.Content.ReadAsStreamAsync(); + } + + + /// + /// Originally used for the C# streaming sync implementation. + /// public async IAsyncEnumerable PostStream(SyncStreamOptions options) { using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index c2c56fb..62466cc 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -1,6 +1,8 @@ namespace PowerSync.Common.Client.Sync.Stream; using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -10,6 +12,30 @@ namespace PowerSync.Common.Client.Sync.Stream; using PowerSync.Common.DB.Crud; using PowerSync.Common.Utils; + + +public enum SyncClientImplementation +{ + + /// + /// This implementation offloads the sync line decoding and handling into the PowerSync core extension. + /// + /// ## Compatibility warning + /// + /// The Rust sync client stores sync data in a format that is slightly different than the one used + /// by the old C# implementation. When adopting the RUST client on existing + /// databases, the PowerSync SDK will migrate the format automatically. + RUST, + /// + /// Decodes and handles sync lines received from the sync service in C#. + /// + /// This is the legacy option. + /// + /// The explicit choice to use the C#-based sync implementation will be removed from a future version of the SDK. + /// + C_SHARP +} + public class AdditionalConnectionOptions(int? retryDelayMs = null, int? crudUploadThrottleMs = null) { /// @@ -48,17 +74,24 @@ public class StreamingSyncImplementationOptions : AdditionalConnectionOptions public ILogger? Logger { get; init; } } -public class BaseConnectionOptions(Dictionary? parameters = null) +public class BaseConnectionOptions(Dictionary? parameters = null, SyncClientImplementation? clientImplementation = null) { /// /// These parameters are passed to the sync rules and will be available under the `user_parameters` object. /// public Dictionary? Params { get; set; } = parameters; + + /// + /// Whether to use the RUST or C# sync client implementation. + /// + public SyncClientImplementation? 
ClientImplementation { get; set; } = clientImplementation; } public class RequiredPowerSyncConnectionOptions : BaseConnectionOptions { public new Dictionary Params { get; set; } = new(); + + public new SyncClientImplementation ClientImplementation { get; set; } = new(); } public class StreamingSyncImplementationEvent @@ -95,7 +128,8 @@ public class StreamingSyncImplementation : EventStream { - if (!SyncStatus.Connected || SyncStatus.DataFlowStatus.Uploading) + if (!SyncStatus.Connected || isUploadingCrud) { return; } - Task.Run(async () => await InternalUploadAllCrud()); + isUploadingCrud = true; + Task.Run(async () => + { + await InternalUploadAllCrud(); + notifyCompletedUploads?.Invoke(); + isUploadingCrud = false; + }); }; } @@ -259,13 +302,15 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio }); }); - /// This loops runs until [retry] is false or the abort signal is set to aborted. - /// Aborting the nestedCts will: - /// - Abort any pending fetch requests - /// - Close any sync stream ReadableStreams (which will also close any established network requests) + // This loops runs until [retry] is false or the abort signal is set to aborted. + // Aborting the nestedCts will: + // - Abort any pending fetch requests + // - Close any sync stream ReadableStreams (which will also close any established network requests) while (true) { UpdateSyncStatus(new SyncStatusOptions { Connecting = true }); + var iterationResult = (StreamingSyncIterationResult?)null; + var shouldDelayRetry = true; try { @@ -273,16 +318,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio { break; } - var iterationResult = await StreamingSyncIteration(nestedCts.Token, options); - if (!iterationResult.Retry) - { - - // A sync error ocurred that we cannot recover from here. - // This loop must terminate. - // The nestedCts will close any open network requests and streams below. - break; - } - // Continue immediately + iterationResult = await StreamingSyncIteration(nestedCts.Token, options); } catch (Exception ex) { @@ -292,15 +328,20 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio exMessage = "Stream closed or timed out -" + ex.InnerException.Message; } - - logger.LogError("Caught exception in streaming sync: {message}", exMessage); - // Either: // - A network request failed with a failed connection or not OKAY response code. // - There was a sync processing error. // This loop will retry. // The nested abort controller will cleanup any open network requests and streams. - // The WebRemote should only abort pending fetch requests or close active Readable streams. + if (nestedCts.IsCancellationRequested) + { + logger.LogWarning("Caught exception in streaming sync: {message}", exMessage); + shouldDelayRetry = false; + } + else + { + logger.LogError("Caught exception in streaming sync: {message}", exMessage); + } UpdateSyncStatus(new SyncStatusOptions { @@ -310,12 +351,10 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio DownloadError = ex } }); - - // On error, wait a little before retrying - await DelayRetry(); } finally { + notifyCompletedUploads = null; if (!signal.Value.IsCancellationRequested) { @@ -324,11 +363,21 @@ protected async Task StreamingSync(CancellationToken? 
signal, PowerSyncConnectio nestedCts = new CancellationTokenSource(); } - UpdateSyncStatus(new SyncStatusOptions + if (iterationResult != null && (iterationResult.ImmediateRestart != true && iterationResult.LegacyRetry != true)) { - Connected = false, - Connecting = true // May be unnecessary - }); + + UpdateSyncStatus(new SyncStatusOptions + { + Connected = false, + Connecting = true + }); + + // On error, wait a little before retrying + if (shouldDelayRetry) + { + await DelayRetry(); + } + } } } @@ -342,9 +391,18 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio protected record StreamingSyncIterationResult { - public bool Retry { get; init; } + public bool? LegacyRetry { get; init; } + + public bool? ImmediateRestart { get; init; } + } + + protected record EnqueuedCommand + { + public string Command { get; init; } = null!; + public object? Payload { get; init; } } + protected async Task StreamingSyncIteration(CancellationToken signal, PowerSyncConnectionOptions? options) { return await locks.ObtainLock(new LockOptions @@ -355,284 +413,507 @@ protected async Task StreamingSyncIteration(Cancel { var resolvedOptions = new RequiredPowerSyncConnectionOptions { - Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params + Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params, + ClientImplementation = options?.ClientImplementation ?? DEFAULT_STREAM_CONNECTION_OPTIONS.ClientImplementation }; - logger.LogDebug("Streaming sync iteration started"); - Options.Adapter.StartSession(); - var bucketEntries = await Options.Adapter.GetBucketStates(); - var initialBuckets = new Dictionary(); - - foreach (var entry in bucketEntries) + if (resolvedOptions.ClientImplementation == SyncClientImplementation.RUST) + { + return await RustStreamingSyncIteration(signal, resolvedOptions); + } + else { - initialBuckets[entry.Bucket] = entry.OpId; + return await LegacyStreamingSyncIteration(signal, resolvedOptions); } + } + }); + } - var req = initialBuckets - .Select(kvp => new BucketRequest - { - Name = kvp.Key, - After = kvp.Value - }) - .ToList(); - var targetCheckpoint = (Checkpoint?)null; - var validatedCheckpoint = (Checkpoint?)null; - var appliedCheckpoint = (Checkpoint?)null; + protected async Task RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions) + { + Task? receivingLines = null; + bool hadSyncLine = false; + bool hideDisconnectOnRestart = false; - var bucketSet = new HashSet(initialBuckets.Keys); + var controlInvocations = (EventStream)null!; - var clientId = await Options.Adapter.GetClientId(); + var nestedCts = new CancellationTokenSource(); + signal?.Register(() => { nestedCts.Cancel(); }); - logger.LogDebug("Requesting stream from server"); + async Task Connect(EstablishSyncStream instruction) + { + var syncOptions = new SyncStreamOptions + { + Path = "/sync/stream", + CancellationToken = nestedCts.Token, + Data = instruction.Request + }; - var syncOptions = new SyncStreamOptions + controlInvocations = new EventStream(); + try + { + controlInvocations?.RunListenerAsync(async (line) => { - Path = "/sync/stream", - CancellationToken = signal, - Data = new StreamingSyncRequest + await Control(line.Command, line.Payload); + + // Triggers a local CRUD upload when the first sync line has been received. + // This allows uploading local changes that have been made while offline or disconnected. 
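+ // The hadSyncLine flag ensures this upload is only triggered once per sync stream iteration.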
+ if (!hadSyncLine) { - Buckets = req, - IncludeChecksum = true, - RawData = true, - Parameters = resolvedOptions.Params, // Replace with actual params - ClientId = clientId + TriggerCrudUpload(); + hadSyncLine = true; } - }; + }); - var stream = Options.Remote.PostStream(syncOptions); - var first = true; - await foreach (var line in stream) + var stream = await Options.Remote.PostStreamRaw(syncOptions); + using var reader = new StreamReader(stream, Encoding.UTF8); + + syncOptions.CancellationToken.Register(() => { - if (first) - { - first = false; - logger.LogDebug("Stream established. Processing events"); - } + try { stream?.Close(); } catch { } + }); - if (line == null) + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true + }); + + string? line; + while ((line = await reader.ReadLineAsync()) != null) + { + controlInvocations?.Emit(new EnqueuedCommand { - logger.LogDebug("Stream has closed while waiting"); - // The stream has closed while waiting - return new StreamingSyncIterationResult { Retry = true }; - } + Command = PowerSyncControlCommand.PROCESS_TEXT_LINE, + Payload = line + }); - // A connection is active and messages are being received - if (!SyncStatus.Connected) + } + } + finally + { + var activeInstructions = controlInvocations; + controlInvocations = null; + activeInstructions?.Close(); + } + } + + async Task Stop() + { + await Control(PowerSyncControlCommand.STOP); + } + + async Task Control(string op, object? payload = null) + { + var rawResponse = await Options.Adapter.Control(op, payload); + logger.LogTrace("powersync_control {op}, {payload}, {rawResponse}", op, payload, rawResponse); + HandleInstructions(Instruction.ParseInstructions(rawResponse)); + } + + async void HandleInstructions(Instruction[] instructions) + { + foreach (var instruction in instructions) + { + await HandleInstruction(instruction); + } + } + + async Task HandleInstruction(Instruction instruction) + { + switch (instruction) + { + case LogLine logLine: + switch (logLine.Severity) { - // There is a connection now - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true - }); - TriggerCrudUpload(); + case "DEBUG": + logger.LogDebug("{message}", logLine.Line); + break; + case "INFO": + logger.LogInformation("{message}", logLine.Line); + break; + case "WARNING": + logger.LogWarning("{message}", logLine.Line); + break; } + break; + case UpdateSyncStatus syncStatus: + var info = syncStatus.Status; + var coreCompleteSync = + info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); + var completeSync = coreCompleteSync != null ? 
CoreStatusToSyncStatus(coreCompleteSync) : null; - if (line is StreamingSyncCheckpoint syncCheckpoint) + UpdateSyncStatus(new SyncStatusOptions { - logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); - - targetCheckpoint = syncCheckpoint.Checkpoint; - var bucketsToDelete = new HashSet(bucketSet); - var newBuckets = new HashSet(); - - foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) + Connected = info.Connected, + Connecting = info.Connecting, + LastSyncedAt = completeSync?.LastSyncedAt, + HasSynced = completeSync?.HasSynced, + PriorityStatusEntries = info.PriorityStatus.Select(CoreStatusToSyncStatus).ToArray(), + DataFlow = new SyncDataFlowStatus { - newBuckets.Add(checksum.Bucket); - bucketsToDelete.Remove(checksum.Bucket); + Downloading = info.Downloading != null, + DownloadProgress = info.Downloading?.Buckets } - if (bucketsToDelete.Count > 0) + }, + new UpdateSyncStatusOptions { - logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); + ClearDownloadError = true, } - - bucketSet = newBuckets; - await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + ); + break; + case EstablishSyncStream establishSyncStream: + if (receivingLines != null) + { + throw new Exception("Unexpected request to establish sync stream, already connected"); } - else if (line is StreamingSyncCheckpointComplete checkpointComplete) + receivingLines = Connect(establishSyncStream); + break; + case FetchCredentials fetchCredentials: + if (fetchCredentials.DidExpire) { - logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); - - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + Options.Remote.InvalidateCredentials(); + } + else + { + Options.Remote.InvalidateCredentials(); - if (!result.CheckpointValid) + // Restart iteration after the credentials have been refreshed. + try { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = true }; + await Options.Remote.FetchCredentials(); + controlInvocations?.Emit(new EnqueuedCommand + { + Command = PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED + }); } - else if (!result.Ready) + catch (Exception err) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. 
- // Landing here the whole time + logger.LogWarning("Could not prefetch credentials: {message}", err.Message); } - else - { - appliedCheckpoint = targetCheckpoint; - logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false - } - }, new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); + } + break; + case CloseSyncStream: + nestedCts.Cancel(); + hideDisconnectOnRestart = true; + logger.LogWarning("Closing stream"); + break; + case FlushFileSystem: + // ignore + break; + case DidCompleteSync: + UpdateSyncStatus( + new SyncStatusOptions { }, + new UpdateSyncStatusOptions { ClearDownloadError = true }); + break; + } + } - } + try + { + await Control(PowerSyncControlCommand.START, JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params })); - validatedCheckpoint = targetCheckpoint; - } - else if (line is StreamingSyncCheckpointDiff checkpointDiff) + notifyCompletedUploads = () => + { + Task.Run(() => + { + if (controlInvocations != null && !controlInvocations.Closed) { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (targetCheckpoint == null) + controlInvocations?.Emit(new EnqueuedCommand { - throw new Exception("Checkpoint diff without previous checkpoint"); - } + Command = PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED + }); + } + }); + }; - var diff = checkpointDiff.CheckpointDiff; - var newBuckets = new Dictionary(); + if (receivingLines != null) + { + await receivingLines; + } + } + finally + { + notifyCompletedUploads = null; + await Stop(); + } - foreach (var checksum in targetCheckpoint.Buckets) - { - newBuckets[checksum.Bucket] = checksum; - } + return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; + } - foreach (var checksum in diff.UpdatedBuckets) - { - newBuckets[checksum.Bucket] = checksum; - } + protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) + { + logger.LogWarning("The legacy sync client implementation is deprecated and will be removed in a future release."); + logger.LogDebug("Streaming sync iteration started"); + Options.Adapter.StartSession(); + var bucketEntries = await Options.Adapter.GetBucketStates(); + var initialBuckets = new Dictionary(); - foreach (var bucket in diff.RemovedBuckets) - { - newBuckets.Remove(bucket); - } + foreach (var entry in bucketEntries) + { + initialBuckets[entry.Bucket] = entry.OpId; + } - var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; - var newCheckpoint = new Checkpoint - { - LastOpId = diff.LastOpId, - Buckets = [.. 
newBuckets.Values], - WriteCheckpoint = newWriteCheckpoint - }; + var req = initialBuckets + .Select(kvp => new BucketRequest + { + Name = kvp.Key, + After = kvp.Value + }) + .ToList(); + + var targetCheckpoint = (Checkpoint?)null; + var validatedCheckpoint = (Checkpoint?)null; + var appliedCheckpoint = (Checkpoint?)null; + + var bucketSet = new HashSet(initialBuckets.Keys); + + var clientId = await Options.Adapter.GetClientId(); + + logger.LogDebug("Requesting stream from server"); + + var syncOptions = new SyncStreamOptions + { + Path = "/sync/stream", + CancellationToken = signal, + Data = new StreamingSyncRequest + { + Buckets = req, + IncludeChecksum = true, + RawData = true, + Parameters = resolvedOptions.Params, + ClientId = clientId + } + }; - targetCheckpoint = newCheckpoint; + var stream = Options.Remote.PostStream(syncOptions); + var first = true; + await foreach (var line in stream) + { + if (first) + { + first = false; + logger.LogDebug("Stream established. Processing events"); + } + + if (line == null) + { + logger.LogDebug("Stream has closed while waiting"); + // The stream has closed while waiting + return new StreamingSyncIterationResult { LegacyRetry = true }; + } + + // A connection is active and messages are being received + if (!SyncStatus.Connected) + { + // There is a connection now + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true + }); + TriggerCrudUpload(); + } - bucketSet = [.. newBuckets.Keys]; + if (line is StreamingSyncCheckpoint syncCheckpoint) + { + logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); - var bucketsToDelete = diff.RemovedBuckets.ToArray(); - if (bucketsToDelete.Length > 0) + targetCheckpoint = syncCheckpoint.Checkpoint; + var bucketsToDelete = new HashSet(bucketSet); + var newBuckets = new HashSet(); + + foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) + { + newBuckets.Add(checksum.Bucket); + bucketsToDelete.Remove(checksum.Bucket); + } + if (bucketsToDelete.Count > 0) + { + logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); + } + + bucketSet = newBuckets; + await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); + await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + } + else if (line is StreamingSyncCheckpointComplete checkpointComplete) + { + logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); + + var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + + if (!result.CheckpointValid) + { + // This means checksums failed. Start again with a new checkpoint. + await Task.Delay(50); + return new StreamingSyncIterationResult { LegacyRetry = true }; + } + else if (!result.Ready) + { + // Checksums valid, but need more data for a consistent checkpoint. + // Continue waiting. 
+ // Landing here the whole time + } + else + { + appliedCheckpoint = targetCheckpoint; + logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); + + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true, + LastSyncedAt = DateTime.Now, + DataFlow = new SyncDataFlowStatus { - logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); + Downloading = false } + }, new UpdateSyncStatusOptions + { + ClearDownloadError = true + }); + + } + + validatedCheckpoint = targetCheckpoint; + } + else if (line is StreamingSyncCheckpointDiff checkpointDiff) + { + // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint + if (targetCheckpoint == null) + { + throw new Exception("Checkpoint diff without previous checkpoint"); + } + + var diff = checkpointDiff.CheckpointDiff; + var newBuckets = new Dictionary(); + + foreach (var checksum in targetCheckpoint.Buckets) + { + newBuckets[checksum.Bucket] = checksum; + } + + foreach (var checksum in diff.UpdatedBuckets) + { + newBuckets[checksum.Bucket] = checksum; + } + + foreach (var bucket in diff.RemovedBuckets) + { + newBuckets.Remove(bucket); + } + + var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; + var newCheckpoint = new Checkpoint + { + LastOpId = diff.LastOpId, + Buckets = [.. newBuckets.Values], + WriteCheckpoint = newWriteCheckpoint + }; + + targetCheckpoint = newCheckpoint; - // Perform async operations - await Options.Adapter.RemoveBuckets(bucketsToDelete); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + bucketSet = [.. newBuckets.Keys]; + + var bucketsToDelete = diff.RemovedBuckets.ToArray(); + if (bucketsToDelete.Length > 0) + { + logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); + } + + // Perform async operations + await Options.Adapter.RemoveBuckets(bucketsToDelete); + await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + } + else if (line is StreamingSyncDataJSON dataJSON) + { + UpdateSyncStatus(new SyncStatusOptions + { + DataFlow = new SyncDataFlowStatus + { + Downloading = true } - else if (line is StreamingSyncDataJSON dataJSON) + }); + await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); + } + else if (line is StreamingSyncKeepalive keepalive) + { + var remainingSeconds = keepalive.TokenExpiresIn; + if (remainingSeconds == 0) + { + // Connection would be closed automatically right after this + logger.LogDebug("Token expiring; reconnect"); + Options.Remote.InvalidateCredentials(); + + // For a rare case where the backend connector does not update the token + // (uses the same one), this should have some delay. 
+ // + await DelayRetry(); + return new StreamingSyncIterationResult { LegacyRetry = true }; + } + else if (remainingSeconds < 30) + { + logger.LogDebug("Token will expire soon; reconnect"); + // Pre-emptively refresh the token + Options.Remote.InvalidateCredentials(); + return new StreamingSyncIterationResult { LegacyRetry = true }; + } + TriggerCrudUpload(); + } + else + { + logger.LogDebug("Sync complete"); + + if (targetCheckpoint == appliedCheckpoint) + { + UpdateSyncStatus(new SyncStatusOptions { - UpdateSyncStatus(new SyncStatusOptions - { - DataFlow = new SyncDataFlowStatus - { - Downloading = true - } - }); - await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); + Connected = true, + LastSyncedAt = DateTime.Now, + }, + new UpdateSyncStatusOptions + { + ClearDownloadError = true } - else if (line is StreamingSyncKeepalive keepalive) + ); + } + else if (validatedCheckpoint == targetCheckpoint) + { + var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + if (!result.CheckpointValid) { - var remainingSeconds = keepalive.TokenExpiresIn; - if (remainingSeconds == 0) - { - // Connection would be closed automatically right after this - logger.LogDebug("Token expiring; reconnect"); - Options.Remote.InvalidateCredentials(); - - // For a rare case where the backend connector does not update the token - // (uses the same one), this should have some delay. - // - await DelayRetry(); - return new StreamingSyncIterationResult { Retry = true }; - } - else if (remainingSeconds < 30) - { - logger.LogDebug("Token will expire soon; reconnect"); - // Pre-emptively refresh the token - Options.Remote.InvalidateCredentials(); - return new StreamingSyncIterationResult { Retry = true }; - } - TriggerCrudUpload(); + // This means checksums failed. Start again with a new checkpoint. + await Task.Delay(50); + return new StreamingSyncIterationResult { LegacyRetry = false }; + } + else if (!result.Ready) + { + // Checksums valid, but need more data for a consistent checkpoint. + // Continue waiting. } else { - logger.LogDebug("Sync complete"); - - if (targetCheckpoint == appliedCheckpoint) + appliedCheckpoint = targetCheckpoint; + UpdateSyncStatus(new SyncStatusOptions { - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - }, - new UpdateSyncStatusOptions + Connected = true, + LastSyncedAt = DateTime.Now, + DataFlow = new SyncDataFlowStatus { - ClearDownloadError = true + Downloading = false, } - ); - } - else if (validatedCheckpoint == targetCheckpoint) + }, + new UpdateSyncStatusOptions { - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = false }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } - else - { - appliedCheckpoint = targetCheckpoint; - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false, - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - } - } + ClearDownloadError = true + }); } } - - logger.LogDebug("Stream input empty"); - // Connection closed. Likely due to auth issue. 
- return new StreamingSyncIterationResult { Retry = true }; } - }); + } + logger.LogDebug("Stream input empty"); + // Connection closed. Likely due to auth issue. + return new StreamingSyncIterationResult { LegacyRetry = true }; } public new void Close() @@ -755,30 +1036,47 @@ protected record UpdateSyncStatusOptions( ); protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptions? updateOptions = null) { - var updatedStatus = new SyncStatus(new SyncStatusOptions + try { - Connected = options.Connected ?? SyncStatus.Connected, - Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), - LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, - DataFlow = new SyncDataFlowStatus + var updatedStatus = new SyncStatus(new SyncStatusOptions { - Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, - Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, - DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, - UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, + Connected = options.Connected ?? SyncStatus.Connected, + Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), + LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, + DataFlow = new SyncDataFlowStatus + { + Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, + Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, + DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, + UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, + } + }); + + if (!SyncStatus.Equals(updatedStatus)) + { + SyncStatus = updatedStatus; + logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); + // Only trigger this if there was a change + Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); } - }); - if (!SyncStatus.Equals(updatedStatus)) + // Trigger this for all updates + Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); + } + catch (Exception ex) { - SyncStatus = updatedStatus; - logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); - // Only trigger this if there was a change - Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); + logger.LogError("Error updating sync status: {message}", ex.Message); } + } - // Trigger this for all updates - Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); + private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) + { + return new DB.Crud.SyncPriorityStatus + { + Priority = status.Priority, + HasSynced = status.HasSynced ?? null, + LastSyncedAt = status?.LastSyncedAt != null ? 
new DateTime(status!.LastSyncedAt) : null + }; } private async Task DelayRetry() diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs new file mode 100644 index 0000000..b709a2c --- /dev/null +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs @@ -0,0 +1,81 @@ +using PowerSync.Common.Client.Sync.Stream; + +namespace PowerSync.Common.DB.Crud; + +/// +/// Provides realtime progress on how PowerSync is downloading rows. +/// +/// The reported progress always reflects the status towards the end of a sync iteration (after +/// which a consistent snapshot of all buckets is available locally). +/// +/// In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets) +/// operation takes place between syncs), it's possible for the returned numbers to be slightly +/// inaccurate. For this reason, the sync progress should be seen as an approximation of progress. +/// The information returned is good enough to build progress bars, but not exact enough to track +/// individual download counts. +/// +/// Also note that data is downloaded in bulk, which means that individual counters are unlikely +/// to be updated one-by-one. +/// +public class SyncProgress : ProgressWithOperations +{ + public static readonly int FULL_SYNC_PRIORITY = 2147483647; + protected Dictionary InternalProgress { get; } + + public SyncProgress(Dictionary progress) + { + this.InternalProgress = progress; + var untilCompletion = UntilPriority(FULL_SYNC_PRIORITY); + + TotalOperations = untilCompletion.TotalOperations; + DownloadedOperations = untilCompletion.DownloadedOperations; + DownloadedFraction = untilCompletion.DownloadedFraction; + } + + public ProgressWithOperations UntilPriority(int priority) + { + var total = 0; + var downloaded = 0; + + foreach (var progress in InternalProgress.Values) + { + // Include higher-priority buckets, which are represented by lower numbers. + if (progress.Priority <= priority) + { + downloaded += progress.SinceLast; + total += progress.TargetCount - progress.AtLast; + } + } + + return new ProgressWithOperations + { + TotalOperations = total, + DownloadedOperations = downloaded, + DownloadedFraction = total == 0 ? 1.0 : (double)downloaded / total + }; + } +} + +/// +/// Information about a progressing download made by the PowerSync SDK. +/// +/// +public class ProgressWithOperations +{ + /// + /// The total number of operations to download for the current sync iteration to complete. + /// + public int TotalOperations { get; set; } + + /// + /// The number of operations that have already been downloaded. + /// + public int DownloadedOperations { get; set; } + + /// + /// This will be a number between 0.0 and 1.0 (inclusive). + /// + /// When this number reaches 1.0, all changes have been received from the sync service.
+ /// + public double DownloadedFraction { get; set; } +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index 4107a59..cdda36a 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -1,21 +1,15 @@ namespace PowerSync.Common.DB.Crud; +using PowerSync.Common.Client.Sync.Stream; using Newtonsoft.Json; +using Microsoft.Extensions.Options; public class SyncDataFlowStatus { - [JsonProperty("downloading")] - public bool Downloading { get; set; } = false; + [JsonProperty("downloading")] public bool Downloading { get; set; } = false; - [JsonProperty("uploading")] - public bool Uploading { get; set; } = false; + [JsonProperty("uploading")] public bool Uploading { get; set; } = false; - [JsonProperty("downloadError")] - public string? DownloadErrorMessage => DownloadError?.Message; - - [JsonProperty("uploadError")] - public string? UploadErrorMessage => UploadError?.Message; - /// /// Error during downloading (including connecting). /// Cleared on the next successful data download. @@ -29,11 +23,28 @@ public class SyncDataFlowStatus /// [JsonIgnore] public Exception? UploadError { get; set; } = null; + + + /// + /// Internal information about how far we are downloading operations in buckets. + /// + public Dictionary? DownloadProgress { get; set; } = null; +} + +public class SyncPriorityStatus +{ + [JsonProperty("priority")] public int Priority { get; set; } + + [JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; } + + [JsonProperty("hasSynced")] public bool? HasSynced { get; set; } } public class SyncStatusOptions { - public SyncStatusOptions() { } + public SyncStatusOptions() + { + } public SyncStatusOptions(SyncStatusOptions options) { @@ -42,22 +53,21 @@ public SyncStatusOptions(SyncStatusOptions options) DataFlow = options.DataFlow; LastSyncedAt = options.LastSyncedAt; HasSynced = options.HasSynced; + PriorityStatusEntries = options.PriorityStatusEntries; } - [JsonProperty("connected")] - public bool? Connected { get; set; } + [JsonProperty("connected")] public bool? Connected { get; set; } - [JsonProperty("connecting")] - public bool? Connecting { get; set; } + [JsonProperty("connecting")] public bool? Connecting { get; set; } - [JsonProperty("dataFlow")] - public SyncDataFlowStatus? DataFlow { get; set; } + [JsonProperty("dataFlow")] public SyncDataFlowStatus? DataFlow { get; set; } - [JsonProperty("lastSyncedAt")] - public DateTime? LastSyncedAt { get; set; } + [JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; } - [JsonProperty("hasSynced")] - public bool? HasSynced { get; set; } + [JsonProperty("hasSynced")] public bool? HasSynced { get; set; } + + [JsonProperty("priorityStatusEntries")] + public SyncPriorityStatus[]? PriorityStatusEntries { get; set; } } public class SyncStatus(SyncStatusOptions options) @@ -85,19 +95,82 @@ public class SyncStatus(SyncStatusOptions options) /// public SyncDataFlowStatus DataFlowStatus => Options.DataFlow ?? new SyncDataFlowStatus(); + /// + /// Provides sync status information for all bucket priorities, sorted by priority (highest first). + /// + public SyncPriorityStatus[] PriorityStatusEntries => + (Options.PriorityStatusEntries ?? []) + .OrderBy(entry => entry.Priority) + .ToArray(); + + /// + /// A realtime progress report on how many operations have been downloaded and + /// how many are necessary in total to complete the next sync iteration. 
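+ /// Returns null while no download is in progress and no progress information is available.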
+ /// + public SyncProgress? DownloadProgress() + { + var internalProgress = Options.DataFlow?.DownloadProgress; + if (internalProgress == null) + { + return null; + } + + return new SyncProgress(internalProgress); + } + + /// + /// Reports the sync status (a pair of HasSynced and LastSyncedAt fields) + /// for a specific bucket priority level. + /// + /// When buckets with different priorities are declared, PowerSync may choose to synchronize higher-priority + /// buckets first. When a consistent view over all buckets for all priorities up until the given priority is + /// reached, PowerSync makes data from those buckets available before lower-priority buckets have finished + /// syncing. + /// + /// This method returns the status for the requested priority or the next higher priority level that has + /// status information available. This is because when PowerSync makes data for a given priority available, + /// all buckets in higher-priorities are guaranteed to be consistent with that checkpoint. + /// For example, if PowerSync just finished synchronizing buckets in priority level 3, calling this method + /// with a priority of 1 may return information for priority level 3. + /// + public SyncPriorityStatus StatusForPriority(int priority) + { + foreach (var known in PriorityStatusEntries) + { + if (known.Priority >= priority) + { + return known; + } + } + + // Fallback if no matching or higher-priority entry is found + return new SyncPriorityStatus + { + Priority = priority, + LastSyncedAt = LastSyncedAt, + HasSynced = HasSynced + }; + } + + private string SerializeObject() + { + return JsonConvert.SerializeObject(new { Options = Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); + } + public bool IsEqual(SyncStatus status) { - return JsonConvert.SerializeObject(Options) == JsonConvert.SerializeObject(status.Options); + return this.SerializeObject() == status.SerializeObject(); } public string GetMessage() { var dataFlow = DataFlowStatus; - return $"SyncStatus"; + return + $"SyncStatus"; } public string ToJSON() { - return JsonConvert.SerializeObject(this); + return SerializeObject(); } } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 0b4c7bb..1ca43a0 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -155,18 +155,18 @@ public Task ExecuteBatch(string query, object[][]? parameters = nul throw new NotImplementedException(); } - public async Task Get(string sql, params object[]? parameters) + public async Task Get(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.Get(sql, parameters)); ; } - public async Task GetAll(string sql, params object[]? parameters) + public async Task GetAll(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.GetAll(sql, parameters)); } - public async Task GetOptional(string sql, params object[]? parameters) + public async Task GetOptional(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.GetOptional(sql, parameters)); } @@ -307,17 +307,17 @@ public Task Execute(string query, object[]? parameters = null) return connection.Execute(query, parameters); } - public Task Get(string sql, params object[]? parameters) + public Task Get(string sql, object[]? 
parameters = null) { return connection.Get(sql, parameters); } - public Task GetAll(string sql, params object[]? parameters) + public Task GetAll(string sql, object[]? parameters = null) { return connection.GetAll(sql, parameters); } - public Task GetOptional(string sql, params object[]? parameters) + public Task GetOptional(string sql, object[]? parameters = null) { return connection.GetOptional(sql, parameters); } diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index 16c5c30..6c73c8b 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -142,7 +142,7 @@ public async Task ExecuteQuery(string query, object[]? parameters = var row = new Dictionary(); for (int i = 0; i < reader.FieldCount; i++) { - row[reader.GetName(i)] = reader.IsDBNull(i) ? null : reader.GetValue(i); + row[reader.GetName(i)] = reader.IsDBNull(i) ? null! : reader.GetValue(i); } rows.Add(row); } diff --git a/PowerSync/PowerSync.Common/Utils/EventStream.cs b/PowerSync/PowerSync.Common/Utils/EventStream.cs index 671abcd..3bf93cd 100644 --- a/PowerSync/PowerSync.Common/Utils/EventStream.cs +++ b/PowerSync/PowerSync.Common/Utils/EventStream.cs @@ -25,6 +25,8 @@ CancellationTokenSource RunListenerAsync( public class EventStream : IEventStream { + public bool Closed = false; + // Closest implementation to a ConcurrentSet in .Net private readonly ConcurrentDictionary, byte> subscribers = new(); @@ -158,6 +160,7 @@ public void Close() subscriber.Writer.TryComplete(); RemoveSubscriber(subscriber); } + Closed = true; } private void RemoveSubscriber(Channel channel) diff --git a/PowerSync/PowerSync.Maui/CHANGELOG.md b/PowerSync/PowerSync.Maui/CHANGELOG.md index a0f0049..ee6b964 100644 --- a/PowerSync/PowerSync.Maui/CHANGELOG.md +++ b/PowerSync/PowerSync.Maui/CHANGELOG.md @@ -1,5 +1,9 @@ # PowerSync.Maui Changelog +## 0.0.3-alpha.1 +- Upstream PowerSync.Common version bump +- Updated to the latest (0.4.9) version of the core extension, which introduces support for the Rust sync implementation and makes it the default. Users can still opt out and use the legacy C# sync implementation as an option when calling `connect()`. + ## 0.0.2-alpha.1 - Fixed issues related to extension loading when installing package outside of the monorepo. diff --git a/README.md b/README.md index 2607d75..98a61bb 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,20 @@ Run a specific test dotnet test -v n --framework net8.0 --filter "test-file-pattern" ``` +### Integration Tests +Integration tests in `PowerSync.Common.IntegrationTests` are intended to run against the [self-host nodejs demo](https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs). +The integration tests are disabled by default; define the following environment variable to enable them when running the tests. + +```bash +RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 +``` + +Run only the integration tests, without any unit tests.
+ +```bash +RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 --filter "Category=Integration" +``` + ## Using the PowerSync.Common package in your project ```bash dotnet add package PowerSync.Common --prerelease diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs new file mode 100644 index 0000000..eab9a3e --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs @@ -0,0 +1,110 @@ +namespace PowerSync.Common.IntegrationTests; + +using System; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using System.Collections.Generic; +using PowerSync.Common.DB.Crud; + +public class NodeClient +{ + private readonly HttpClient _httpClient; + private readonly string _backendUrl; + private readonly string _userId; + + public NodeClient(string userId) + { + _httpClient = new HttpClient(); + _backendUrl = "http://localhost:6060"; + _userId = userId; + } + + public NodeClient(string backendUrl, string userId) + { + _httpClient = new HttpClient(); + _backendUrl = backendUrl; + _userId = userId; + } + + public Task CreateList(string id, string name) + { + return CreateItem("lists", id, name); + } + async Task CreateItem(string table, string id, string name) + { + var data = new Dictionary + { + { "created_at", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss") }, + { "name", name }, + { "owner_id", _userId } + }; + + var batch = new[] + { + new + { + op = UpdateType.PUT.ToString(), + table = table, + id = id, + data = data + } + }; + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{_backendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + Console.WriteLine(await response.Content.ReadAsStringAsync()); + throw new Exception( + $"Failed to create item. Status: {response.StatusCode}, " + + $"Response: {await response.Content.ReadAsStringAsync()}" + ); + } + + return await response.Content.ReadAsStringAsync(); + } + + public Task DeleteList(string id) + { + return DeleteItem("lists", id); + } + + async Task DeleteItem(string table, string id) + { + var batch = new[] + { + new + { + op = UpdateType.DELETE.ToString(), + table = table, + id = id + } + }; + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{_backendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + Console.WriteLine(await response.Content.ReadAsStringAsync()); + throw new Exception( + $"Failed to delete item. 
Status: {response.StatusCode}, " + + $"Response: {await response.Content.ReadAsStringAsync()}" + ); + } + + return await response.Content.ReadAsStringAsync(); + } + + public void Dispose() + { + _httpClient?.Dispose(); + } +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs new file mode 100644 index 0000000..b9448b3 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs @@ -0,0 +1,113 @@ +namespace PowerSync.Common.IntegrationTests; + + +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using System.IO; +using PowerSync.Common.Client; +using PowerSync.Common.Client.Connection; +using PowerSync.Common.DB.Crud; + + +public class NodeConnector : IPowerSyncBackendConnector +{ + private readonly HttpClient _httpClient; + + public string BackendUrl { get; } + public string PowerSyncUrl { get; } + public string UserId { get; private set; } + private string? clientId; + + public NodeConnector(string userId) + { + _httpClient = new HttpClient(); + + // Load or generate User ID + UserId = userId; + + BackendUrl = "http://localhost:6060"; + PowerSyncUrl = "http://localhost:8080"; + + clientId = null; + } + + public async Task FetchCredentials() + { + string tokenEndpoint = "api/auth/token"; + string url = $"{BackendUrl}/{tokenEndpoint}?user_id={UserId}"; + + HttpResponseMessage response = await _httpClient.GetAsync(url); + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Received {response.StatusCode} from {tokenEndpoint}: {await response.Content.ReadAsStringAsync()}"); + } + + string responseBody = await response.Content.ReadAsStringAsync(); + var jsonResponse = JsonSerializer.Deserialize>(responseBody); + + if (jsonResponse == null || !jsonResponse.ContainsKey("token")) + { + throw new Exception("Invalid response received from authentication endpoint."); + } + + return new PowerSyncCredentials(PowerSyncUrl, jsonResponse["token"]); + } + + public async Task UploadData(IPowerSyncDatabase database) + { + CrudTransaction? 
transaction; + try + { + transaction = await database.GetNextCrudTransaction(); + } + catch (Exception ex) + { + Console.WriteLine($"UploadData Error: {ex.Message}"); + return; + } + + if (transaction == null) + { + return; + } + + clientId ??= await database.GetClientId(); + + try + { + var batch = new List(); + + foreach (var operation in transaction.Crud) + { + batch.Add(new + { + op = operation.Op.ToString(), + table = operation.Table, + id = operation.Id, + data = operation.OpData + }); + } + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{BackendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Received {response.StatusCode} from /api/data: {await response.Content.ReadAsStringAsync()}"); + } + + await transaction.Complete(); + } + catch (Exception ex) + { + Console.WriteLine($"UploadData Error: {ex.Message}"); + throw; + } + } +} diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj new file mode 100644 index 0000000..21189c9 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -0,0 +1,31 @@ + + + + net8.0 + enable + enable + false + + + + + + + + + + + + + + + + PreserveNewest + + + + + + + + diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs new file mode 100644 index 0000000..6e2da0e --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -0,0 +1,254 @@ +using Newtonsoft.Json; +using PowerSync.Common.Client; +using System.Data.Common; +using System.Diagnostics; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using PowerSync.Common.Client.Sync.Stream; + + +namespace PowerSync.Common.IntegrationTests; + +[Trait("Category", "Integration")] +public class SyncIntegrationTests : IAsyncLifetime +{ + private record ListResult(string id, string name, string owner_id, string created_at); + + private string userId = Uuid(); + + private NodeClient nodeClient = default!; + + private PowerSyncDatabase db = default!; + + public async Task InitializeAsync() + { + // Create a logger factory + ILoggerFactory loggerFactory = LoggerFactory.Create(builder => + { + builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Information); + }); + + var logger = loggerFactory.CreateLogger("PowerSyncLogger"); + + nodeClient = new NodeClient(userId); + db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "powersync-sync-tests.db" }, + Schema = TestSchema.PowerSyncSchema, + Logger = logger + + }); + await db.Init(); + var connector = new NodeConnector(userId); + + await ClearAllData(); + + Console.WriteLine($"Using User ID: {userId}"); + try + { + await db.Connect(connector, new PowerSyncConnectionOptions + { + ClientImplementation = SyncClientImplementation.RUST, + }); + await db.WaitForFirstSync(); + } + catch (Exception ex) + { + Console.WriteLine($"Exception during InitializeAsync: {ex}"); + throw; + } + } + + public async Task DisposeAsync() + { + await ClearAllData(); + await Task.Delay(2000); + await db.DisconnectAndClear(); + await db.Close(); + } + + [IntegrationFact(Timeout = 3000)] 
+ public async Task SyncDownCreateOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 1) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await nodeClient.CreateList(id, name: "Test List magic"); + await watched.Task; + } + + [IntegrationFact(Timeout = 3000)] + public async Task SyncDownDeleteOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + + await nodeClient.CreateList(id, name: "Test List to delete"); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 1) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await watched.Task; + await nodeClient.DeleteList(id); + + watched = new TaskCompletionSource(); + cts = new CancellationTokenSource(); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was deleted locally + if (x.Length == 0) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await watched.Task; + } + + [IntegrationFact(Timeout = 5000)] + public async Task SyncDownLargeCreateOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + var listName = Uuid(); + + await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 100) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + for (int i = 0; i < 100; i++) + { + await nodeClient.CreateList(Uuid(), listName); + } + await watched.Task; + } + + [IntegrationFact(Timeout = 5000)] + public async Task SyncDownCreateOperationAfterLargeUploadTest() + { + var localInsertWatch = new TaskCompletionSource(); + var backendInsertWatch = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + var listName = Uuid(); + + await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + { + OnResult = (x) => + { + // Verify that the items were added locally + if (x.Length == 100) + { + localInsertWatch.SetResult(true); + } + // Verify that the new item added to backend was synced down + else if (x.Length == 101) + { + backendInsertWatch.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + for (int i = 0; i < 100; i++) + { + await db.Execute("insert into lists (id, name, owner_id, created_at) values (uuid(), ?, ?, datetime())", + [listName, userId]); + } + await localInsertWatch.Task; + + // let the crud upload finish + await Task.Delay(2000); + + await nodeClient.CreateList(Uuid(), listName); + await backendInsertWatch.Task; + } + + private async Task ClearAllData() + { + // Inefficient but simple way to clear all data, avoiding payload limitations + var results = await db.GetAll("select * from lists"); + foreach (var item in results) + { + await nodeClient.DeleteList(item.id); + } + } + static string Uuid() + { + 
return Guid.NewGuid().ToString(); + } +} + +[Trait("Category", "Integration")] +public class IntegrationFactAttribute : FactAttribute +{ + public IntegrationFactAttribute() + { + if (Environment.GetEnvironmentVariable("RUN_INTEGRATION_TESTS") != "true") + { + Skip = "Integration tests are disabled. Set RUN_INTEGRATION_TESTS=true to run."; + } + + // Set default timeout if not already set + if (Timeout == 0) + { + Timeout = 5000; // 5 seconds default for all integration tests + } + } +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs new file mode 100644 index 0000000..dbf79a2 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs @@ -0,0 +1,33 @@ +namespace PowerSync.Common.IntegrationTests; + +using PowerSync.Common.DB.Schema; + +public class TestSchema +{ + public static Table Todos = new Table(new Dictionary + { + { "list_id", ColumnType.TEXT }, + { "created_at", ColumnType.TEXT }, + { "completed_at", ColumnType.TEXT }, + { "description", ColumnType.TEXT }, + { "created_by", ColumnType.TEXT }, + { "completed_by", ColumnType.TEXT }, + { "completed", ColumnType.INTEGER } + }, new TableOptions + { + Indexes = new Dictionary> { { "list", new List { "list_id" } } } + }); + + public static Table Lists = new Table(new Dictionary + { + { "created_at", ColumnType.TEXT }, + { "name", ColumnType.TEXT }, + { "owner_id", ColumnType.TEXT } + }); + + public static Schema PowerSyncSchema = new Schema(new Dictionary + { + { "todos", Todos }, + { "lists", Lists } + }); +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json b/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json new file mode 100644 index 0000000..fcdb064 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json @@ -0,0 +1,5 @@ +{ + "methodDisplay": "method", + "diagnosticMessages": true, + "longRunningTestSeconds": 10 +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs index b19727f..7a2f9bf 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs @@ -4,7 +4,7 @@ namespace PowerSync.Common.Tests.Client.Sync; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; - +using Newtonsoft.Json; using PowerSync.Common.Client; using PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.DB.Schema; @@ -17,7 +17,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O1", - Data = new { description = "bar" }, + Data = JsonConvert.SerializeObject(new { description = "bar" }), Checksum = 1 }); @@ -27,7 +27,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O2", - Data = new { description = "bar" }, + Data = JsonConvert.SerializeObject(new { description = "bar" }), Checksum = 2 }); @@ -37,7 +37,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O1", - Data = new { description = "bard" }, + Data = JsonConvert.SerializeObject(new { description = "bard" }), Checksum = 3 }); @@ -235,7 +235,7 @@ public async Task ShouldUseSubkeys() Subkey = "b", ObjectType = "assets", ObjectId = "O1", - Data = new { description = "B" }, 
+ Data = JsonConvert.SerializeObject(new { description = "B" }), Checksum = 4 }); @@ -863,7 +863,7 @@ await bucketStorage.SaveSyncData( ObjectType = "assets", ObjectId = "O3", Checksum = 5, - Data = new { description = "server updated" } + Data = JsonConvert.SerializeObject(new { description = "server updated" }) }) ], false) ]) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index 965cd13..c3ce177 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -45,8 +45,8 @@ public async Task Insert_RecordCrudEntryTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { description = "test" } }), crudEntry.Data @@ -80,8 +80,8 @@ public async Task InsertOrReplaceTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { description = "test2" } }), crudEntry.Data @@ -112,8 +112,8 @@ public async Task UpdateTest() JsonConvert.SerializeObject(new { op = "PATCH", - type = "assets", id = testId, + type = "assets", data = new { description = "test2" } }), crudEntry.Data @@ -144,8 +144,8 @@ public async Task DeleteTest() JsonConvert.SerializeObject(new { op = "DELETE", + id = testId, type = "assets", - id = testId }), crudEntry.Data ); @@ -234,8 +234,8 @@ public async Task BigNumbersIntegerTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { quantity = bigNumber } }), crudEntry.Data @@ -267,8 +267,8 @@ public async Task BigNumbersTextTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { quantity = bigNumber.ToString() } }), crudEntry.Data @@ -287,8 +287,8 @@ await db.Execute("UPDATE assets SET description = ?, quantity = CAST(quantity AS JsonConvert.SerializeObject(new { op = "PATCH", - type = "assets", id = testId, + type = "assets", data = new { description = "updated", quantity = bigNumber + 1 } }), crudEntry.Data diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index b47192e..81fd9c3 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -7,10 +7,11 @@ public class PowerSyncSetup { - private const string VERSION = "0.3.14"; + private const string VERSION = "0.4.9"; + private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; - private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/co/powersync/powersync-sqlite-core/{VERSION}"; - + private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/com/powersync/powersync-sqlite-core/{VERSION}"; + private readonly HttpClient _httpClient; private readonly string _basePath; @@ -37,7 +38,7 @@ public async Task RunSetup() public async Task SetupDesktop() { Console.WriteLine("Setting up Desktop libraries..."); - + var runtimeConfigs = GetDesktopRuntimeConfigs(); var commonPath = Path.Combine(_basePath, "PowerSync.Common"); @@ -51,10 +52,10 @@ private static Dictionary GetDesktopRuntimeConfigs() { return new Dictionary { - { "osx-x64", new RuntimeConfig("libpowersync_x64.dylib", "libpowersync.dylib") }, - { "osx-arm64", new RuntimeConfig("libpowersync_aarch64.dylib", "libpowersync.dylib") }, - { "linux-x64", new RuntimeConfig("libpowersync_x64.so", "libpowersync.so") }, - { "linux-arm64", new 
RuntimeConfig("libpowersync_aarch64.so", "libpowersync.so") }, + { "osx-x64", new RuntimeConfig("libpowersync_x64.macos.dylib", "libpowersync.dylib") }, + { "osx-arm64", new RuntimeConfig("libpowersync_aarch64.macos.dylib", "libpowersync.dylib") }, + { "linux-x64", new RuntimeConfig("libpowersync_x64.linux.so", "libpowersync.so") }, + { "linux-arm64", new RuntimeConfig("libpowersync_aarch64.linux.so", "libpowersync.so") }, { "win-x64", new RuntimeConfig("powersync_x64.dll", "powersync.dll") } }; } @@ -63,19 +64,19 @@ private async Task ProcessDesktopRuntime(string basePath, KeyValuePairNone true true + + 8.0.90 11.0 21.0 diff --git a/root.sln b/root.sln index f83584e..b4c92ae 100644 --- a/root.sln +++ b/root.sln @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WPF", "demos\WPF\WPF.csproj EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PowerSync.Maui", "PowerSync\PowerSync.Maui\PowerSync.Maui.csproj", "{A4A91B9F-0C86-41CB-BEF0-C002819C43BE}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PowerSync.Common.IntegrationTests", "Tests\PowerSync\PowerSync.Common.IntegrationTests\PowerSync.Common.IntegrationTests.csproj", "{EB81D453-777D-40B5-A504-4144906ADBF4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -105,6 +107,18 @@ Global {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x64.Build.0 = Release|Any CPU {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x86.ActiveCfg = Release|Any CPU {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x86.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x64.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x64.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x86.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x86.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|Any CPU.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x64.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x64.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x86.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -117,5 +131,6 @@ Global {B8B2A9E2-FEC9-495B-B03E-23078E7B651D} = {9144195A-C68F-4B1E-A574-474EDD424D6C} {AF297026-0BEA-4B8E-97C9-6540C6D52B36} = {9144195A-C68F-4B1E-A574-474EDD424D6C} {A4A91B9F-0C86-41CB-BEF0-C002819C43BE} = {B1D87BA9-8812-4EFA-BBBE-1FF1EEEB5433} + {EB81D453-777D-40B5-A504-4144906ADBF4} = {C784FBE4-CC1E-4A0A-AE8E-6B818DD3724D} EndGlobalSection EndGlobal