Skip to content

Commit c638337

Browse files
committed
Merge branch 'feat/rust-sync-implementation' into feat/api-catchup-effort-2
2 parents dabda64 + d516eae commit c638337

File tree

22 files changed

+2450
-202
lines changed

22 files changed

+2450
-202
lines changed

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ jobs:
1717
with:
1818
dotnet-version: '8.0'
1919

20+
- name: Install MAUI Workloads
21+
run: dotnet workload restore
22+
2023
- name: Download PowerSync extension
2124
run: dotnet run --project Tools/Setup
2225

Directory.build.props

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Project>
3+
<PropertyGroup>
4+
<MSBuildWarningsAsMessages>$(MSBuildWarningsAsMessages);NETSDK1202</MSBuildWarningsAsMessages>
5+
</PropertyGroup>
36
<ItemGroup>
47
<Compile Include="$(MSBuildThisFileDirectory)IsExternalInit.cs" Visible="false" />
58
</ItemGroup>

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# PowerSync.Common Changelog
22

3+
## 0.0.5-alpha.1
4+
- Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`.
5+
36
## 0.0.4-alpha.1
47
- Fixed MAUI issues related to extension loading when installing package outside of the monorepo.
58

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket;
88
using PowerSync.Common.DB.Crud;
99
using PowerSync.Common.Utils;
1010

11+
public static class PowerSyncControlCommand
12+
{
13+
public const string PROCESS_TEXT_LINE = "line_text";
14+
public const string PROCESS_BSON_LINE = "line_binary";
15+
public const string STOP = "stop";
16+
public const string START = "start";
17+
public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token";
18+
public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload";
19+
public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions";
20+
}
21+
1122
public class Checkpoint
1223
{
1324
[JsonProperty("last_op_id")]
@@ -52,7 +63,7 @@ public class BucketStorageEvent
5263
public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
5364
{
5465
Task Init();
55-
66+
5667
Task<CrudEntry?> NextCrudItem();
5768
Task<bool> HasCrud();
5869
Task<CrudBatch?> GetCrudBatch(int limit = 100);
@@ -71,7 +82,7 @@ public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
7182
/// Get a unique client ID.
7283
/// </summary>
7384
Task<string> GetClientId();
74-
85+
7586
/// <summary>
7687
/// Invokes the `powersync_control` function for the sync client.
7788
/// </summary>

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 162 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,166 @@ public string GetMaxOpId()
8989
{
9090
return MAX_OP_ID;
9191
}
92-
92+
93+
public void StartSession() { }
94+
95+
public async Task<BucketState[]> GetBucketStates()
96+
{
97+
return
98+
await db.GetAll<BucketState>("SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'");
99+
}
100+
101+
public async Task SaveSyncData(SyncDataBatch batch)
102+
{
103+
await db.WriteTransaction(async tx =>
104+
{
105+
int count = 0;
106+
foreach (var b in batch.Buckets)
107+
{
108+
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
109+
["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]);
110+
logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result));
111+
count += b.Data.Length;
112+
}
113+
compactCounter += count;
114+
});
115+
}
116+
117+
public async Task RemoveBuckets(string[] buckets)
118+
{
119+
foreach (var bucket in buckets)
120+
{
121+
await DeleteBucket(bucket);
122+
}
123+
}
124+
125+
private async Task DeleteBucket(string bucket)
126+
{
127+
await db.WriteTransaction(async tx =>
128+
{
129+
await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
130+
["delete_bucket", bucket]);
131+
});
132+
133+
logger.LogDebug("Done deleting bucket");
134+
pendingBucketDeletes = true;
135+
}
136+
137+
private record LastSyncedResult(string? synced_at);
138+
public async Task<bool> HasCompletedSync()
139+
{
140+
if (hasCompletedSync) return true;
141+
142+
var result = await db.Get<LastSyncedResult>("SELECT powersync_last_synced_at() as synced_at");
143+
144+
hasCompletedSync = result.synced_at != null;
145+
return hasCompletedSync;
146+
}
147+
148+
public async Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint)
149+
{
150+
var validation = await ValidateChecksums(checkpoint);
151+
if (!validation.CheckpointValid)
152+
{
153+
logger.LogError("Checksums failed for {failures}", JsonConvert.SerializeObject(validation.CheckpointFailures));
154+
foreach (var failedBucket in validation.CheckpointFailures ?? [])
155+
{
156+
await DeleteBucket(failedBucket);
157+
}
158+
return new SyncLocalDatabaseResult
159+
{
160+
Ready = false,
161+
CheckpointValid = false,
162+
CheckpointFailures = validation.CheckpointFailures
163+
};
164+
}
165+
166+
var bucketNames = checkpoint.Buckets.Select(b => b.Bucket).ToArray();
167+
await db.WriteTransaction(async tx =>
168+
{
169+
await tx.Execute(
170+
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
171+
[checkpoint.LastOpId, JsonConvert.SerializeObject(bucketNames)]
172+
);
173+
174+
if (checkpoint.WriteCheckpoint != null)
175+
{
176+
await tx.Execute(
177+
"UPDATE ps_buckets SET last_op = ? WHERE name = '$local'",
178+
[checkpoint.WriteCheckpoint]
179+
);
180+
}
181+
});
182+
183+
var valid = await UpdateObjectsFromBuckets(checkpoint);
184+
if (!valid)
185+
{
186+
logger.LogDebug("Not at a consistent checkpoint - cannot update local db");
187+
return new SyncLocalDatabaseResult
188+
{
189+
Ready = false,
190+
CheckpointValid = true
191+
};
192+
}
193+
194+
await ForceCompact();
195+
196+
return new SyncLocalDatabaseResult
197+
{
198+
Ready = true,
199+
CheckpointValid = true
200+
};
201+
}
202+
203+
private async Task<bool> UpdateObjectsFromBuckets(Checkpoint checkpoint)
204+
{
205+
return await db.WriteTransaction(async tx =>
206+
{
207+
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
208+
["sync_local", ""]);
209+
210+
return result.InsertId == 1;
211+
});
212+
}
213+
214+
private record ResultResult(object result);
215+
216+
public class ResultDetail
217+
{
218+
[JsonProperty("valid")]
219+
public bool Valid { get; set; }
220+
221+
[JsonProperty("failed_buckets")]
222+
public List<string>? FailedBuckets { get; set; }
223+
}
224+
225+
public async Task<SyncLocalDatabaseResult> ValidateChecksums(
226+
Checkpoint checkpoint)
227+
{
228+
var result = await db.Get<ResultResult>("SELECT powersync_validate_checkpoint(?) as result",
229+
[JsonConvert.SerializeObject(checkpoint)]);
230+
231+
logger.LogDebug("validateChecksums result item {message}", JsonConvert.SerializeObject(result));
232+
233+
if (result == null) return new SyncLocalDatabaseResult { CheckpointValid = false, Ready = false };
234+
235+
var resultDetail = JsonConvert.DeserializeObject<ResultDetail>(result.result.ToString() ?? "{}");
236+
237+
if (resultDetail?.Valid == true)
238+
{
239+
return new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true };
240+
}
241+
else
242+
{
243+
return new SyncLocalDatabaseResult
244+
{
245+
CheckpointValid = false,
246+
Ready = false,
247+
CheckpointFailures = resultDetail?.FailedBuckets?.ToArray() ?? []
248+
};
249+
}
250+
}
251+
93252
/// <summary>
94253
/// Force a compact operation, primarily for testing purposes.
95254
/// </summary>
@@ -267,16 +426,15 @@ public async Task<bool> HasCrud()
267426
{
268427
return await db.GetOptional<object>("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null;
269428
}
270-
271429

272430
record ControlResult(string? r);
273431

274432
public async Task<string> Control(string op, object? payload = null)
275433
{
276434
return await db.WriteTransaction(async tx =>
277435
{
278-
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload ?? ""]);
436+
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload]);
279437
return result.r!;
280438
});
281439
}
282-
}
440+
}

PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
using Newtonsoft.Json.Linq;
2+
using Newtonsoft.Json;
23

34
namespace PowerSync.Common.Client.Sync.Stream;
45

5-
using Newtonsoft.Json;
6-
76
/// <summary>
87
/// An internal instruction emitted by the sync client in the core extension in response to the
98
/// SDK passing sync data into the extension.
@@ -15,7 +14,6 @@ public static Instruction[] ParseInstructions(string rawResponse)
1514
{
1615
var jsonArray = JArray.Parse(rawResponse);
1716
List<Instruction> instructions = [];
18-
1917
foreach (JObject item in jsonArray)
2018
{
2119
var instruction = ParseInstruction(item);
@@ -25,7 +23,6 @@ public static Instruction[] ParseInstructions(string rawResponse)
2523
}
2624
instructions.Add(instruction);
2725
}
28-
2926
return instructions.ToArray();
3027
}
3128

@@ -45,12 +42,11 @@ public static Instruction[] ParseInstructions(string rawResponse)
4542
return new FlushFileSystem();
4643
if (json.ContainsKey("DidCompleteSync"))
4744
return new DidCompleteSync();
48-
4945
throw new JsonSerializationException("Unknown Instruction type.");
5046
}
5147
}
5248

53-
public class LogLine: Instruction
49+
public class LogLine : Instruction
5450
{
5551
[JsonProperty("severity")]
5652
public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING"
@@ -59,13 +55,13 @@ public class LogLine: Instruction
5955
public string Line { get; set; } = null!;
6056
}
6157

62-
public class EstablishSyncStream: Instruction
58+
public class EstablishSyncStream : Instruction
6359
{
6460
[JsonProperty("request")]
6561
public StreamingSyncRequest Request { get; set; } = null!;
6662
}
6763

68-
public class UpdateSyncStatus: Instruction
64+
public class UpdateSyncStatus : Instruction
6965
{
7066
[JsonProperty("status")]
7167
public CoreSyncStatus Status { get; set; } = null!;
@@ -119,7 +115,7 @@ public class BucketProgress
119115
public int TargetCount { get; set; }
120116
}
121117

122-
public class FetchCredentials: Instruction
118+
public class FetchCredentials : Instruction
123119
{
124120
[JsonProperty("did_expire")]
125121
public bool DidExpire { get; set; }

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,80 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
117117
return JsonConvert.DeserializeObject<T>(responseData)!;
118118
}
119119

120+
/// <summary>
121+
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
122+
/// </summary>
123+
public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
124+
{
125+
var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
126+
var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
127+
128+
if (response.Content == null)
129+
{
130+
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
131+
}
132+
133+
if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
134+
{
135+
InvalidateCredentials();
136+
}
137+
138+
if (!response.IsSuccessStatusCode)
139+
{
140+
var errorText = await response.Content.ReadAsStringAsync();
141+
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
142+
}
143+
144+
return await response.Content.ReadAsStreamAsync();
145+
}
146+
147+
148+
/// <summary>
149+
/// Originally used for the C# streaming sync implementation.
150+
/// </summary>
151+
public async IAsyncEnumerable<StreamingSyncLine?> PostStream(SyncStreamOptions options)
152+
{
153+
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
154+
using var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
155+
156+
if (response.Content == null)
157+
{
158+
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
159+
}
160+
161+
if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
162+
{
163+
InvalidateCredentials();
164+
}
165+
166+
if (!response.IsSuccessStatusCode)
167+
{
168+
var errorText = await response.Content.ReadAsStringAsync();
169+
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
170+
}
171+
172+
var stream = await response.Content.ReadAsStreamAsync();
173+
174+
// Read NDJSON stream
175+
using var reader = new StreamReader(stream, Encoding.UTF8);
176+
string? line;
177+
178+
using var timeoutCts = new CancellationTokenSource();
179+
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken, timeoutCts.Token);
180+
181+
linkedCts.Token.Register(() =>
182+
{
183+
stream.Close();
184+
});
185+
186+
while ((line = await reader.ReadLineAsync()) != null)
187+
{
188+
timeoutCts.CancelAfter(TimeSpan.FromMilliseconds(STREAMING_POST_TIMEOUT_MS));
189+
yield return ParseStreamingSyncLine(JObject.Parse(line));
190+
}
191+
}
192+
193+
120194
public static StreamingSyncLine? ParseStreamingSyncLine(JObject json)
121195
{
122196
// Determine the type based on available keys

0 commit comments

Comments
 (0)