Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 52 additions & 16 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>

public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDatabase
{
private static readonly int FULL_SYNC_PRIORITY = 2147483647;

public IDBAdapter Database;
private Schema schema;

Expand Down Expand Up @@ -156,21 +154,48 @@ public async Task WaitForReady()
await isReadyTask;
}

public async Task WaitForFirstSync(CancellationToken? cancellationToken = null)
public class PrioritySyncRequest
{
public CancellationToken? Token { get; set; }
public int? Priority { get; set; }
}

/// <summary>
/// Wait for the first sync operation to complete.
/// </summary>
/// <param name="request">
/// An object providing a cancellation token and a priority target.
/// When a priority target is set, the task may complete when all buckets with the given (or higher)
/// priorities have been synchronized. This can be earlier than a complete sync.
/// </param>
/// <returns>A task which will complete once the first full sync has completed.</returns>
public async Task WaitForFirstSync(PrioritySyncRequest? request = null)
{
if (CurrentStatus.HasSynced == true)
var priority = request?.Priority ?? null;
var cancellationToken = request?.Token ?? null;

bool StatusMatches(SyncStatus status)
{
if (priority == null)
{
return status.HasSynced == true;
}
return status.StatusForPriority(priority.Value).HasSynced == true;
}

if (StatusMatches(CurrentStatus))
{
return;
}

var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();

var _ = Task.Run(() =>
_ = Task.Run(() =>
{
foreach (var update in Listen(cts.Token))
{
if (update.StatusChanged?.HasSynced == true)
if (update.StatusChanged != null && StatusMatches(update.StatusChanged!))
{
cts.Cancel();
tcs.SetResult(true);
Expand Down Expand Up @@ -227,7 +252,7 @@ private async Task LoadVersion()
}
}

private record LastSyncedResult(int? priority, string? last_synced_at);
private record LastSyncedResult(int priority, string? last_synced_at);

protected async Task UpdateHasSynced()
{
Expand All @@ -236,28 +261,39 @@ protected async Task UpdateHasSynced()
);

DateTime? lastCompleteSync = null;
List<DB.Crud.SyncPriorityStatus> priorityStatuses = [];

// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
foreach (var result in results)
{
var parsedDate = DateTime.Parse(result.last_synced_at + "Z");

if (result.priority == FULL_SYNC_PRIORITY)
if (result.priority == SyncProgress.FULL_SYNC_PRIORITY)
{
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
}
else
{
priorityStatuses.Add(new DB.Crud.SyncPriorityStatus
{
Priority = result.priority,
HasSynced = true,
LastSyncedAt = parsedDate
});
}
}

var hasSynced = lastCompleteSync != null;
if (hasSynced != CurrentStatus.HasSynced)
var updatedStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
{
CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
{
HasSynced = hasSynced,
LastSyncedAt = lastCompleteSync,
});
HasSynced = hasSynced,
PriorityStatusEntries = priorityStatuses.ToArray(),
LastSyncedAt = lastCompleteSync,
});

if (!updatedStatus.IsEqual(CurrentStatus))
{
CurrentStatus = updatedStatus;
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
}
}
Expand Down Expand Up @@ -534,7 +570,7 @@ await tx.Execute(
}

/// <summary>
/// Get an unique client id for this database.
/// Get a unique client id for this database.
///
/// The id is not reset when the database is cleared, only when the database is deleted.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace PowerSync.Common.Client.Sync.Bucket;
using System;
using System.Threading.Tasks;

using Newtonsoft.Json;
using PowerSync.Common.DB.Crud;
using PowerSync.Common.Utils;
using Newtonsoft.Json;

public static class PowerSyncControlCommand
{
Expand All @@ -31,38 +31,6 @@ public class Checkpoint
public string? WriteCheckpoint { get; set; } = null;
}

public class BucketState
{
[JsonProperty("bucket")]
public string Bucket { get; set; } = null!;

[JsonProperty("op_id")]
public string OpId { get; set; } = null!;
}

public class SyncLocalDatabaseResult
{
[JsonProperty("ready")]
public bool Ready { get; set; }

[JsonProperty("checkpointValid")]
public bool CheckpointValid { get; set; }

[JsonProperty("checkpointFailures")]
public string[]? CheckpointFailures { get; set; }

public override bool Equals(object? obj)
{
if (obj is not SyncLocalDatabaseResult other) return false;
return JsonConvert.SerializeObject(this) == JsonConvert.SerializeObject(other);
}

public override int GetHashCode()
{
return JsonConvert.SerializeObject(this).GetHashCode();
}
}

public class BucketChecksum
{
[JsonProperty("bucket")]
Expand Down Expand Up @@ -95,21 +63,11 @@ public class BucketStorageEvent
public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
{
Task Init();
Task SaveSyncData(SyncDataBatch batch);
Task RemoveBuckets(string[] buckets);
Task SetTargetCheckpoint(Checkpoint checkpoint);

void StartSession();

Task<BucketState[]> GetBucketStates();

Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint);

Task<CrudEntry?> NextCrudItem();
Task<bool> HasCrud();
Task<CrudBatch?> GetCrudBatch(int limit = 100);

Task<bool> HasCompletedSync();
Task<bool> UpdateLocalTarget(Func<Task<string>> callback);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,18 @@ namespace PowerSync.Common.Client.Sync.Bucket;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using Newtonsoft.Json;

using PowerSync.Common.DB;
using PowerSync.Common.DB.Crud;
using PowerSync.Common.Utils;

public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStorageAdapter
{

public static readonly string MAX_OP_ID = "9223372036854775807";

private readonly IDBAdapter db;
private bool hasCompletedSync;
private bool pendingBucketDeletes;
private readonly HashSet<string> tableNames;
private string? clientId;
Expand All @@ -37,8 +32,7 @@ private record ExistingTableRowsResult(string name);
public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
{
this.db = db;
this.logger = logger ?? NullLogger.Instance; ;
hasCompletedSync = false;
this.logger = logger ?? NullLogger.Instance;
pendingBucketDeletes = true;
tableNames = [];

Expand All @@ -62,9 +56,9 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)

public async Task Init()
{

hasCompletedSync = false;
var existingTableRows = await db.GetAll<ExistingTableRowsResult>("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'");
var existingTableRows =
await db.GetAll<ExistingTableRowsResult>(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'");

foreach (var row in existingTableRows)
{
Expand All @@ -79,6 +73,7 @@ public async Task Init()
}

private record ClientIdResult(string? client_id);

public async Task<string> GetClientId()
{
if (clientId == null)
Expand Down Expand Up @@ -298,6 +293,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
}

private record TargetOpResult(string target_op);

private record SequenceResult(int seq);

public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
Expand Down Expand Up @@ -431,12 +427,6 @@ public async Task<bool> HasCrud()
return await db.GetOptional<object>("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null;
}

public async Task SetTargetCheckpoint(Checkpoint checkpoint)
{
// No Op
await Task.CompletedTask;
}

record ControlResult(string? r);

public async Task<string> Control(string op, object? payload = null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public static Instruction[] ParseInstructions(string rawResponse)
{
var jsonArray = JArray.Parse(rawResponse);
List<Instruction> instructions = [];

foreach (JObject item in jsonArray)
{
var instruction = ParseInstruction(item);
Expand All @@ -24,7 +23,6 @@ public static Instruction[] ParseInstructions(string rawResponse)
}
instructions.Add(instruction);
}

return instructions.ToArray();
}

Expand All @@ -44,7 +42,6 @@ public static Instruction[] ParseInstructions(string rawResponse)
return new FlushFileSystem();
if (json.ContainsKey("DidCompleteSync"))
return new DidCompleteSync();

throw new JsonSerializationException("Unknown Instruction type.");
}
}
Expand Down Expand Up @@ -126,4 +123,4 @@ public class FetchCredentials : Instruction

public class CloseSyncStream : Instruction { }
public class FlushFileSystem : Instruction { }
public class DidCompleteSync : Instruction { }
public class DidCompleteSync : Instruction { }
12 changes: 3 additions & 9 deletions PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,26 @@
namespace PowerSync.Common.Client.Sync.Stream;

using Connection;
using System.IO;
using System.Net.Http;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Text.RegularExpressions;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

using PowerSync.Common.Client.Connection;

public class SyncStreamOptions
{
public string Path { get; set; } = "";

public StreamingSyncRequest Data { get; set; } = new();
public Dictionary<string, string> Headers { get; set; } = new();

public CancellationToken CancellationToken { get; set; } = CancellationToken.None;
}

public class RequestDetails
{
public string Url { get; set; } = "";
public Dictionary<string, string> Headers { get; set; } = new();
}

public class Remote
{

Expand Down Expand Up @@ -227,6 +220,7 @@ public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
}
}


private async Task<HttpRequestMessage> BuildRequest(HttpMethod method, string path, object? data = null, Dictionary<string, string>? additionalHeaders = null)
{
var credentials = await GetCredentials();
Expand Down
Loading
Loading