diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 7dc0b77..5f47ed8 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -80,8 +80,6 @@ public interface IPowerSyncDatabase : IEventStream public class PowerSyncDatabase : EventStream, IPowerSyncDatabase { - private static readonly int FULL_SYNC_PRIORITY = 2147483647; - public IDBAdapter Database; private Schema schema; @@ -156,9 +154,36 @@ public async Task WaitForReady() await isReadyTask; } - public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) + public class PrioritySyncRequest + { + public CancellationToken? Token { get; set; } + public int? Priority { get; set; } + } + + /// + /// Wait for the first sync operation to complete. + /// + /// + /// An object providing a cancellation token and a priority target. + /// When a priority target is set, the task may complete when all buckets with the given (or higher) + /// priorities have been synchronized. This can be earlier than a complete sync. + /// + /// A task which will complete once the first full sync has completed. + public async Task WaitForFirstSync(PrioritySyncRequest? request = null) { - if (CurrentStatus.HasSynced == true) + var priority = request?.Priority ?? null; + var cancellationToken = request?.Token ?? null; + + bool StatusMatches(SyncStatus status) + { + if (priority == null) + { + return status.HasSynced == true; + } + return status.StatusForPriority(priority.Value).HasSynced == true; + } + + if (StatusMatches(CurrentStatus)) { return; } @@ -166,11 +191,11 @@ public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - var _ = Task.Run(() => + _ = Task.Run(() => { foreach (var update in Listen(cts.Token)) { - if (update.StatusChanged?.HasSynced == true) + if (update.StatusChanged != null && StatusMatches(update.StatusChanged!)) { cts.Cancel(); tcs.SetResult(true); @@ -227,7 +252,7 @@ private async Task LoadVersion() } } - private record LastSyncedResult(int? priority, string? last_synced_at); + private record LastSyncedResult(int priority, string? last_synced_at); protected async Task UpdateHasSynced() { @@ -236,28 +261,39 @@ protected async Task UpdateHasSynced() ); DateTime? lastCompleteSync = null; + List priorityStatuses = []; - // TODO: Will be altered/extended when reporting individual sync priority statuses is supported foreach (var result in results) { var parsedDate = DateTime.Parse(result.last_synced_at + "Z"); - if (result.priority == FULL_SYNC_PRIORITY) + if (result.priority == SyncProgress.FULL_SYNC_PRIORITY) { // This lowest-possible priority represents a complete sync. lastCompleteSync = parsedDate; } + else + { + priorityStatuses.Add(new DB.Crud.SyncPriorityStatus + { + Priority = result.priority, + HasSynced = true, + LastSyncedAt = parsedDate + }); + } } var hasSynced = lastCompleteSync != null; - if (hasSynced != CurrentStatus.HasSynced) + var updatedStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options) { - CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options) - { - HasSynced = hasSynced, - LastSyncedAt = lastCompleteSync, - }); + HasSynced = hasSynced, + PriorityStatusEntries = priorityStatuses.ToArray(), + LastSyncedAt = lastCompleteSync, + }); + if (!updatedStatus.IsEqual(CurrentStatus)) + { + CurrentStatus = updatedStatus; Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); } } @@ -534,7 +570,7 @@ await tx.Execute( } /// - /// Get an unique client id for this database. + /// Get a unique client id for this database. /// /// The id is not reset when the database is cleared, only when the database is deleted. /// diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index 4611bb2..57295a7 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -4,9 +4,9 @@ namespace PowerSync.Common.Client.Sync.Bucket; using System; using System.Threading.Tasks; +using Newtonsoft.Json; using PowerSync.Common.DB.Crud; using PowerSync.Common.Utils; -using Newtonsoft.Json; public static class PowerSyncControlCommand { @@ -31,38 +31,6 @@ public class Checkpoint public string? WriteCheckpoint { get; set; } = null; } -public class BucketState -{ - [JsonProperty("bucket")] - public string Bucket { get; set; } = null!; - - [JsonProperty("op_id")] - public string OpId { get; set; } = null!; -} - -public class SyncLocalDatabaseResult -{ - [JsonProperty("ready")] - public bool Ready { get; set; } - - [JsonProperty("checkpointValid")] - public bool CheckpointValid { get; set; } - - [JsonProperty("checkpointFailures")] - public string[]? CheckpointFailures { get; set; } - - public override bool Equals(object? obj) - { - if (obj is not SyncLocalDatabaseResult other) return false; - return JsonConvert.SerializeObject(this) == JsonConvert.SerializeObject(other); - } - - public override int GetHashCode() - { - return JsonConvert.SerializeObject(this).GetHashCode(); - } -} - public class BucketChecksum { [JsonProperty("bucket")] @@ -95,21 +63,11 @@ public class BucketStorageEvent public interface IBucketStorageAdapter : IEventStream { Task Init(); - Task SaveSyncData(SyncDataBatch batch); - Task RemoveBuckets(string[] buckets); - Task SetTargetCheckpoint(Checkpoint checkpoint); - - void StartSession(); - - Task GetBucketStates(); - - Task SyncLocalDatabase(Checkpoint checkpoint); Task NextCrudItem(); Task HasCrud(); Task GetCrudBatch(int limit = 100); - Task HasCompletedSync(); Task UpdateLocalTarget(Func> callback); /// diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 7dfff74..3af593d 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -4,23 +4,18 @@ namespace PowerSync.Common.Client.Sync.Bucket; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; - using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; - using Newtonsoft.Json; - using PowerSync.Common.DB; using PowerSync.Common.DB.Crud; using PowerSync.Common.Utils; public class SqliteBucketStorage : EventStream, IBucketStorageAdapter { - public static readonly string MAX_OP_ID = "9223372036854775807"; private readonly IDBAdapter db; - private bool hasCompletedSync; private bool pendingBucketDeletes; private readonly HashSet tableNames; private string? clientId; @@ -37,8 +32,7 @@ private record ExistingTableRowsResult(string name); public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null) { this.db = db; - this.logger = logger ?? NullLogger.Instance; ; - hasCompletedSync = false; + this.logger = logger ?? NullLogger.Instance; pendingBucketDeletes = true; tableNames = []; @@ -62,9 +56,9 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null) public async Task Init() { - - hasCompletedSync = false; - var existingTableRows = await db.GetAll("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"); + var existingTableRows = + await db.GetAll( + "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"); foreach (var row in existingTableRows) { @@ -79,6 +73,7 @@ public async Task Init() } private record ClientIdResult(string? client_id); + public async Task GetClientId() { if (clientId == null) @@ -298,6 +293,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)", } private record TargetOpResult(string target_op); + private record SequenceResult(int seq); public async Task UpdateLocalTarget(Func> callback) @@ -431,12 +427,6 @@ public async Task HasCrud() return await db.GetOptional("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null; } - public async Task SetTargetCheckpoint(Checkpoint checkpoint) - { - // No Op - await Task.CompletedTask; - } - record ControlResult(string? r); public async Task Control(string op, object? payload = null) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs index 7bfef98..42e39a7 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -14,7 +14,6 @@ public static Instruction[] ParseInstructions(string rawResponse) { var jsonArray = JArray.Parse(rawResponse); List instructions = []; - foreach (JObject item in jsonArray) { var instruction = ParseInstruction(item); @@ -24,7 +23,6 @@ public static Instruction[] ParseInstructions(string rawResponse) } instructions.Add(instruction); } - return instructions.ToArray(); } @@ -44,7 +42,6 @@ public static Instruction[] ParseInstructions(string rawResponse) return new FlushFileSystem(); if (json.ContainsKey("DidCompleteSync")) return new DidCompleteSync(); - throw new JsonSerializationException("Unknown Instruction type."); } } @@ -126,4 +123,4 @@ public class FetchCredentials : Instruction public class CloseSyncStream : Instruction { } public class FlushFileSystem : Instruction { } -public class DidCompleteSync : Instruction { } \ No newline at end of file +public class DidCompleteSync : Instruction { } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs index d26795f..8b2cae2 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs @@ -1,33 +1,26 @@ namespace PowerSync.Common.Client.Sync.Stream; +using Connection; using System.IO; using System.Net.Http; using System.Reflection; using System.Text; using System.Threading; using System.Threading.Tasks; -using System.Text.RegularExpressions; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using PowerSync.Common.Client.Connection; public class SyncStreamOptions { public string Path { get; set; } = ""; + public StreamingSyncRequest Data { get; set; } = new(); public Dictionary Headers { get; set; } = new(); public CancellationToken CancellationToken { get; set; } = CancellationToken.None; } -public class RequestDetails -{ - public string Url { get; set; } = ""; - public Dictionary Headers { get; set; } = new(); -} - public class Remote { @@ -227,6 +220,7 @@ public async Task PostStreamRaw(SyncStreamOptions options) } } + private async Task BuildRequest(HttpMethod method, string path, object? data = null, Dictionary? additionalHeaders = null) { var credentials = await GetCredentials(); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index f143356..b68e48c 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -1,3 +1,6 @@ +using System.Text; +using Newtonsoft.Json.Linq; + namespace PowerSync.Common.Client.Sync.Stream; using System.Net.Sockets; @@ -5,9 +8,7 @@ namespace PowerSync.Common.Client.Sync.Stream; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; - using Newtonsoft.Json; - using PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.DB.Crud; using PowerSync.Common.Utils; @@ -141,13 +142,17 @@ public class StreamingSyncImplementation : EventStream + _ = Task.Run(() => { foreach (var _ in Options.Adapter.Listen(crudUpdateCts.Token)) { @@ -389,11 +396,12 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio }); } - protected record StreamingSyncIterationResult + protected async Task StreamingSyncIteration(CancellationToken signal, + PowerSyncConnectionOptions? options) { public bool? LegacyRetry { get; init; } - public bool? ImmediateRestart { get; init; } + public bool? ImmediateRestart { get; init; } } @@ -890,170 +898,178 @@ protected async Task LegacyStreamingSyncIteration( } public record ResponseData( - [property: JsonProperty("write_checkpoint")] string WriteCheckpoint + [property: JsonProperty("write_checkpoint")] + string WriteCheckpoint ); - public record ApiResponse( - [property: JsonProperty("data")] ResponseData Data - ); - public async Task GetWriteCheckpoint() - { - var clientId = await Options.Adapter.GetClientId(); - var path = $"/write-checkpoint2.json?client_id={clientId}"; - var response = await Options.Remote.Get(path); +public record ApiResponse( + [property: JsonProperty("data")] ResponseData Data +); - return response.Data.WriteCheckpoint; - } +public async Task GetWriteCheckpoint() +{ + var clientId = await Options.Adapter.GetClientId(); + var path = $"/write-checkpoint2.json?client_id={clientId}"; + var response = await Options.Remote.Get(path); - protected async Task InternalUploadAllCrud() - { + return response.Data.WriteCheckpoint; +} - await locks.ObtainLock(new LockOptions +protected async Task InternalUploadAllCrud() +{ + await locks.ObtainLock(new LockOptions + { + Type = LockType.CRUD, + Callback = async () => { - Type = LockType.CRUD, - Callback = async () => + CrudEntry? checkedCrudItem = null; + + while (true) { - CrudEntry? checkedCrudItem = null; + UpdateSyncStatus(new SyncStatusOptions { DataFlow = new SyncDataFlowStatus { Uploading = true } }); - while (true) + try { - UpdateSyncStatus(new SyncStatusOptions { DataFlow = new SyncDataFlowStatus { Uploading = true } }); - - try + // This is the first item in the FIFO CRUD queue. + var nextCrudItem = await Options.Adapter.NextCrudItem(); + if (nextCrudItem != null) { - // This is the first item in the FIFO CRUD queue. - var nextCrudItem = await Options.Adapter.NextCrudItem(); - if (nextCrudItem != null) + if (checkedCrudItem?.ClientId == nextCrudItem.ClientId) { - if (checkedCrudItem?.ClientId == nextCrudItem.ClientId) - { - logger.LogWarning( - "Potentially previously uploaded CRUD entries are still present in the upload queue. " + - "Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their `.Complete()` method. " + - "The next upload iteration will be delayed." - ); - throw new Exception("Delaying due to previously encountered CRUD item."); - } + logger.LogWarning( + "Potentially previously uploaded CRUD entries are still present in the upload queue. " + + "Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their `.Complete()` method. " + + "The next upload iteration will be delayed." + ); + throw new Exception("Delaying due to previously encountered CRUD item."); + } - checkedCrudItem = nextCrudItem; - await Options.UploadCrud(); - UpdateSyncStatus(new SyncStatusOptions - { - }, + checkedCrudItem = nextCrudItem; + await Options.UploadCrud(); + UpdateSyncStatus(new SyncStatusOptions + { + }, new UpdateSyncStatusOptions { ClearUploadError = true }); - - } - else - { - // Uploading is completed - await Options.Adapter.UpdateLocalTarget(GetWriteCheckpoint); - break; - } } - catch (Exception ex) + else { - checkedCrudItem = null; - UpdateSyncStatus(new SyncStatusOptions - { - DataFlow = new SyncDataFlowStatus - { - Uploading = false, - UploadError = ex - } - }); - - await DelayRetry(); - - if (!IsConnected) + // Uploading is completed + await Options.Adapter.UpdateLocalTarget(GetWriteCheckpoint); + break; + } + } + catch (Exception ex) + { + checkedCrudItem = null; + UpdateSyncStatus(new SyncStatusOptions + { + DataFlow = new SyncDataFlowStatus { - // Exit loop if sync stream is no longer connected - break; + Uploading = false, + UploadError = ex } + }); - logger.LogDebug("Caught exception when uploading. Upload will retry after a delay. Exception: {message}", ex.Message); - } - finally + await DelayRetry(); + + if (!IsConnected) { - UpdateSyncStatus(new SyncStatusOptions { DataFlow = new SyncDataFlowStatus { Uploading = false } }); + // Exit loop if sync stream is no longer connected + break; } - } - return Task.CompletedTask; + logger.LogDebug( + "Caught exception when uploading. Upload will retry after a delay. Exception: {message}", + ex.Message); + } + finally + { + UpdateSyncStatus(new SyncStatusOptions + { DataFlow = new SyncDataFlowStatus { Uploading = false } }); + } } - }); - } - public async Task HasCompletedSync() - { - return await Options.Adapter.HasCompletedSync(); - } + return Task.CompletedTask; + } + }); +} - public async Task WaitForReady() - { - // Do nothing - await Task.CompletedTask; - } +public async Task WaitForReady() +{ + // Do nothing + await Task.CompletedTask; +} - protected record UpdateSyncStatusOptions( - bool? ClearDownloadError = null, bool? ClearUploadError = null - ); - protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptions? updateOptions = null) +protected record UpdateSyncStatusOptions( + bool? ClearDownloadError = null, + bool? ClearUploadError = null +); + +protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptions? updateOptions = null) +{ + try { - try + var updatedStatus = new SyncStatus(new SyncStatusOptions { - var updatedStatus = new SyncStatus(new SyncStatusOptions - { - Connected = options.Connected ?? SyncStatus.Connected, - Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), - LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, - DataFlow = new SyncDataFlowStatus - { - Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, - Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, - DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, - UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, - } - }); - if (!SyncStatus.Equals(updatedStatus)) + // UploadError = updateOptions?.ClearUploadError == true + // ? null + // : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, + // DownloadProgress = options.DataFlow?.DownloadProgress ?? SyncStatus.DataFlowStatus.DownloadProgress, + // }, + // PriorityStatusEntries = options.PriorityStatusEntries ?? SyncStatus.PriorityStatusEntries + + + Connected = options.Connected ?? SyncStatus.Connected, + Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), + LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, + DataFlow = new SyncDataFlowStatus { - SyncStatus = updatedStatus; - logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); - // Only trigger this if there was a change - Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); + Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, + Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, + DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, + UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, } + }); - // Trigger this for all updates - Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); - } - catch (Exception ex) + if (!SyncStatus.Equals(updatedStatus)) { - logger.LogError("Error updating sync status: {message}", ex.Message); + SyncStatus = updatedStatus; + logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); + // Only trigger this if there was a change + Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); } + // Trigger this for all updates + Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); } - - private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) + catch (Exception ex) { - return new DB.Crud.SyncPriorityStatus - { - Priority = status.Priority, - HasSynced = status.HasSynced ?? null, - LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null - }; + logger.LogError("Error updating sync status: {message}", ex.Message); } +} - private async Task DelayRetry() +private async Task DelayRetry() +{ + if (Options.RetryDelayMs.HasValue) { - if (Options.RetryDelayMs.HasValue) - { - await Task.Delay(Options.RetryDelayMs.Value); - } + await Task.Delay(Options.RetryDelayMs.Value); } } +private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) +{ + return new DB.Crud.SyncPriorityStatus + { + Priority = status.Priority, + HasSynced = status.HasSynced ?? null, + LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null + }; +} +} + enum LockType { CRUD, diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs index 6f78059..a7934fb 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs @@ -4,54 +4,6 @@ namespace PowerSync.Common.Client.Sync.Stream; using PowerSync.Common.DB.Crud; using Newtonsoft.Json; -public class ContinueCheckpointRequest -{ - [JsonProperty("buckets")] - public List Buckets { get; set; } = new(); - - [JsonProperty("checkpoint_token")] - public string CheckpointToken { get; set; } = ""; - - [JsonProperty("limit")] - public int? Limit { get; set; } -} - -public class SyncNewCheckpointRequest -{ - [JsonProperty("buckets")] - public List? Buckets { get; set; } - - [JsonProperty("request_checkpoint")] - public RequestCheckpoint RequestCheckpoint { get; set; } = new(); - - [JsonProperty("limit")] - public int? Limit { get; set; } -} - -public class RequestCheckpoint -{ - [JsonProperty("include_data")] - public bool IncludeData { get; set; } - - [JsonProperty("include_checksum")] - public bool IncludeChecksum { get; set; } -} - -public class SyncResponse -{ - [JsonProperty("data")] - public List? Data { get; set; } - - [JsonProperty("has_more")] - public bool HasMore { get; set; } - - [JsonProperty("checkpoint_token")] - public string? CheckpointToken { get; set; } - - [JsonProperty("checkpoint")] - public Checkpoint? Checkpoint { get; set; } -} - public class StreamingSyncRequest { [JsonProperty("buckets")] @@ -80,69 +32,4 @@ public class BucketRequest [JsonProperty("after")] public string After { get; set; } = ""; -} - -public abstract class StreamingSyncLine { } - -public class StreamingSyncCheckpoint : StreamingSyncLine -{ - [JsonProperty("checkpoint")] - public Checkpoint Checkpoint { get; set; } = new(); -} - -public class StreamingSyncCheckpointDiff : StreamingSyncLine -{ - [JsonProperty("checkpoint_diff")] - public CheckpointDiff CheckpointDiff { get; set; } = new(); -} - -public class CheckpointDiff -{ - [JsonProperty("last_op_id")] - public string LastOpId { get; set; } = ""; - - [JsonProperty("updated_buckets")] - public List UpdatedBuckets { get; set; } = new(); - - [JsonProperty("removed_buckets")] - public List RemovedBuckets { get; set; } = new(); - - [JsonProperty("write_checkpoint")] - public string WriteCheckpoint { get; set; } = ""; -} - -public class StreamingSyncDataJSON : StreamingSyncLine -{ - [JsonProperty("data")] - public SyncDataBucketJSON Data { get; set; } = new(); -} - -public class StreamingSyncCheckpointComplete : StreamingSyncLine -{ - [JsonProperty("checkpoint_complete")] - public CheckpointComplete CheckpointComplete { get; set; } = new(); -} - -public class CheckpointComplete -{ - [JsonProperty("last_op_id")] - public string LastOpId { get; set; } = ""; -} - -public class StreamingSyncKeepalive : StreamingSyncLine -{ - [JsonProperty("token_expires_in")] - public int? TokenExpiresIn { get; set; } -} - -public class CrudRequest -{ - [JsonProperty("data")] - public List Data { get; set; } = new(); -} - -public class CrudResponse -{ - [JsonProperty("checkpoint")] - public string? Checkpoint { get; set; } -} +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs index 5fe8f02..8a3539d 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs @@ -5,65 +5,62 @@ namespace PowerSync.Common.DB.Crud; public enum UpdateType { - [JsonProperty("PUT")] - PUT, + [JsonProperty("PUT")] PUT, - [JsonProperty("PATCH")] - PATCH, + [JsonProperty("PATCH")] PATCH, - [JsonProperty("DELETE")] - DELETE + [JsonProperty("DELETE")] DELETE } public class CrudEntryJSON { - [JsonProperty("id")] - public string Id { get; set; } = null!; + [JsonProperty("id")] public string Id { get; set; } = null!; - [JsonProperty("data")] - public string Data { get; set; } = null!; - - [JsonProperty("tx_id")] - public long? TransactionId { get; set; } + [JsonProperty("data")] public string Data { get; set; } = null!; + + [JsonProperty("tx_id")] public long? TransactionId { get; set; } } public class CrudEntryDataJSON { - [JsonProperty("data")] - public Dictionary? Data { get; set; } - - [JsonProperty("op")] - public UpdateType Op { get; set; } - - [JsonProperty("type")] - public string Type { get; set; } = null!; - - [JsonProperty("id")] - public string Id { get; set; } = null!; + [JsonProperty("data")] public Dictionary? Data { get; set; } + + [JsonProperty("old")] public Dictionary? Old { get; set; } + + [JsonProperty("op")] public UpdateType Op { get; set; } + + [JsonProperty("type")] public string Type { get; set; } = null!; + + [JsonProperty("id")] public string Id { get; set; } = null!; + + [JsonProperty("metadata")] public string? Metadata { get; set; } } public class CrudEntryOutputJSON { - [JsonProperty("op_id")] - public int OpId { get; set; } + [JsonProperty("op_id")] public int OpId { get; set; } - [JsonProperty("op")] - public UpdateType Op { get; set; } + [JsonProperty("op")] public UpdateType Op { get; set; } - [JsonProperty("type")] - public string Type { get; set; } = null!; + [JsonProperty("type")] public string Type { get; set; } = null!; - [JsonProperty("id")] - public string Id { get; set; } = null!; + [JsonProperty("id")] public string Id { get; set; } = null!; - [JsonProperty("tx_id")] - public long? TransactionId { get; set; } + [JsonProperty("tx_id")] public long? TransactionId { get; set; } - [JsonProperty("data")] - public Dictionary? Data { get; set; } + [JsonProperty("data")] public Dictionary? Data { get; set; } } -public class CrudEntry(int clientId, UpdateType op, string table, string id, long? transactionId = null, Dictionary? opData = null) +public class CrudEntry( + int clientId, + UpdateType op, + string table, + string id, + long? transactionId = null, + Dictionary? opData = null, + Dictionary? previousValues = null, + string? metadata = null +) { public int ClientId { get; private set; } = clientId; public string Id { get; private set; } = id; @@ -72,6 +69,19 @@ public class CrudEntry(int clientId, UpdateType op, string table, string id, lon public string Table { get; private set; } = table; public long? TransactionId { get; private set; } = transactionId; + /// + /// Previous values before this change. + /// + public Dictionary? PreviousValues { get; private set; } = previousValues; + + /// + /// Client-side metadata attached with this write. + /// + /// This field is only available when the `trackMetadata` option was set to `true` when creating a table + /// and the insert or update statement set the `_metadata` column. + /// + public string? Metadata { get; private set; } = metadata; + public static CrudEntry FromRow(CrudEntryJSON dbRow) { var data = JsonConvert.DeserializeObject(dbRow.Data) @@ -83,7 +93,9 @@ public static CrudEntry FromRow(CrudEntryJSON dbRow) data.Type, data.Id, dbRow.TransactionId, - data.Data + data.Data, + data.Old, + data.Metadata ); } diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs index b709a2c..659352e 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs @@ -21,7 +21,6 @@ public class SyncProgress : ProgressWithOperations { public static readonly int FULL_SYNC_PRIORITY = 2147483647; protected Dictionary InternalProgress { get; } - public SyncProgress(Dictionary progress) { this.InternalProgress = progress; diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index cdda36a..8293b53 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -173,4 +173,10 @@ public string ToJSON() { return SerializeObject(); } + + private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) + { + // Lower numbers = higher priority + return a.Priority.CompareTo(b.Priority); + } } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index b0c906b..a31ee49 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -30,13 +30,13 @@ public class QueryRows public interface IDBGetUtils { // Execute a read-only query and return results. - Task GetAll(string sql, params object[]? parameters); + Task GetAll(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, or null if the ResultSet is empty. - Task GetOptional(string sql, params object[]? parameters); + Task GetOptional(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, error if the ResultSet is empty. - Task Get(string sql, params object[]? parameters); + Task Get(string sql, object[]? parameters = null); } public interface ILockContext : IDBGetUtils diff --git a/PowerSync/PowerSync.Common/DB/Schema/Table.cs b/PowerSync/PowerSync.Common/DB/Schema/Table.cs index cd9d2b1..2476e79 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/Table.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/Table.cs @@ -7,15 +7,57 @@ public class TableOptions( Dictionary>? indexes = null, bool? localOnly = null, bool? insertOnly = null, - string? viewName = null) + string? viewName = null, + bool? trackMetadata = null, + TrackPreviousOptions? trackPreviousOptions = null, + bool? ignoreEmptyUpdates = null +) { public Dictionary> Indexes { get; set; } = indexes ?? []; - public bool LocalOnly { get; set; } = localOnly ?? false; + public bool LocalOnly { get; } = localOnly ?? false; - public bool InsertOnly { get; set; } = insertOnly ?? false; + public bool InsertOnly { get; } = insertOnly ?? false; - public string? ViewName { get; set; } = viewName; + public string? ViewName { get; } = viewName; + + /// + /// Whether to add a hidden `_metadata` column that will be enabled for updates to attach custom + /// information about writes that will be reported through [CrudEntry.metadata]. + /// + public bool TrackMetadata { get; } = trackMetadata ?? false; + + /// + /// When set to a non-null value, track old values of columns + /// + public TrackPreviousOptions? TrackPreviousOptions { get; } = trackPreviousOptions ?? null; + + /// + /// Whether an `UPDATE` statement that doesn't change any values should be ignored when creating + /// CRUD entries. + /// + public bool IgnoreEmptyUpdates { get; } = ignoreEmptyUpdates ?? false; +} + +/// +/// Whether to include previous column values when PowerSync tracks local changes. +/// Including old values may be helpful for some backend connector implementations, +/// which is why it can be enabled on a per-table or per-column basis. +/// +public class TrackPreviousOptions +{ + /// + /// When defined, a list of column names for which old values should be tracked. + /// + [JsonProperty("columns")] + public List? Columns { get; set; } + + /// + /// Whether to only include old values when they were changed by an update, instead of always + /// including all old values, + /// + [JsonProperty("onlyWhenChanged")] + public bool? OnlyWhenChanged { get; set; } } public class Table @@ -35,16 +77,21 @@ public Table(Dictionary columns, TableOptions? options = nul { ConvertedColumns = [.. columns.Select(kv => new Column(new ColumnOptions(kv.Key, kv.Value)))]; - ConvertedIndexes = [.. (Options?.Indexes ?? []) + ConvertedIndexes = + [ + .. (Options?.Indexes ?? []) .Select(kv => new Index(new IndexOptions( kv.Key, - [.. kv.Value.Select(name => - new IndexedColumn(new IndexColumnOptions( - name.Replace("-", ""), !name.StartsWith("-"))) - )] + [ + .. kv.Value.Select(name => + new IndexedColumn(new IndexColumnOptions( + name.Replace("-", ""), !name.StartsWith("-"))) + ) + ] )) - )]; + ) + ]; Options = options ?? new TableOptions(); @@ -61,7 +108,18 @@ public void Validate() if (Columns.Count > Column.MAX_AMOUNT_OF_COLUMNS) { - throw new Exception($"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + throw new Exception( + $"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + } + + if (Options.TrackMetadata && Options.LocalOnly) + { + throw new Exception("Can't include metadata for local-only tables."); + } + + if (Options.TrackPreviousOptions != null && Options.LocalOnly) + { + throw new Exception("Can't include old values for local-only tables."); } var columnNames = new HashSet { "id" }; @@ -103,15 +161,27 @@ public void Validate() public string ToJSON(string Name = "") { + var trackPrevious = Options.TrackPreviousOptions; + var jsonObject = new { view_name = Options.ViewName ?? Name, local_only = Options.LocalOnly, insert_only = Options.InsertOnly, columns = ConvertedColumns.Select(c => JsonConvert.DeserializeObject(c.ToJSON())).ToList(), - indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList() + indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList(), + + include_metadata = Options.TrackMetadata, + ignore_empty_update = Options.IgnoreEmptyUpdates, + include_old = (object)(trackPrevious switch + { + null => false, + { Columns: null } => true, + { Columns: var cols } => cols + }), + include_old_only_when_changed = trackPrevious?.OnlyWhenChanged ?? false }; return JsonConvert.SerializeObject(jsonObject); } -} +} \ No newline at end of file