@@ -5,25 +5,31 @@ use alloc::string::String;
55use const_format:: formatcp;
66use core:: ffi:: { c_char, c_int, c_void, CStr } ;
77use core:: ptr:: null_mut;
8+ use serde:: Serialize ;
9+ use serde_json:: value:: RawValue ;
810
911use sqlite:: { Connection , ResultCode , Value } ;
10- use sqlite_nostd as sqlite;
1112use sqlite_nostd:: ManagedStmt ;
12- use sqlite_nostd:: ResultCode :: NULL ;
13+ use sqlite_nostd:: { self as sqlite , ColumnType } ;
1314
1415use crate :: error:: SQLiteError ;
1516use crate :: ext:: SafeManagedStmt ;
1617use crate :: schema:: TableInfoFlags ;
18+ use crate :: util:: MAX_OP_ID ;
1719use crate :: vtab_util:: * ;
1820
19- const MANUAL_NAME : & CStr = c"powersync_crud " ;
21+ const MANUAL_NAME : & CStr = c"powersync_crud_ " ;
2022const SIMPLE_NAME : & CStr = c"powersync_crud" ;
2123
2224// Structure:
2325// CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);
26+ // CREATE TABLE powersync_crud(op TEXT, id TEXT, data TEXT old_values TEXT, metadata TEXT);
2427//
2528// This is a insert-only virtual table. It generates transaction ids in ps_tx, and inserts data in
2629// ps_crud(tx_id, data).
30+ // The second form (without the trailing underscore) takes the data to insert as individual
31+ // components and constructs the data to insert into `ps_crud` internally. It will also update
32+ // `ps_updated_rows` and the `$local` bucket.
2733//
2834// Using a virtual table like this allows us to hook into xBegin, xCommit and xRollback to automatically
2935// increment transaction ids. These are only called when powersync_crud_ is used as part of a transaction,
@@ -33,8 +39,159 @@ const SIMPLE_NAME: &CStr = c"powersync_crud";
3339struct VirtualTable {
3440 base : sqlite:: vtab ,
3541 db : * mut sqlite:: sqlite3 ,
36- current_tx : Option < i64 > ,
37- insert_statement : Option < ManagedStmt > ,
42+ current_tx : Option < ActiveCrudTransaction > ,
43+ is_simple : bool ,
44+ }
45+
46+ struct ActiveCrudTransaction {
47+ tx_id : i64 ,
48+ mode : CrudTransactionMode ,
49+ }
50+
51+ enum CrudTransactionMode {
52+ Manual {
53+ stmt : ManagedStmt ,
54+ } ,
55+ Simple {
56+ stmt : ManagedStmt ,
57+ set_updated_rows : ManagedStmt ,
58+ update_local_bucket : ManagedStmt ,
59+ } ,
60+ }
61+
62+ impl VirtualTable {
63+ fn value_to_json < ' a > ( value : & ' a * mut sqlite:: value ) -> Option < & ' a RawValue > {
64+ match value. value_type ( ) {
65+ ColumnType :: Text => {
66+ Some ( unsafe {
67+ // Safety: RawValue is a transparent type wrapping a str. We assume that it
68+ // contains valid JSON.
69+ core:: mem:: transmute :: < & ' a str , & ' a RawValue > ( value. text ( ) )
70+ } )
71+ }
72+ _ => None ,
73+ }
74+ }
75+
76+ fn handle_insert ( & self , args : & [ * mut sqlite:: value ] ) -> Result < ( ) , SQLiteError > {
77+ let current_tx = self
78+ . current_tx
79+ . as_ref ( )
80+ . ok_or_else ( || SQLiteError ( ResultCode :: MISUSE , Some ( String :: from ( "No tx_id" ) ) ) ) ?;
81+
82+ match & current_tx. mode {
83+ CrudTransactionMode :: Manual { stmt } => {
84+ // Columns are (data TEXT, options INT HIDDEN)
85+ let data = args[ 0 ] . text ( ) ;
86+ let flags = match args[ 1 ] . value_type ( ) {
87+ sqlite_nostd:: ColumnType :: Null => TableInfoFlags :: default ( ) ,
88+ _ => TableInfoFlags ( args[ 1 ] . int ( ) as u32 ) ,
89+ } ;
90+
91+ stmt. bind_int64 ( 1 , current_tx. tx_id ) ?;
92+ stmt. bind_text ( 2 , data, sqlite:: Destructor :: STATIC ) ?;
93+ stmt. bind_int ( 3 , flags. 0 as i32 ) ?;
94+ stmt. exec ( ) ?;
95+ }
96+ CrudTransactionMode :: Simple {
97+ stmt,
98+ set_updated_rows,
99+ update_local_bucket,
100+ } => {
101+ // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT)
102+ let metadata = args[ 5 ] ;
103+ let op = args[ 0 ] . text ( ) ;
104+ let id = args[ 1 ] . text ( ) ;
105+ let row_type = args[ 2 ] . text ( ) ;
106+
107+ #[ derive( Serialize ) ]
108+ struct CrudEntry < ' a > {
109+ op : & ' a str ,
110+ id : & ' a str ,
111+ #[ serde( rename = "type" ) ]
112+ row_type : & ' a str ,
113+ data : Option < & ' a RawValue > ,
114+ old : Option < & ' a RawValue > ,
115+ metadata : Option < & ' a str > ,
116+ }
117+
118+ // First, we insert into ps_crud like the manual vtab would too. We have to create
119+ // the JSON out of the individual components for that.
120+ stmt. bind_int64 ( 1 , current_tx. tx_id ) ?;
121+ let serialized = serde_json:: to_string ( & CrudEntry {
122+ op,
123+ id,
124+ row_type,
125+ data : Self :: value_to_json ( & args[ 3 ] ) ,
126+ old : Self :: value_to_json ( & args[ 4 ] ) ,
127+ metadata : if metadata. value_type ( ) == ColumnType :: Text {
128+ Some ( metadata. text ( ) )
129+ } else {
130+ None
131+ } ,
132+ } ) ?;
133+ stmt. bind_text ( 2 , & serialized, sqlite:: Destructor :: STATIC ) ?;
134+ stmt. exec ( ) ?;
135+
136+ // However, we also set ps_updated_rows and update the $local bucket
137+ set_updated_rows. bind_text ( 1 , row_type, sqlite:: Destructor :: STATIC ) ?;
138+ set_updated_rows. bind_text ( 2 , id, sqlite:: Destructor :: STATIC ) ?;
139+ set_updated_rows. exec ( ) ?;
140+ update_local_bucket. exec ( ) ?;
141+ }
142+ }
143+
144+ Ok ( ( ) )
145+ }
146+
147+ fn begin ( & mut self ) -> Result < ( ) , SQLiteError > {
148+ let db = self . db ;
149+
150+ // language=SQLite
151+ let statement =
152+ db. prepare_v2 ( "UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx" ) ?;
153+ let tx_id = if statement. step ( ) ? == ResultCode :: ROW {
154+ statement. column_int64 ( 0 ) - 1
155+ } else {
156+ return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
157+ } ;
158+
159+ self . current_tx = Some ( ActiveCrudTransaction {
160+ tx_id,
161+ mode : if self . is_simple {
162+ CrudTransactionMode :: Simple {
163+ // language=SQLite
164+ stmt : db. prepare_v3 ( "INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)" , 0 ) ?,
165+ // language=SQLite
166+ set_updated_rows : db. prepare_v3 (
167+ "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)" ,
168+ 0 ,
169+ ) ?,
170+ update_local_bucket : db. prepare_v3 ( formatcp ! ( "INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})" ) , 0 ) ?,
171+ }
172+ } else {
173+ const SQL : & str = formatcp ! (
174+ "\
175+ WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
176+ INSERT INTO ps_crud(tx_id, data)
177+ SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
178+ " ,
179+ TableInfoFlags :: IGNORE_EMPTY_UPDATE
180+ ) ;
181+
182+ let insert_statement = db. prepare_v3 ( SQL , 0 ) ?;
183+ CrudTransactionMode :: Manual {
184+ stmt : insert_statement,
185+ }
186+ } ,
187+ } ) ;
188+
189+ Ok ( ( ) )
190+ }
191+
192+ fn end_transaction ( & mut self ) {
193+ self . current_tx = None ;
194+ }
38195}
39196
40197extern "C" fn connect (
@@ -54,7 +211,7 @@ extern "C" fn connect(
54211 let is_simple = name == SIMPLE_NAME ;
55212
56213 let sql = if is_simple {
57- "CREATE TABLE powersync_crud(op TEXT, id TEXT, data TEXT old_values TEXT, metadata TEXT);"
214+ "CREATE TABLE powersync_crud(op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT);"
58215 } else {
59216 "CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);"
60217 } ;
@@ -72,7 +229,7 @@ extern "C" fn connect(
72229 } ,
73230 db,
74231 current_tx : None ,
75- insert_statement : None ,
232+ is_simple ,
76233 } ) ) ;
77234 * vtab = tab. cast :: < sqlite:: vtab > ( ) ;
78235 let _ = sqlite:: vtab_config ( db, 0 ) ;
@@ -87,81 +244,25 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
87244 ResultCode :: OK as c_int
88245}
89246
90- fn begin_impl ( tab : & mut VirtualTable ) -> Result < ( ) , SQLiteError > {
91- let db = tab. db ;
92-
93- const SQL : & str = formatcp ! (
94- "\
95- WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
96- INSERT INTO ps_crud(tx_id, data)
97- SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
98- " ,
99- TableInfoFlags :: IGNORE_EMPTY_UPDATE
100- ) ;
101-
102- let insert_statement = db. prepare_v3 ( SQL , 0 ) ?;
103- tab. insert_statement = Some ( insert_statement) ;
104-
105- // language=SQLite
106- let statement =
107- db. prepare_v2 ( "UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx" ) ?;
108- if statement. step ( ) ? == ResultCode :: ROW {
109- let tx_id = statement. column_int64 ( 0 ) - 1 ;
110- tab. current_tx = Some ( tx_id) ;
111- } else {
112- return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
113- }
114-
115- Ok ( ( ) )
116- }
117-
118247extern "C" fn begin ( vtab : * mut sqlite:: vtab ) -> c_int {
119248 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
120- let result = begin_impl ( tab) ;
249+ let result = tab. begin ( ) ;
121250 vtab_result ( vtab, result)
122251}
123252
124253extern "C" fn commit ( vtab : * mut sqlite:: vtab ) -> c_int {
125254 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
126- tab. current_tx = None ;
127- tab. insert_statement = None ;
255+ tab. end_transaction ( ) ;
128256 ResultCode :: OK as c_int
129257}
130258
131259extern "C" fn rollback ( vtab : * mut sqlite:: vtab ) -> c_int {
132260 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
133- tab. current_tx = None ;
134- tab. insert_statement = None ;
261+ tab. end_transaction ( ) ;
135262 // ps_tx will be rolled back automatically
136263 ResultCode :: OK as c_int
137264}
138265
139- fn insert_operation (
140- vtab : * mut sqlite:: vtab ,
141- data : & str ,
142- flags : TableInfoFlags ,
143- ) -> Result < ( ) , SQLiteError > {
144- let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
145- if tab. current_tx . is_none ( ) {
146- return Err ( SQLiteError (
147- ResultCode :: MISUSE ,
148- Some ( String :: from ( "No tx_id" ) ) ,
149- ) ) ;
150- }
151- let current_tx = tab. current_tx . unwrap ( ) ;
152- // language=SQLite
153- let statement = tab
154- . insert_statement
155- . as_ref ( )
156- . ok_or ( SQLiteError :: from ( NULL ) ) ?;
157- statement. bind_int64 ( 1 , current_tx) ?;
158- statement. bind_text ( 2 , data, sqlite:: Destructor :: STATIC ) ?;
159- statement. bind_int ( 3 , flags. 0 as i32 ) ?;
160- statement. exec ( ) ?;
161-
162- Ok ( ( ) )
163- }
164-
165266extern "C" fn update (
166267 vtab : * mut sqlite:: vtab ,
167268 argc : c_int ,
@@ -177,12 +278,8 @@ extern "C" fn update(
177278 ResultCode :: MISUSE as c_int
178279 } else if rowid. value_type ( ) == sqlite:: ColumnType :: Null {
179280 // INSERT
180- let data = args[ 2 ] . text ( ) ;
181- let flags = match args[ 3 ] . value_type ( ) {
182- sqlite_nostd:: ColumnType :: Null => TableInfoFlags :: default ( ) ,
183- _ => TableInfoFlags ( args[ 3 ] . int ( ) as u32 ) ,
184- } ;
185- let result = insert_operation ( vtab, data, flags) ;
281+ let tab = unsafe { & * ( vtab. cast :: < VirtualTable > ( ) ) } ;
282+ let result = tab. handle_insert ( & args[ 2 ..] ) ;
186283 vtab_result ( vtab, result)
187284 } else {
188285 // UPDATE - not supported
0 commit comments