1- use core:: assert_matches:: debug_assert_matches;
1+ use core:: { assert_matches:: debug_assert_matches, fmt :: Display } ;
22
33use alloc:: {
44 collections:: btree_map:: BTreeMap ,
55 string:: { String , ToString } ,
66 vec:: Vec ,
77} ;
8+ use serde:: Serialize ;
89use sqlite_nostd:: { self as sqlite, Connection , ManagedStmt , ResultCode } ;
910use streaming_iterator:: StreamingIterator ;
1011
11- use crate :: { error:: SQLiteError , operations:: delete_bucket} ;
12+ use crate :: {
13+ error:: SQLiteError ,
14+ ext:: SafeManagedStmt ,
15+ operations:: delete_bucket,
16+ sync_local:: { PartialSyncOperation , SyncOperation } ,
17+ } ;
1218
13- use super :: { bucket_priority:: BucketPriority , interface:: BucketRequest } ;
19+ use super :: {
20+ bucket_priority:: BucketPriority , interface:: BucketRequest , streaming_sync:: OwnedCheckpoint ,
21+ sync_status:: Timestamp ,
22+ } ;
1423
1524/// An adapter for storing sync state.
1625///
@@ -19,16 +28,22 @@ use super::{bucket_priority::BucketPriority, interface::BucketRequest};
1928pub struct StorageAdapter {
2029 pub db : * mut sqlite:: sqlite3 ,
2130 progress_stmt : ManagedStmt ,
31+ time_stmt : ManagedStmt ,
2232}
2333
2434impl StorageAdapter {
2535 pub fn new ( db : * mut sqlite:: sqlite3 ) -> Result < Self , ResultCode > {
36+ // language=SQLite
2637 let progress =
2738 db. prepare_v2 ( "SELECT name, count_at_last, count_since_last FROM ps_buckets" ) ?;
2839
40+ // language=SQLite
41+ let time = db. prepare_v2 ( "SELECT unixepoch()" ) ?;
42+
2943 Ok ( Self {
3044 db,
3145 progress_stmt : progress,
46+ time_stmt : time,
3247 } )
3348 }
3449
@@ -131,12 +146,200 @@ impl StorageAdapter {
131146 last_applied_op,
132147 } ) ;
133148 }
149+
150+ fn validate_checkpoint (
151+ & self ,
152+ checkpoint : & OwnedCheckpoint ,
153+ priority : Option < BucketPriority > ,
154+ ) -> Result < CheckpointResult , SQLiteError > {
155+ // language=SQLite
156+ let statement = self . db . prepare_v2 (
157+ "WITH
158+ bucket_list(bucket, checksum) AS (
159+ SELECT
160+ json_extract(json_each.value, '$.bucket') as bucket,
161+ json_extract(json_each.value, '$.checksum') as checksum
162+ FROM json_each(?1)
163+ )
164+ SELECT
165+ bucket_list.bucket as bucket,
166+ IFNULL(buckets.add_checksum, 0) as add_checksum,
167+ IFNULL(buckets.op_checksum, 0) as oplog_checksum,
168+ bucket_list.checksum as expected_checksum
169+ FROM bucket_list
170+ LEFT OUTER JOIN ps_buckets AS buckets ON
171+ buckets.name = bucket_list.bucket
172+ GROUP BY bucket_list.bucket" ,
173+ ) ?;
174+
175+ #[ derive( Serialize ) ]
176+ struct BucketInfo < ' a > {
177+ bucket : & ' a str ,
178+ checksum : i32 ,
179+ }
180+
181+ let mut buckets = Vec :: < BucketInfo > :: new ( ) ;
182+ for bucket in & checkpoint. buckets {
183+ if bucket. is_in_priority ( priority) {
184+ buckets. push ( BucketInfo {
185+ bucket : & bucket. bucket ,
186+ checksum : bucket. checksum ,
187+ } ) ;
188+ }
189+ }
190+
191+ let bucket_desc = serde_json:: to_string ( & buckets) ?;
192+ statement. bind_text ( 1 , & bucket_desc, sqlite:: Destructor :: STATIC ) ?;
193+
194+ let mut failures: Vec < String > = Vec :: new ( ) ;
195+ while statement. step ( ) ? == ResultCode :: ROW {
196+ let name = statement. column_text ( 0 ) ?;
197+ // checksums with column_int are wrapped to i32 by SQLite
198+ let add_checksum = statement. column_int ( 1 ) ;
199+ let oplog_checksum = statement. column_int ( 2 ) ;
200+ let expected_checksum = statement. column_int ( 3 ) ;
201+
202+ // wrapping add is like +, but safely overflows
203+ let checksum = oplog_checksum. wrapping_add ( add_checksum) ;
204+
205+ if checksum != expected_checksum {
206+ failures. push ( String :: from ( name) ) ;
207+ }
208+ }
209+
210+ Ok ( CheckpointResult {
211+ failed_buckets : failures,
212+ } )
213+ }
214+
215+ pub fn sync_local (
216+ & self ,
217+ checkpoint : & OwnedCheckpoint ,
218+ priority : Option < BucketPriority > ,
219+ ) -> Result < SyncLocalResult , SQLiteError > {
220+ let checksums = self . validate_checkpoint ( checkpoint, priority) ?;
221+
222+ if !checksums. is_valid ( ) {
223+ self . delete_buckets ( checksums. failed_buckets . iter ( ) . map ( |i| i. as_str ( ) ) ) ?;
224+ return Ok ( SyncLocalResult :: ChecksumFailure ( checksums) ) ;
225+ }
226+
227+ let update_bucket = self
228+ . db
229+ . prepare_v2 ( "UPDATE ps_buckets SET last_op = ? WHERE name = ?" ) ?;
230+
231+ for bucket in & checkpoint. buckets {
232+ if bucket. is_in_priority ( priority) {
233+ update_bucket. bind_int64 ( 1 , checkpoint. last_op_id ) ?;
234+ update_bucket. bind_text ( 2 , & bucket. bucket , sqlite:: Destructor :: STATIC ) ?;
235+ update_bucket. exec ( ) ?;
236+ }
237+ }
238+
239+ if let ( None , Some ( write_checkpoint) ) = ( & priority, & checkpoint. write_checkpoint ) {
240+ update_bucket. bind_int64 ( 1 , * write_checkpoint) ?;
241+ update_bucket. bind_text ( 2 , "$local" , sqlite:: Destructor :: STATIC ) ?;
242+ update_bucket. exec ( ) ?;
243+ }
244+
245+ #[ derive( Serialize ) ]
246+ struct PartialArgs < ' a > {
247+ priority : BucketPriority ,
248+ buckets : Vec < & ' a str > ,
249+ }
250+
251+ let sync_result = match priority {
252+ None => SyncOperation :: new ( self . db , None ) . apply ( ) ,
253+ Some ( priority) => {
254+ let args = PartialArgs {
255+ priority,
256+ buckets : checkpoint
257+ . buckets
258+ . iter ( )
259+ . filter_map ( |item| {
260+ if item. is_in_priority ( Some ( priority) ) {
261+ Some ( item. bucket . as_str ( ) )
262+ } else {
263+ None
264+ }
265+ } )
266+ . collect ( ) ,
267+ } ;
268+
269+ // TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters.
270+ let serialized_args = serde_json:: to_string ( & args) ?;
271+ SyncOperation :: new (
272+ self . db ,
273+ Some ( PartialSyncOperation {
274+ priority,
275+ args : & serialized_args,
276+ } ) ,
277+ )
278+ . apply ( )
279+ }
280+ } ?;
281+
282+ if sync_result == 1 {
283+ // TODO: Force compact
284+
285+ Ok ( SyncLocalResult :: ChangesApplied )
286+ } else {
287+ Ok ( SyncLocalResult :: PendingLocalChanges )
288+ }
289+ }
290+
291+ pub fn now ( & self ) -> Result < Timestamp , ResultCode > {
292+ self . time_stmt . reset ( ) ?;
293+ self . time_stmt . step ( ) ?;
294+
295+ Ok ( Timestamp ( self . time_stmt . column_int64 ( 0 ) ) )
296+ }
134297}
135298
136299pub struct BucketInfo {
137300 pub id : i64 ,
138301 pub last_applied_op : i64 ,
139302}
303+
304+ pub struct CheckpointResult {
305+ failed_buckets : Vec < String > ,
306+ }
307+
308+ impl CheckpointResult {
309+ pub fn is_valid ( & self ) -> bool {
310+ self . failed_buckets . is_empty ( )
311+ }
312+ }
313+
314+ impl Display for CheckpointResult {
315+ fn fmt ( & self , f : & mut core:: fmt:: Formatter < ' _ > ) -> core:: fmt:: Result {
316+ if self . is_valid ( ) {
317+ write ! ( f, "Valid checkpoint result" )
318+ } else {
319+ write ! ( f, "Checksums didn't match, failed for: " ) ?;
320+ for ( i, item) in self . failed_buckets . iter ( ) . enumerate ( ) {
321+ if i != 0 {
322+ write ! ( f, ", " ) ?;
323+ }
324+
325+ item. fmt ( f) ?;
326+ }
327+
328+ Ok ( ( ) )
329+ }
330+ }
331+ }
332+
333+ pub enum SyncLocalResult {
334+ /// Changes could not be applied due to a checksum mismatch.
335+ ChecksumFailure ( CheckpointResult ) ,
336+ /// Changes could not be applied because they would break consistency - we need to wait for
337+ /// pending local CRUD data to be uploaded and acknowledged in a write checkpoint.
338+ PendingLocalChanges ,
339+ /// The checkpoint has been applied and changes have been published.
340+ ChangesApplied ,
341+ }
342+
140343/// Information about the amount of operations a bucket had at the last checkpoint and how many
141344/// operations have been inserted in the meantime.
142345pub struct PersistedBucketProgress < ' a > {
0 commit comments