11use alloc:: collections:: BTreeSet ;
22use alloc:: format;
33use alloc:: string:: String ;
4+ use alloc:: vec:: Vec ;
5+ use serde:: Deserialize ;
46
57use crate :: bucket_priority:: BucketPriority ;
68use crate :: error:: { PSResult , SQLiteError } ;
@@ -14,30 +16,25 @@ fn can_apply_sync_changes(
1416 db : * mut sqlite:: sqlite3 ,
1517 priority : BucketPriority ,
1618) -> Result < bool , SQLiteError > {
17- // We can only make sync changes visible if data is consistent, meaning that we've seen the
18- // target operation sent in the original checkpoint message. We allow weakening consistency when
19- // buckets from different priorities are involved (buckets with higher priorities or a lower
20- // priority number can be published before we've reached the checkpoint for other buckets).
21- // language=SQLite
22- let statement = db. prepare_v2 (
23- "\
19+ // Don't publish downloaded data until the upload queue is empty (except for downloaded data in
20+ // priority 0, which is published earlier).
21+ if !priority. may_publish_with_outstanding_uploads ( ) {
22+ // language=SQLite
23+ let statement = db. prepare_v2 (
24+ "\
2425 SELECT group_concat(name)
2526FROM ps_buckets
26- WHERE (target_op > last_op) AND (priority <= ?)" ,
27- ) ?;
28- statement. bind_int ( 1 , priority. into ( ) ) ?;
27+ WHERE target_op > last_op AND name = '$local'" ,
28+ ) ?;
2929
30- if statement. step ( ) ? != ResultCode :: ROW {
31- return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
32- }
30+ if statement. step ( ) ? != ResultCode :: ROW {
31+ return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
32+ }
3333
34- if statement. column_type ( 0 ) ? == ColumnType :: Text {
35- return Ok ( false ) ;
36- }
34+ if statement. column_type ( 0 ) ? == ColumnType :: Text {
35+ return Ok ( false ) ;
36+ }
3737
38- // Don't publish downloaded data until the upload queue is empty (except for downloaded data in
39- // priority 0, which is published earlier).
40- if !priority. may_publish_with_outstanding_uploads ( ) {
4138 let statement = db. prepare_v2 ( "SELECT 1 FROM ps_crud LIMIT 1" ) ?;
4239 if statement. step ( ) ? != ResultCode :: DONE {
4340 return Ok ( false ) ;
@@ -47,13 +44,27 @@ WHERE (target_op > last_op) AND (priority <= ?)",
4744 Ok ( true )
4845}
4946
50- pub fn sync_local < V : Value > ( db : * mut sqlite:: sqlite3 , data : & V ) -> Result < i64 , SQLiteError > {
51- let priority = match data. value_type ( ) {
52- ColumnType :: Integer => BucketPriority :: try_from ( data. int ( ) ) ,
53- ColumnType :: Float => BucketPriority :: try_from ( data. double ( ) as i32 ) ,
54- // Older clients without bucket priority support typically send an empty string here.
55- _ => Ok ( BucketPriority :: LOWEST ) ,
56- } ?;
47+ pub fn sync_local ( db : * mut sqlite:: sqlite3 , data : * mut sqlite:: value ) -> Result < i64 , SQLiteError > {
48+ #[ derive( Deserialize ) ]
49+ struct SyncLocalArguments {
50+ #[ serde( rename = "buckets" ) ]
51+ _buckets : Vec < String > ,
52+ priority : Option < BucketPriority > ,
53+ }
54+
55+ const FALLBACK_PRIORITY : BucketPriority = BucketPriority :: LOWEST ;
56+ let ( has_args, priority) = match data. value_type ( ) {
57+ ColumnType :: Text => {
58+ let text = data. text ( ) ;
59+ if text. len ( ) > 0 {
60+ let args: SyncLocalArguments = serde_json:: from_str ( text) ?;
61+ ( true , args. priority . unwrap_or ( FALLBACK_PRIORITY ) )
62+ } else {
63+ ( false , FALLBACK_PRIORITY )
64+ }
65+ }
66+ _ => ( false , FALLBACK_PRIORITY ) ,
67+ } ;
5768
5869 if !can_apply_sync_changes ( db, priority) ? {
5970 return Ok ( 0 ) ;
@@ -78,12 +89,17 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, S
7889 "\
7990 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
8091-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
81- WITH updated_rows AS (
82- SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
83- CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
84- WHERE buckets.priority <= ?1
85- UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
86- )
92+ WITH
93+ involved_buckets (id) AS (
94+ SELECT id FROM ps_buckets WHERE ?1 IS NULL
95+ OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
96+ ),
97+ updated_rows AS (
98+ SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
99+ CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
100+ WHERE buckets.id IN (SELECT id FROM involved_buckets)
101+ UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
102+ )
87103
88104-- 3. Group the objects from different buckets together into a single one (ops).
89105SELECT b.row_type as type,
@@ -98,15 +114,19 @@ FROM updated_rows b
98114 LEFT OUTER JOIN ps_oplog AS r
99115 ON r.row_type = b.row_type
100116 AND r.row_id = b.row_id
101- AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1
117+ AND r.bucket IN (SELECT id FROM involved_buckets)
102118-- Group for (3)
103119GROUP BY b.row_type, b.row_id" ,
104120 )
105121 . into_db_result ( db) ?;
106122
107- // TODO: cache statements
123+ if has_args {
124+ statement. bind_value ( 1 , data) ?;
125+ } else {
126+ statement. bind_null ( 1 ) ?;
127+ }
108128
109- statement . bind_int ( 1 , priority . into ( ) ) ? ;
129+ // TODO: cache statements
110130 while statement. step ( ) . into_db_result ( db) ? == ResultCode :: ROW {
111131 let type_name = statement. column_text ( 0 ) ?;
112132 let id = statement. column_text ( 1 ) ?;
@@ -170,10 +190,15 @@ GROUP BY b.row_type, b.row_id",
170190 . prepare_v2 (
171191 "UPDATE ps_buckets
172192 SET last_applied_op = last_op
173- WHERE last_applied_op != last_op AND priority <= ?" ,
193+ WHERE last_applied_op != last_op AND
194+ (?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))))" ,
174195 )
175196 . into_db_result ( db) ?;
176- updated. bind_int ( 1 , priority. into ( ) ) ?;
197+ if has_args {
198+ updated. bind_value ( 1 , data) ?;
199+ } else {
200+ updated. bind_null ( 1 ) ?;
201+ }
177202 updated. exec ( ) ?;
178203
179204 if priority == BucketPriority :: LOWEST {
0 commit comments