diff --git a/core/rs/bundle_static/Cargo.lock b/core/rs/bundle_static/Cargo.lock index a0858632d..909df5a63 100644 --- a/core/rs/bundle_static/Cargo.lock +++ b/core/rs/bundle_static/Cargo.lock @@ -108,6 +108,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", @@ -180,6 +181,15 @@ version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +[[package]] +name = "libc-print" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a660208db49e35faf57b37484350f1a61072f2a5becf0592af6015d9ddd4b0" +dependencies = [ + "libc", +] + [[package]] name = "libloading" version = "0.7.4" diff --git a/core/rs/core/Cargo.lock b/core/rs/core/Cargo.lock index d2006ae23..dc45e3470 100644 --- a/core/rs/core/Cargo.lock +++ b/core/rs/core/Cargo.lock @@ -74,6 +74,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", @@ -109,6 +110,15 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "libc-print" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a660208db49e35faf57b37484350f1a61072f2a5becf0592af6015d9ddd4b0" +dependencies = [ + "libc", +] + [[package]] name = "libloading" version = "0.7.4" diff --git a/core/rs/core/Cargo.toml b/core/rs/core/Cargo.toml index c6464f5aa..3556c1342 100644 --- a/core/rs/core/Cargo.toml +++ b/core/rs/core/Cargo.toml @@ -15,6 +15,7 @@ sqlite_nostd = { path="../sqlite-rs-embedded/sqlite_nostd" } bytes = { version = "1.5", default-features = false } num-traits = { version = "0.2.17", default-features = false } num-derive = "0.4.1" +libc-print = "0.1.22" [dev-dependencies] diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index dee60f70c..818dd124e 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -41,15 +41,13 @@ fn did_cid_win( let col_vrsn_stmt_ref = tbl_info.get_col_version_stmt(db)?; let col_vrsn_stmt = col_vrsn_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = col_vrsn_stmt.bind_int64(1, key); + let bind_result = col_vrsn_stmt + .bind_int64(1, key) + .and_then(|_| col_vrsn_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC)); if let Err(rc) = bind_result { reset_cached_stmt(col_vrsn_stmt.stmt)?; return Err(rc); } - if let Err(rc) = col_vrsn_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) { - reset_cached_stmt(col_vrsn_stmt.stmt)?; - return Err(rc); - } match col_vrsn_stmt.step() { Ok(ResultCode::ROW) => { @@ -98,16 +96,13 @@ fn did_cid_win( let col_site_id_stmt_ref = tbl_info.get_col_site_id_stmt(db)?; let col_site_id_stmt = col_site_id_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = col_site_id_stmt.bind_int64(1, key); + let bind_result = col_site_id_stmt.bind_int64(1, key).and_then(|_| { + col_site_id_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) + }); if let Err(rc) = bind_result { reset_cached_stmt(col_site_id_stmt.stmt)?; return Err(rc); } - if let Err(rc) = col_site_id_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) - { - reset_cached_stmt(col_site_id_stmt.stmt)?; - return Err(rc); - } match col_site_id_stmt.step() { Ok(ResultCode::ROW) => { @@ -171,38 +166,58 @@ fn set_winner_clock( if insert_site_id.is_empty() { None } else { - (*ext_data).pSelectSiteIdOrdinalStmt.bind_blob( + let bind_result = (*ext_data).pSelectSiteIdOrdinalStmt.bind_blob( 1, insert_site_id, sqlite::Destructor::STATIC, - )?; - let rc = (*ext_data).pSelectSiteIdOrdinalStmt.step()?; - if rc == ResultCode::ROW { - let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); - (*ext_data).pSelectSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSelectSiteIdOrdinalStmt.reset()?; - - Some(ordinal) - } else { - (*ext_data).pSelectSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSelectSiteIdOrdinalStmt.reset()?; - // site id had no ordinal yet. - // set one and return the ordinal. - (*ext_data).pSetSiteIdOrdinalStmt.bind_blob( - 1, - insert_site_id, - sqlite::Destructor::STATIC, - )?; - let rc = (*ext_data).pSetSiteIdOrdinalStmt.step()?; - if rc == ResultCode::DONE { - (*ext_data).pSetSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSetSiteIdOrdinalStmt.reset()?; - return Err(ResultCode::ABORT); + ); + + if let Err(rc) = bind_result { + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + return Err(rc); + } + + match (*ext_data).pSelectSiteIdOrdinalStmt.step() { + Ok(ResultCode::ROW) => { + let ordinal = (*ext_data).pSelectSiteIdOrdinalStmt.column_int64(0); + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + Some(ordinal) + } + Ok(_) => { + reset_cached_stmt((*ext_data).pSelectSiteIdOrdinalStmt)?; + // site id had no ordinal yet. + // set one and return the ordinal. + let bind_result = (*ext_data).pSetSiteIdOrdinalStmt.bind_blob( + 1, + insert_site_id, + sqlite::Destructor::STATIC, + ); + + if let Err(rc) = bind_result { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt); + return Err(rc); + } + + match (*ext_data).pSetSiteIdOrdinalStmt.step() { + Ok(ResultCode::DONE) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(ResultCode::ABORT); + } + Ok(_) => { + let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0); + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + Some(ordinal) + } + Err(rc) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(rc); + } + } + } + Err(rc) => { + reset_cached_stmt((*ext_data).pSetSiteIdOrdinalStmt)?; + return Err(rc); } - let ordinal = (*ext_data).pSetSiteIdOrdinalStmt.column_int64(0); - (*ext_data).pSetSiteIdOrdinalStmt.clear_bindings()?; - (*ext_data).pSetSiteIdOrdinalStmt.reset()?; - Some(ordinal) } } }; @@ -210,13 +225,9 @@ fn set_winner_clock( let set_stmt_ref = tbl_info.get_set_winner_clock_stmt(db)?; let set_stmt = set_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let bind_result = set_stmt.bind_int64(1, key); - if let Err(rc) = bind_result { - reset_cached_stmt(set_stmt.stmt)?; - return Err(rc); - } let bind_result = set_stmt - .bind_text(2, insert_col_name, sqlite::Destructor::STATIC) + .bind_int64(1, key) + .and_then(|_| set_stmt.bind_text(2, insert_col_name, sqlite::Destructor::STATIC)) .and_then(|_| set_stmt.bind_int64(3, insert_col_vrsn)) .and_then(|_| set_stmt.bind_int64(4, insert_db_vrsn)) .and_then(|_| set_stmt.bind_int64(5, insert_seq)) @@ -266,18 +277,16 @@ fn merge_sentinel_only_insert( (*ext_data) .pSetSyncBitStmt .step() - .and_then(|_| (*ext_data).pSetSyncBitStmt.reset()) .and_then(|_| merge_stmt.step()) }; - // TODO: report err? - let _ = reset_cached_stmt(merge_stmt.stmt); + unsafe { (*ext_data).pSetSyncBitStmt.reset()? }; + reset_cached_stmt(merge_stmt.stmt)?; let sync_rc = unsafe { - (*ext_data) - .pClearSyncBitStmt - .step() - .and_then(|_| (*ext_data).pClearSyncBitStmt.reset()) + let rc = (*ext_data).pClearSyncBitStmt.step(); + (*ext_data).pClearSyncBitStmt.reset()?; + rc }; if let Err(sync_rc) = sync_rc { @@ -343,16 +352,14 @@ unsafe fn merge_delete( let rc = (*ext_data) .pSetSyncBitStmt .step() - .and_then(|_| (*ext_data).pSetSyncBitStmt.reset()) .and_then(|_| delete_stmt.step()); + (*ext_data).pSetSyncBitStmt.reset()?; reset_cached_stmt(delete_stmt.stmt)?; - let sync_rc = (*ext_data) - .pClearSyncBitStmt - .step() - .and_then(|_| (*ext_data).pClearSyncBitStmt.reset()); + let sync_rc = (*ext_data).pClearSyncBitStmt.step(); + (*ext_data).pClearSyncBitStmt.reset()?; if let Err(sync_rc) = sync_rc { return Err(sync_rc); } @@ -407,12 +414,9 @@ fn get_local_cl( let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - let rc = local_cl_stmt.bind_int64(1, key); - if let Err(rc) = rc { - reset_cached_stmt(local_cl_stmt.stmt)?; - return Err(rc); - } - let rc = local_cl_stmt.bind_int64(2, key); + let rc = local_cl_stmt + .bind_int64(1, key) + .and_then(|_| local_cl_stmt.bind_int64(2, key)); if let Err(rc) = rc { reset_cached_stmt(local_cl_stmt.stmt)?; return Err(rc); @@ -661,15 +665,14 @@ unsafe fn merge_insert( let rc = (*(*tab).pExtData) .pSetSyncBitStmt .step() - .and_then(|_| (*(*tab).pExtData).pSetSyncBitStmt.reset()) .and_then(|_| merge_stmt.step()); reset_cached_stmt(merge_stmt.stmt)?; - let sync_rc = (*(*tab).pExtData) - .pClearSyncBitStmt - .step() - .and_then(|_| (*(*tab).pExtData).pClearSyncBitStmt.reset()); + let sync_rc = (*(*tab).pExtData).pClearSyncBitStmt.step(); + + (*(*tab).pExtData).pSetSyncBitStmt.reset(); + (*(*tab).pExtData).pClearSyncBitStmt.reset(); if let Err(rc) = rc { return Err(rc); diff --git a/core/rs/core/src/debug.rs b/core/rs/core/src/debug.rs new file mode 100644 index 000000000..08df1bc86 --- /dev/null +++ b/core/rs/core/src/debug.rs @@ -0,0 +1,39 @@ +use alloc::format; +use alloc::string::String; +use core::ffi::c_void; +use sqlite::{context, value}; +use sqlite_nostd as sqlite; + +// Global context for logging - will be set during initialization +static mut DEBUG_ENABLED: bool = false; + +pub fn debug_log(msg: &str) { + unsafe { + if DEBUG_ENABLED { + libc_print::libc_println!("[DEBUG] {}", msg); + } + } +} + +pub unsafe extern "C" fn x_crsql_set_debug(ctx: *mut context, argc: i32, argv: *mut *mut value) { + if argc == 0 { + // If no arguments, return current state + sqlite::result_int(ctx, if DEBUG_ENABLED { 1 } else { 0 }); + return; + } + + if argc > 1 { + // Too many arguments + return; + } + + let enabled = { + let arg = *argv; + sqlite::value_int(arg) != 0 + }; + + DEBUG_ENABLED = enabled; + + // Return success (the new state) + sqlite::result_int(ctx, if enabled { 1 } else { 0 }); +} diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 3e83a0103..b66e43a0e 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -26,6 +26,7 @@ mod create_crr; pub mod db_version; #[cfg(not(feature = "test"))] mod db_version; +mod debug; mod ext_data; mod is_crr; mod local_writes; @@ -69,6 +70,8 @@ use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pu use teardown::*; use triggers::create_triggers; +pub use debug::debug_log; + pub extern "C" fn crsql_as_table( ctx: *mut sqlite::context, argc: i32, @@ -109,6 +112,22 @@ pub extern "C" fn sqlite3_crsqlcore_init( ) -> *mut c_void { sqlite::EXTENSION_INIT2(api); + let rc = db + .create_function_v2( + "crsql_set_debug", + 1, + sqlite::UTF8 | sqlite::DIRECTONLY, + None, + Some(debug::x_crsql_set_debug), + None, + None, + None, + ) + .unwrap_or(sqlite::ResultCode::ERROR); + if rc != ResultCode::OK { + return null_mut(); + } + let rc = db .create_function_v2( "crsql_automigrate", diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 256900bed..98510d158 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -111,7 +111,10 @@ impl TableInfo { let stmt_ref = self.get_select_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::DONE) => { @@ -141,7 +144,10 @@ impl TableInfo { let stmt_ref = self.get_insert_or_ignore_returning_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::DONE) => { @@ -208,7 +214,10 @@ impl TableInfo { let stmt_ref = self.get_insert_key_stmt(db)?; let stmt = stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; for (i, pk) in pks.iter().enumerate() { - stmt.bind_value(i as i32 + 1, *pk)?; + if let Err(rc) = stmt.bind_value(i as i32 + 1, *pk) { + stmt.clear_bindings()?; + return Err(rc); + } } match stmt.step() { Ok(ResultCode::ROW) => { @@ -478,15 +487,15 @@ impl TableInfo { // from the old pk can override the ones from the new at a node // following our changes. let sql = format!( - "UPDATE OR REPLACE \"{table_name}__crsql_clock\" SET - key = ?, - db_version = ?, - seq = ?, - col_version = col_version + 1, - site_id = 0 - WHERE - key = ? AND col_name = ?", - table_name = crate::util::escape_ident(&self.tbl_name), + "UPDATE OR REPLACE \"{table_name}__crsql_clock\" SET + key = ?, + db_version = ?, + seq = ?, + col_version = col_version + 1, + site_id = 0 + WHERE + key = ? AND col_name = ?", + table_name = crate::util::escape_ident(&self.tbl_name), ); let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?; *self.move_non_sentinels_stmt.try_borrow_mut()? = Some(ret); diff --git a/core/rs/integration_check/Cargo.lock b/core/rs/integration_check/Cargo.lock index f5193cbd1..9ce49607f 100644 --- a/core/rs/integration_check/Cargo.lock +++ b/core/rs/integration_check/Cargo.lock @@ -131,6 +131,7 @@ name = "crsql_core" version = "0.1.0" dependencies = [ "bytes", + "libc-print", "num-derive", "num-traits", "sqlite_nostd", diff --git a/core/src/crsqlite.test.c b/core/src/crsqlite.test.c index e27f01d7a..e2385fe66 100644 --- a/core/src/crsqlite.test.c +++ b/core/src/crsqlite.test.c @@ -503,7 +503,6 @@ static void noopsDoNotMoveClocks() { printf("NoopsDoNotMoveClocks\n"); // syncing from A -> B, while no changes happen on B, moves up // B's clock still. - sqlite3 *db1; sqlite3 *db2; int rc = SQLITE_OK; @@ -634,4 +633,4 @@ void crsqlTestSuite() { // testSyncBit(); // testDbVersion(); // testSiteId(); -} \ No newline at end of file +} diff --git a/py/correctness/tests/test_sync.py b/py/correctness/tests/test_sync.py index fd30f2b18..fe1602d11 100644 --- a/py/correctness/tests/test_sync.py +++ b/py/correctness/tests/test_sync.py @@ -397,17 +397,23 @@ def test_merge_same_w_tie_breaker(): db3.execute("SELECT crsql_config_set('merge-equal-values', 1);") db3.commit() + # Sync changes so all nodes have seen changes from other nodes sync_left_to_right(db1, db2, 0) - changes2 = db2.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - sync_left_to_right(db2, db1, 0) - changes1 = db1.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - sync_left_to_right(db2, db3, 0) + sync_left_to_right(db3, db2, 0) + sync_left_to_right(db3, db1, 0) + + changes1 = db1.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() + changes2 = db2.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() changes3 = db3.execute("SELECT \"table\", pk, cid, val, col_version, site_id, db_version FROM crsql_changes").fetchall() - # check that everything by db_version is the same - assert (changes2[:-6] == changes1[:-6] == changes3[:-6]) + # check that everything but db_version is the same + # print("changes2", changes2) + changes1_no_dbv = [x[:-1] for x in changes1] + changes2_no_dbv = [x[:-1] for x in changes2] + changes3_no_dbv = [x[:-1] for x in changes3] + assert (changes2_no_dbv == changes1_no_dbv == changes3_no_dbv) # Test that we're stable / do not loop when we tie break equal values