|
1 | 1 | use alloc::format; |
2 | | -use alloc::string::String; |
3 | 2 |
|
4 | | -use crate::error::{PSResult, SQLiteError}; |
| 3 | +use crate::error::SQLiteError; |
| 4 | +use crate::sync::line::DataLine; |
| 5 | +use crate::sync::operations::insert_bucket_operations; |
| 6 | +use crate::sync::storage_adapter::StorageAdapter; |
5 | 7 | use sqlite_nostd as sqlite; |
6 | 8 | use sqlite_nostd::{Connection, ResultCode}; |
7 | 9 |
|
8 | 10 | use crate::ext::SafeManagedStmt; |
9 | 11 |
|
10 | 12 | // Run inside a transaction |
11 | 13 | pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { |
12 | | - // language=SQLite |
13 | | - let statement = db.prepare_v2( |
14 | | - "\ |
15 | | -SELECT |
16 | | - json_extract(e.value, '$.bucket') as bucket, |
17 | | - json_extract(e.value, '$.data') as data, |
18 | | - json_extract(e.value, '$.has_more') as has_more, |
19 | | - json_extract(e.value, '$.after') as after, |
20 | | - json_extract(e.value, '$.next_after') as next_after |
21 | | -FROM json_each(json_extract(?1, '$.buckets')) e", |
22 | | - )?; |
23 | | - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; |
24 | | - |
25 | | - while statement.step()? == ResultCode::ROW { |
26 | | - let bucket = statement.column_text(0)?; |
27 | | - let data = statement.column_text(1)?; |
28 | | - // let _has_more = statement.column_int(2)? != 0; |
29 | | - // let _after = statement.column_text(3)?; |
30 | | - // let _next_after = statement.column_text(4)?; |
31 | | - |
32 | | - insert_bucket_operations(db, bucket, data)?; |
33 | | - } |
34 | | - |
35 | | - Ok(()) |
36 | | -} |
37 | | - |
38 | | -pub fn insert_bucket_operations( |
39 | | - db: *mut sqlite::sqlite3, |
40 | | - bucket: &str, |
41 | | - data: &str, |
42 | | -) -> Result<(), SQLiteError> { |
43 | | - // Statement to insert new operations (only for PUT and REMOVE). |
44 | | - // language=SQLite |
45 | | - let iterate_statement = db.prepare_v2( |
46 | | - "\ |
47 | | -SELECT |
48 | | - json_extract(e.value, '$.op_id') as op_id, |
49 | | - json_extract(e.value, '$.op') as op, |
50 | | - json_extract(e.value, '$.object_type') as object_type, |
51 | | - json_extract(e.value, '$.object_id') as object_id, |
52 | | - json_extract(e.value, '$.checksum') as checksum, |
53 | | - json_extract(e.value, '$.data') as data, |
54 | | - json_extract(e.value, '$.subkey') as subkey |
55 | | -FROM json_each(?) e", |
56 | | - )?; |
57 | | - iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; |
58 | | - |
59 | | - // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. |
60 | | - // We can consider splitting this into separate SELECT and INSERT statements. |
61 | | - // language=SQLite |
62 | | - let bucket_statement = db.prepare_v2( |
63 | | - "INSERT INTO ps_buckets(name) |
64 | | - VALUES(?) |
65 | | - ON CONFLICT DO UPDATE |
66 | | - SET last_applied_op = last_applied_op |
67 | | - RETURNING id, last_applied_op", |
68 | | - )?; |
69 | | - bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; |
70 | | - bucket_statement.step()?; |
71 | | - |
72 | | - let bucket_id = bucket_statement.column_int64(0); |
73 | | - |
74 | | - // This is an optimization for initial sync - we can avoid persisting individual REMOVE |
75 | | - // operations when last_applied_op = 0. |
76 | | - // We do still need to do the "supersede_statement" step for this case, since a REMOVE |
77 | | - // operation can supersede another PUT operation we're syncing at the same time. |
78 | | - let mut is_empty = bucket_statement.column_int64(1) == 0; |
79 | | - |
80 | | - // Statement to supersede (replace) operations with the same key. |
81 | | - // language=SQLite |
82 | | - let supersede_statement = db.prepare_v2( |
83 | | - "\ |
84 | | -DELETE FROM ps_oplog |
85 | | - WHERE unlikely(ps_oplog.bucket = ?1) |
86 | | - AND ps_oplog.key = ?2 |
87 | | -RETURNING op_id, hash", |
88 | | - )?; |
89 | | - supersede_statement.bind_int64(1, bucket_id)?; |
90 | | - |
91 | | - // language=SQLite |
92 | | - let insert_statement = db.prepare_v2("\ |
93 | | -INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; |
94 | | - insert_statement.bind_int64(1, bucket_id)?; |
95 | | - |
96 | | - let updated_row_statement = db.prepare_v2( |
97 | | - "\ |
98 | | -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", |
99 | | - )?; |
100 | | - |
101 | | - bucket_statement.reset()?; |
102 | | - |
103 | | - let mut last_op: Option<i64> = None; |
104 | | - let mut add_checksum: i32 = 0; |
105 | | - let mut op_checksum: i32 = 0; |
106 | | - let mut added_ops: i32 = 0; |
107 | | - |
108 | | - while iterate_statement.step()? == ResultCode::ROW { |
109 | | - let op_id = iterate_statement.column_int64(0); |
110 | | - let op = iterate_statement.column_text(1)?; |
111 | | - let object_type = iterate_statement.column_text(2); |
112 | | - let object_id = iterate_statement.column_text(3); |
113 | | - let checksum = iterate_statement.column_int(4); |
114 | | - let op_data = iterate_statement.column_text(5); |
115 | | - |
116 | | - last_op = Some(op_id); |
117 | | - added_ops += 1; |
118 | | - |
119 | | - if op == "PUT" || op == "REMOVE" { |
120 | | - let key: String; |
121 | | - if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) { |
122 | | - let subkey = iterate_statement.column_text(6).unwrap_or("null"); |
123 | | - key = format!("{}/{}/{}", &object_type, &object_id, subkey); |
124 | | - } else { |
125 | | - key = String::from(""); |
126 | | - } |
127 | | - |
128 | | - supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; |
129 | | - |
130 | | - let mut superseded = false; |
131 | | - |
132 | | - while supersede_statement.step()? == ResultCode::ROW { |
133 | | - // Superseded (deleted) a previous operation, add the checksum |
134 | | - let supersede_checksum = supersede_statement.column_int(1); |
135 | | - add_checksum = add_checksum.wrapping_add(supersede_checksum); |
136 | | - op_checksum = op_checksum.wrapping_sub(supersede_checksum); |
137 | | - |
138 | | - // Superseded an operation, only skip if the bucket was empty |
139 | | - // Previously this checked "superseded_op <= last_applied_op". |
140 | | - // However, that would not account for a case where a previous |
141 | | - // PUT operation superseded the original PUT operation in this |
142 | | - // same batch, in which case superseded_op is not accurate for this. |
143 | | - if !is_empty { |
144 | | - superseded = true; |
145 | | - } |
146 | | - } |
147 | | - supersede_statement.reset()?; |
148 | | - |
149 | | - if op == "REMOVE" { |
150 | | - let should_skip_remove = !superseded; |
151 | | - |
152 | | - add_checksum = add_checksum.wrapping_add(checksum); |
153 | | - |
154 | | - if !should_skip_remove { |
155 | | - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { |
156 | | - updated_row_statement.bind_text( |
157 | | - 1, |
158 | | - object_type, |
159 | | - sqlite::Destructor::STATIC, |
160 | | - )?; |
161 | | - updated_row_statement.bind_text( |
162 | | - 2, |
163 | | - object_id, |
164 | | - sqlite::Destructor::STATIC, |
165 | | - )?; |
166 | | - updated_row_statement.exec()?; |
167 | | - } |
168 | | - } |
169 | | - |
170 | | - continue; |
171 | | - } |
172 | | - |
173 | | - insert_statement.bind_int64(2, op_id)?; |
174 | | - if key != "" { |
175 | | - insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; |
176 | | - } else { |
177 | | - insert_statement.bind_null(3)?; |
178 | | - } |
179 | | - |
180 | | - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { |
181 | | - insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; |
182 | | - insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; |
183 | | - } else { |
184 | | - insert_statement.bind_null(4)?; |
185 | | - insert_statement.bind_null(5)?; |
186 | | - } |
187 | | - if let Ok(data) = op_data { |
188 | | - insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; |
189 | | - } else { |
190 | | - insert_statement.bind_null(6)?; |
191 | | - } |
192 | | - |
193 | | - insert_statement.bind_int(7, checksum)?; |
194 | | - insert_statement.exec()?; |
195 | | - |
196 | | - op_checksum = op_checksum.wrapping_add(checksum); |
197 | | - } else if op == "MOVE" { |
198 | | - add_checksum = add_checksum.wrapping_add(checksum); |
199 | | - } else if op == "CLEAR" { |
200 | | - // Any remaining PUT operations should get an implicit REMOVE |
201 | | - // language=SQLite |
202 | | - let clear_statement1 = db |
203 | | - .prepare_v2( |
204 | | - "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) |
205 | | -SELECT row_type, row_id |
206 | | -FROM ps_oplog |
207 | | -WHERE bucket = ?1", |
208 | | - ) |
209 | | - .into_db_result(db)?; |
210 | | - clear_statement1.bind_int64(1, bucket_id)?; |
211 | | - clear_statement1.exec()?; |
212 | | - |
213 | | - let clear_statement2 = db |
214 | | - .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") |
215 | | - .into_db_result(db)?; |
216 | | - clear_statement2.bind_int64(1, bucket_id)?; |
217 | | - clear_statement2.exec()?; |
218 | | - |
219 | | - // And we need to re-apply all of those. |
220 | | - // We also replace the checksum with the checksum of the CLEAR op. |
221 | | - // language=SQLite |
222 | | - let clear_statement2 = db.prepare_v2( |
223 | | - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", |
224 | | - )?; |
225 | | - clear_statement2.bind_int64(2, bucket_id)?; |
226 | | - clear_statement2.bind_int(1, checksum)?; |
227 | | - clear_statement2.exec()?; |
228 | | - |
229 | | - add_checksum = 0; |
230 | | - is_empty = true; |
231 | | - op_checksum = 0; |
232 | | - } |
233 | | - } |
234 | | - |
235 | | - if let Some(last_op) = &last_op { |
236 | | - // language=SQLite |
237 | | - let statement = db.prepare_v2( |
238 | | - "UPDATE ps_buckets |
239 | | - SET last_op = ?2, |
240 | | - add_checksum = (add_checksum + ?3) & 0xffffffff, |
241 | | - op_checksum = (op_checksum + ?4) & 0xffffffff, |
242 | | - count_since_last = count_since_last + ?5 |
243 | | - WHERE id = ?1", |
244 | | - )?; |
245 | | - statement.bind_int64(1, bucket_id)?; |
246 | | - statement.bind_int64(2, *last_op)?; |
247 | | - statement.bind_int(3, add_checksum)?; |
248 | | - statement.bind_int(4, op_checksum)?; |
249 | | - statement.bind_int(5, added_ops)?; |
250 | | - |
251 | | - statement.exec()?; |
252 | | - } |
| 14 | + let line = serde_json::from_str::<DataLine>(data)?; |
| 15 | + let adapter = StorageAdapter::new(db)?; |
| 16 | + insert_bucket_operations(&adapter, &line)?; |
253 | 17 |
|
254 | 18 | Ok(()) |
255 | 19 | } |
|
0 commit comments