diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3a9f98f..1277c0e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,19 +9,25 @@ jobs: fail-fast: false matrix: include: - - os: ubuntu-latest + - os: ubuntu-24.04 - os: macos-latest steps: - uses: actions/checkout@v3 with: submodules: true + - uses: dart-lang/setup-dart@v1 - - name: Build lib + - name: Ubuntu setup + if: matrix.os == 'ubuntu-24.04' run: | - cargo build -p powersync_loadable --release + sudo apt install libreadline-dev - - name: Build sqlite + - name: Build run: | + # Need a debug build for the dart tests + cargo build -p powersync_loadable + + cargo build -p powersync_loadable --release cargo build -p powersync_core --release --features static cargo build -p powersync_sqlite --release cargo build -p sqlite3 --release @@ -37,3 +43,12 @@ jobs: - name: Check loadable extension run: | ./target/release/sqlite3 ":memory:" ".load ./target/release/libpowersync" "select powersync_rs_version()" + + - name: Run dart-based tests + # Extension loading fails on macos currently + if: matrix.os == 'ubuntu-24.04' + run: | + cd dart + dart pub get + dart test + dart analyze diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5fd6dc8..69f41fc 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,6 +19,7 @@ mod error; mod ext; mod kv; mod macros; +mod migrations; mod operations; mod operations_vtab; mod schema_management; diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs new file mode 100644 index 0000000..830fc79 --- /dev/null +++ b/crates/core/src/migrations.rs @@ -0,0 +1,288 @@ +extern crate alloc; + +use alloc::format; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; + +use sqlite::ResultCode; +use sqlite_nostd as sqlite; +use sqlite_nostd::{Connection, Context}; + +use crate::error::{PSResult, SQLiteError}; + +pub fn powersync_migrate( + ctx: *mut sqlite::context, + target_version: i32, +) -> Result<(), SQLiteError> { + let local_db = ctx.db_handle(); + + // language=SQLite + local_db.exec_safe( + "\ +CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)", + )?; + + // language=SQLite + let current_version_stmt = + local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; + let rc = current_version_stmt.step()?; + if rc != ResultCode::ROW { + return Err(SQLiteError::from(ResultCode::ABORT)); + } + + let mut current_version = current_version_stmt.column_int(0)?; + + while current_version > target_version { + // Run down migrations. + // This is rare, we don't worry about optimizing this. + + current_version_stmt.reset()?; + + let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?; + down_migrations_stmt.bind_int(1, target_version)?; + + let mut down_sql: Vec = alloc::vec![]; + + while down_migrations_stmt.step()? 
== ResultCode::ROW {
+            let sql = down_migrations_stmt.column_text(0)?;
+            down_sql.push(sql.to_string());
+        }
+
+        for sql in down_sql {
+            let rs = local_db.exec_safe(&sql);
+            if let Err(code) = rs {
+                return Err(SQLiteError(
+                    code,
+                    Some(format!(
+                        "Down migration failed for {:} {:} {:}",
+                        current_version,
+                        sql,
+                        local_db
+                            .errmsg()
+                            .unwrap_or(String::from("Conversion error"))
+                    )),
+                ));
+            }
+        }
+
+        // Refresh the version
+        current_version_stmt.reset()?;
+        let rc = current_version_stmt.step()?;
+        if rc != ResultCode::ROW {
+            return Err(SQLiteError(
+                rc,
+                Some("Down migration failed - could not get version".to_string()),
+            ));
+        }
+        let new_version = current_version_stmt.column_int(0)?;
+        if new_version >= current_version {
+            // Database down from version $currentVersion to $version failed - version not updated after down migration
+            return Err(SQLiteError(
+                ResultCode::ABORT,
+                Some(format!(
+                    "Down migration failed - version not updated from {:}",
+                    current_version
+                )),
+            ));
+        }
+        current_version = new_version;
+    }
+    current_version_stmt.reset()?;
+
+    if current_version < 1 {
+        // language=SQLite
+        local_db
+            .exec_safe(
+                "
+CREATE TABLE ps_oplog(
+bucket TEXT NOT NULL,
+op_id INTEGER NOT NULL,
+op INTEGER NOT NULL,
+row_type TEXT,
+row_id TEXT,
+key TEXT,
+data TEXT,
+hash INTEGER NOT NULL,
+superseded INTEGER NOT NULL);
+
+CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0;
+CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id);
+CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0;
+
+CREATE TABLE ps_buckets(
+name TEXT PRIMARY KEY,
+last_applied_op INTEGER NOT NULL DEFAULT 0,
+last_op INTEGER NOT NULL DEFAULT 0,
+target_op INTEGER NOT NULL DEFAULT 0,
+add_checksum INTEGER NOT NULL DEFAULT 0,
+pending_delete INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id));
+
+CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT);
+
+INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL);
+",
+            )
+            .into_db_result(local_db)?;
+    }
+
+    if current_version < 2 && target_version >= 2 {
+        // language=SQLite
+        local_db.exec_safe("\
+CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER);
+INSERT INTO ps_tx(id, current_tx, next_tx) VALUES(1, NULL, 1);
+
+ALTER TABLE ps_crud ADD COLUMN tx_id INTEGER;
+
+INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array())));
+").into_db_result(local_db)?;
+    }
+
+    if current_version < 3 && target_version >= 3 {
+        // language=SQLite
+        local_db.exec_safe("\
+CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB);
+INSERT INTO ps_kv(key, value) values('client_id', uuid());
+
+INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 3'), json_object('sql', 'DROP TABLE ps_kv')));
+  ").into_db_result(local_db)?;
+    }
+
+    if current_version < 4 && target_version >= 4 {
+        // language=SQLite
+        local_db.exec_safe("\
+ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0;
+ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0;
+
+UPDATE ps_buckets SET op_checksum = (
+SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name
+);
+
+INSERT INTO ps_migration(id, down_migrations)
+VALUES(4,
+  json_array(
+    json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'),
+    json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'),
+    json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations')
+  ));
+  ").into_db_result(local_db)?;
+    }
+
+    if current_version < 5 && target_version >= 5 {
+        // Start by dropping all existing views and triggers (but not tables).
+        // This is because the triggers are restructured in this version, and
+        // need to be re-created from scratch. Not dropping them can leave them
+        // referring to tables or columns that no longer exist, which can cause
+        // issues later on.
+        // The same applies to the down migration.
+
+        // language=SQLite
+        local_db
+            .exec_safe(
+                "\
+SELECT powersync_drop_view(view.name)
+FROM sqlite_master view
+WHERE view.type = 'view'
+  AND view.sql GLOB '*-- powersync-auto-generated';
+
+ALTER TABLE ps_buckets RENAME TO ps_buckets_old;
+ALTER TABLE ps_oplog RENAME TO ps_oplog_old;
+
+CREATE TABLE ps_buckets(
+  id INTEGER PRIMARY KEY,
+  name TEXT NOT NULL,
+  last_applied_op INTEGER NOT NULL DEFAULT 0,
+  last_op INTEGER NOT NULL DEFAULT 0,
+  target_op INTEGER NOT NULL DEFAULT 0,
+  add_checksum INTEGER NOT NULL DEFAULT 0,
+  op_checksum INTEGER NOT NULL DEFAULT 0,
+  pending_delete INTEGER NOT NULL DEFAULT 0
+) STRICT;
+
+CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name);
+
+CREATE TABLE ps_oplog(
+  bucket INTEGER NOT NULL,
+  op_id INTEGER NOT NULL,
+  row_type TEXT,
+  row_id TEXT,
+  key TEXT,
+  data TEXT,
+  hash INTEGER NOT NULL) STRICT;
+
+CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id);
+CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id);
+CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key);
+
+CREATE TABLE ps_updated_rows(
+  row_type TEXT,
+  row_id TEXT) STRICT;
+
+CREATE UNIQUE INDEX ps_updated_rows_row ON ps_updated_rows (row_type, row_id);
+
+INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)
+SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_old;
+
+DROP TABLE ps_buckets_old;
+
+INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash)
+SELECT ps_buckets.id, oplog.op_id, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash
+  FROM ps_oplog_old oplog
+  JOIN ps_buckets
+    ON ps_buckets.name = oplog.bucket
+  WHERE oplog.superseded = 0 AND oplog.op = 3
+  ORDER BY oplog.bucket, oplog.op_id;
+
+INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
+SELECT row_type, row_id
+  FROM ps_oplog_old oplog
+  WHERE oplog.op != 3;
+
+UPDATE ps_buckets SET add_checksum = 0xffffffff & (add_checksum + (
+SELECT IFNULL(SUM(oplog.hash), 0)
+  FROM ps_oplog_old oplog
+  WHERE oplog.bucket = ps_buckets.name
+    AND (oplog.superseded = 1 OR oplog.op != 3)
+));
+
+UPDATE ps_buckets SET op_checksum = 0xffffffff & (op_checksum - (
+  SELECT IFNULL(SUM(oplog.hash), 0)
+  FROM ps_oplog_old oplog
+  WHERE oplog.bucket = ps_buckets.name
+    AND (oplog.superseded = 1 OR oplog.op != 3)
+));
+
+DROP TABLE ps_oplog_old;
+
+INSERT INTO ps_migration(id, down_migrations)
+VALUES(5,
+  json_array(
+    -- Drop existing views and triggers if any
+    json_object('sql', 'SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated'''),
+
+    json_object('sql', 'ALTER TABLE ps_buckets RENAME TO ps_buckets_5'),
+
json_object('sql', 'ALTER TABLE ps_oplog RENAME TO ps_oplog_5'), + json_object('sql', 'CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)'), + json_object('sql', 'INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5'), + json_object('sql', 'CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)'), + json_object('sql', 'CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0'), + json_object('sql', 'CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)'), + json_object('sql', 'CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0'), + json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket'), + json_object('sql', 'DROP TABLE ps_oplog_5'), + json_object('sql', 'DROP TABLE ps_buckets_5'), + json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r'), + json_object('sql', 'INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)'), + json_object('sql', 'DROP TABLE ps_updated_rows'), + + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 5') + )); + ", + ) + .into_db_result(local_db)?; + } + + Ok(()) +} diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index b84b682..067365b 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,12 +1,11 @@ use alloc::format; -use alloc::string::{String, ToString}; +use alloc::string::String; use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; use crate::ext::SafeManagedStmt; -use crate::util::*; // Run inside a transaction pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { @@ -57,23 +56,6 @@ FROM json_each(?) e", )?; iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - // Statement to supersede (replace) operations with the same key. - // language=SQLite - let supersede_statement = db.prepare_v2( - "\ -DELETE FROM ps_oplog - WHERE ps_oplog.superseded = 0 - AND unlikely(ps_oplog.bucket = ?1) - AND ps_oplog.key = ?2 -RETURNING op_id, hash", - )?; - supersede_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - - // language=SQLite - let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)")?; - insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. 
// We can consider splitting this into separate SELECT and INSERT statements. // language=SQLite @@ -82,23 +64,45 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super VALUES(?) ON CONFLICT DO UPDATE SET last_applied_op = last_applied_op - RETURNING last_applied_op", + RETURNING id, last_applied_op", )?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; bucket_statement.step()?; + let bucket_id = bucket_statement.column_int64(0)?; + // This is an optimization for initial sync - we can avoid persisting individual REMOVE // operations when last_applied_op = 0. // We do still need to do the "supersede_statement" step for this case, since a REMOVE // operation can supersede another PUT operation we're syncing at the same time. - let mut last_applied_op = bucket_statement.column_int64(0)?; + let mut last_applied_op = bucket_statement.column_int64(1)?; + + // Statement to supersede (replace) operations with the same key. + // language=SQLite + let supersede_statement = db.prepare_v2( + "\ +DELETE FROM ps_oplog + WHERE unlikely(ps_oplog.bucket = ?1) + AND ps_oplog.key = ?2 +RETURNING op_id, hash", + )?; + supersede_statement.bind_int64(1, bucket_id)?; + + // language=SQLite + let insert_statement = db.prepare_v2("\ +INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; + insert_statement.bind_int64(1, bucket_id)?; + + let updated_row_statement = db.prepare_v2( + "\ +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", + )?; bucket_statement.reset()?; let mut last_op: Option = None; let mut add_checksum: i32 = 0; let mut op_checksum: i32 = 0; - let mut remove_operations: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -140,65 +144,83 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super } supersede_statement.reset()?; - let should_skip_remove = !superseded && op == "REMOVE"; - if should_skip_remove { - // If a REMOVE statement did not replace (supersede) any previous - // operations, we do not need to persist it. - // The same applies if the bucket was not synced to the local db yet, - // even if it did supersede another operation. - // Handle the same as MOVE. 
+ if op == "REMOVE" { + let should_skip_remove = !superseded; + add_checksum = add_checksum.wrapping_add(checksum); + + if !should_skip_remove { + if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { + updated_row_statement.bind_text( + 1, + object_type, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.bind_text( + 2, + object_id, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.exec()?; + } + } + continue; } - let opi = if op == "PUT" { 3 } else { 4 }; insert_statement.bind_int64(2, op_id)?; - insert_statement.bind_int(3, opi)?; if key != "" { - insert_statement.bind_text(4, &key, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; } else { - insert_statement.bind_null(4)?; + insert_statement.bind_null(3)?; } if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { - insert_statement.bind_text(5, object_type, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(6, object_id, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; } else { + insert_statement.bind_null(4)?; insert_statement.bind_null(5)?; - insert_statement.bind_null(6)?; } if let Ok(data) = op_data { - insert_statement.bind_text(7, data, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; } else { - insert_statement.bind_null(7)?; + insert_statement.bind_null(6)?; } - insert_statement.bind_int(8, checksum)?; + insert_statement.bind_int(7, checksum)?; insert_statement.exec()?; op_checksum = op_checksum.wrapping_add(checksum); - - if opi == 4 { - // We persisted a REMOVE statement, so the bucket needs - // to be compacted at some point. - remove_operations += 1; - } } else if op == "MOVE" { add_checksum = add_checksum.wrapping_add(checksum); } else if op == "CLEAR" { // Any remaining PUT operations should get an implicit REMOVE // language=SQLite - let clear_statement = db.prepare_v2("UPDATE ps_oplog SET op=4, data=NULL, hash=0 WHERE (op=3 OR op=4) AND bucket=?1").into_db_result(db)?; - clear_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - clear_statement.exec()?; + let clear_statement1 = db + .prepare_v2( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) +SELECT row_type, row_id +FROM ps_oplog +WHERE bucket = ?1", + ) + .into_db_result(db)?; + clear_statement1.bind_int64(1, bucket_id)?; + clear_statement1.exec()?; + + let clear_statement2 = db + .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") + .into_db_result(db)?; + clear_statement2.bind_int64(1, bucket_id)?; + clear_statement2.exec()?; // And we need to re-apply all of those. // We also replace the checksum with the checksum of the CLEAR op. 
// language=SQLite let clear_statement2 = db.prepare_v2( - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2", + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", )?; - clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?; + clear_statement2.bind_int64(2, bucket_id)?; clear_statement2.bind_int(1, checksum)?; clear_statement2.exec()?; @@ -214,15 +236,13 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super "UPDATE ps_buckets SET last_op = ?2, add_checksum = (add_checksum + ?3) & 0xffffffff, - op_checksum = (op_checksum + ?4) & 0xffffffff, - remove_operations = (remove_operations + ?5) - WHERE name = ?1", + op_checksum = (op_checksum + ?4) & 0xffffffff + WHERE id = ?1", )?; - statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; + statement.bind_int64(1, bucket_id)?; statement.bind_int64(2, *last_op)?; statement.bind_int(3, add_checksum)?; statement.bind_int(4, op_checksum)?; - statement.bind_int(5, remove_operations)?; statement.exec()?; } @@ -230,109 +250,42 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super Ok(()) } -pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { - // language=SQLite - let statement = db.prepare_v2( - " -SELECT - name, - last_applied_op, - (SELECT IFNULL(SUM(oplog.hash), 0) - FROM ps_oplog oplog - WHERE oplog.bucket = ps_buckets.name - AND oplog.op_id <= ps_buckets.last_applied_op - AND (oplog.superseded = 1 OR oplog.op != 3) - ) as checksum -FROM ps_buckets -WHERE ps_buckets.pending_delete = 0 AND - ps_buckets.remove_operations >= CASE - WHEN ?1 = '' THEN 1 - ELSE IFNULL(?1 ->> 'threshold', 1) - END", - )?; - // Compact bucket if there are 50 or more operations - statement.bind_text(1, _data, sqlite::Destructor::STATIC); - - // language=SQLite - let update_statement = db.prepare_v2( - " - UPDATE ps_buckets - SET add_checksum = (add_checksum + ?2) & 0xffffffff, - op_checksum = (op_checksum - ?2) & 0xffffffff, - remove_operations = 0 - WHERE ps_buckets.name = ?1", - )?; - - // language=SQLite - let delete_statement = db.prepare_v2( - "DELETE - FROM ps_oplog - WHERE (superseded = 1 OR op != 3) - AND bucket = ?1 - AND op_id <= ?2", - )?; - - while statement.step()? == ResultCode::ROW { - // Note: Each iteration here may be run in a separate transaction. 
- let name = statement.column_text(0)?; - let last_applied_op = statement.column_int64(1)?; - let checksum = statement.column_int(2)?; - - update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - update_statement.bind_int(2, checksum)?; - update_statement.exec()?; - - // Must use the same values as above - delete_statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - delete_statement.bind_int64(2, last_applied_op)?; - delete_statement.exec()?; - } +pub fn clear_remove_ops(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { + // No-op Ok(()) } -pub fn delete_pending_buckets(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { - // language=SQLite - let statement = db.prepare_v2( - "DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)")?; - statement.exec()?; - - // language=SQLite - let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op")?; - statement.exec()?; +pub fn delete_pending_buckets(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { + // No-op Ok(()) } pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> { - let id = gen_uuid(); - let new_name = format!("$delete_{}_{}", name, id.hyphenated().to_string()); - // language=SQLite - let statement = db.prepare_v2( - "UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2", - )?; - statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?; - statement.bind_text(2, &name, sqlite::Destructor::STATIC)?; - statement.exec()?; + let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?; + statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - // Rename bucket - // language=SQLite - let statement = db.prepare_v2("UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?; - statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?; - statement.bind_text(2, name, sqlite::Destructor::STATIC)?; - statement.exec()?; + if statement.step()? 
== ResultCode::ROW { + let bucket_id = statement.column_int64(0)?; - // language=SQLite - let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1")?; - statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - statement.exec()?; + // language=SQLite + let updated_statement = db.prepare_v2( + "\ +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) +SELECT row_type, row_id +FROM ps_oplog +WHERE bucket = ?1", + )?; + updated_statement.bind_int64(1, bucket_id)?; + updated_statement.exec()?; - // language=SQLite - let statement = db.prepare_v2( - "INSERT INTO ps_buckets(name, pending_delete, last_op) SELECT ?1, 1, IFNULL(MAX(op_id), 0) FROM ps_oplog WHERE bucket = ?1")?; - statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?; - statement.exec()?; + // language=SQLite + let delete_statement = db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?; + delete_statement.bind_int64(1, bucket_id)?; + delete_statement.exec()?; + } Ok(()) } diff --git a/crates/core/src/schema_management.rs b/crates/core/src/schema_management.rs index 1adce6e..2a46473 100644 --- a/crates/core/src/schema_management.rs +++ b/crates/core/src/schema_management.rs @@ -10,22 +10,26 @@ use sqlite::{Connection, ResultCode, Value}; use sqlite_nostd as sqlite; use sqlite_nostd::Context; -use crate::{create_auto_tx_function, create_sqlite_text_fn}; -use crate::error::{SQLiteError, PSResult}; +use crate::error::{PSResult, SQLiteError}; use crate::ext::ExtendedDatabase; use crate::util::{quote_identifier, quote_json_path}; +use crate::{create_auto_tx_function, create_sqlite_text_fn}; fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { { // In a block so that the statement is finalized before dropping tables // language=SQLite - let statement = db.prepare_v2("\ + let statement = db + .prepare_v2( + "\ SELECT json_extract(json_each.value, '$.name') as name, powersync_internal_table_name(json_each.value) as internal_name, ifnull(json_extract(json_each.value, '$.local_only'), 0) as local_only FROM json_each(json_extract(?, '$.tables')) - WHERE name NOT IN (SELECT name FROM powersync_tables)").into_db_result(db)?; + WHERE name NOT IN (SELECT name FROM powersync_tables)", + ) + .into_db_result(db)?; statement.bind_text(1, schema, sqlite::Destructor::STATIC)?; while statement.step().into_db_result(db)? == ResultCode::ROW { @@ -33,32 +37,56 @@ SELECT let internal_name = statement.column_text(1)?; let local_only = statement.column_int(2)? 
!= 0; - db.exec_safe(&format!("CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT)", quote_identifier(internal_name))).into_db_result(db)?; + db.exec_safe(&format!( + "CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT)", + quote_identifier(internal_name) + )) + .into_db_result(db)?; if !local_only { // MOVE data if any - db.exec_text(&format!("INSERT INTO {:}(id, data) + db.exec_text( + &format!( + "INSERT INTO {:}(id, data) SELECT id, data FROM ps_untyped - WHERE type = ?", quote_identifier(internal_name)), name).into_db_result(db)?; + WHERE type = ?", + quote_identifier(internal_name) + ), + name, + ) + .into_db_result(db)?; // language=SQLite - db.exec_text("DELETE + db.exec_text( + "DELETE FROM ps_untyped - WHERE type = ?", name)?; + WHERE type = ?", + name, + )?; } if !local_only { // MOVE data if any - db.exec_text(&format!("INSERT INTO {:}(id, data) + db.exec_text( + &format!( + "INSERT INTO {:}(id, data) SELECT id, data FROM ps_untyped - WHERE type = ?", quote_identifier(internal_name)), name).into_db_result(db)?; + WHERE type = ?", + quote_identifier(internal_name) + ), + name, + ) + .into_db_result(db)?; // language=SQLite - db.exec_text("DELETE + db.exec_text( + "DELETE FROM ps_untyped - WHERE type = ?", name)?; + WHERE type = ?", + name, + )?; } } } @@ -68,11 +96,15 @@ SELECT { // In a block so that the statement is finalized before dropping tables // language=SQLite - let statement = db.prepare_v2("\ + let statement = db + .prepare_v2( + "\ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN ( SELECT json_extract(json_each.value, '$.name') FROM json_each(json_extract(?, '$.tables')) - )").into_db_result(db)?; + )", + ) + .into_db_result(db)?; statement.bind_text(1, schema, sqlite::Destructor::STATIC)?; while statement.step()? == ResultCode::ROW { @@ -83,7 +115,14 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN ( tables_to_drop.push(String::from(internal_name)); if !local_only { - db.exec_text(&format!("INSERT INTO ps_untyped(type, id, data) SELECT ?, id, data FROM {:}", quote_identifier(internal_name)), name).into_db_result(db)?; + db.exec_text( + &format!( + "INSERT INTO ps_untyped(type, id, data) SELECT ?, id, data FROM {:}", + quote_identifier(internal_name) + ), + name, + ) + .into_db_result(db)?; } } } @@ -116,7 +155,6 @@ SELECT ").into_db_result(db)?; statement.bind_text(1, schema, sqlite::Destructor::STATIC)?; - while statement.step().into_db_result(db)? 
== ResultCode::ROW { let table_name = statement.column_text(0)?; let index_name = statement.column_text(1)?; @@ -150,7 +188,12 @@ SELECT } } - let sql = format!("CREATE INDEX {} ON {}({})", quote_identifier(index_name), quote_identifier(table_name), column_values.join(", ")); + let sql = format!( + "CREATE INDEX {} ON {}({})", + quote_identifier(index_name), + quote_identifier(table_name), + column_values.join(", ") + ); if existing_sql == "" { statements.push(sql); } else if existing_sql != sql { @@ -194,7 +237,6 @@ SELECT Ok(()) } - fn update_views(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { // Update existing views if modified // language=SQLite @@ -276,7 +318,6 @@ create_sqlite_text_fn!( "powersync_replace_schema" ); - pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { db.create_function_v2( "powersync_replace_schema", diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 382e830..e1061ed 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -15,7 +15,7 @@ pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result { "\ SELECT group_concat(name) FROM ps_buckets -WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)", +WHERE target_op > last_op", )?; if statement.step()? != ResultCode::ROW { @@ -64,25 +64,28 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result buckets.last_applied_op) + UNION ALL SELECT row_type, row_id FROM ps_updated_rows +) + -- 3. Group the objects from different buckets together into a single one (ops). -SELECT r.row_type as type, - r.row_id as id, +SELECT b.row_type as type, + b.row_id as id, r.data as data, - json_group_array(r.bucket) FILTER (WHERE r.op=3) as buckets, + count(r.bucket) as buckets, /* max() affects which row is used for 'data' */ - max(r.op_id) FILTER (WHERE r.op=3) as op_id --- 1. Filter oplog by the ops added but not applied yet (oplog b). -FROM ps_buckets AS buckets - CROSS JOIN ps_oplog AS b ON b.bucket = buckets.name - AND (b.op_id > buckets.last_applied_op) - -- 2. Find *all* current ops over different buckets for those objects (oplog r). - INNER JOIN ps_oplog AS r + max(r.op_id) as op_id +-- 2. Find *all* current ops over different buckets for those objects (oplog r). +FROM updated_rows b + LEFT OUTER JOIN ps_oplog AS r ON r.row_type = b.row_type AND r.row_id = b.row_id -WHERE r.superseded = 0 -AND b.superseded = 0 -- Group for (3) -GROUP BY r.row_type, r.row_id", +GROUP BY b.row_type, b.row_id", ) .into_db_result(db)?; @@ -91,7 +94,7 @@ GROUP BY r.row_type, r.row_id", while statement.step().into_db_result(db)? 
== ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; - let buckets = statement.column_text(3)?; + let buckets = statement.column_int(3)?; let data = statement.column_text(2); let table_name = internal_table_name(type_name); @@ -99,7 +102,7 @@ GROUP BY r.row_type, r.row_id", if tables.contains(&table_name) { let quoted = quote_internal_name(type_name, false); - if buckets == "[]" { + if buckets == 0 { // DELETE let delete_statement = db .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) @@ -116,7 +119,7 @@ GROUP BY r.row_type, r.row_id", insert_statement.exec()?; } } else { - if buckets == "[]" { + if buckets == 0 { // DELETE // language=SQLite let delete_statement = db @@ -147,6 +150,10 @@ GROUP BY r.row_type, r.row_id", ) .into_db_result(db)?; + // language=SQLite + db.exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(db)?; + // language=SQLite db.exec_safe("insert or replace into ps_kv(key, value) values('last_synced_at', datetime())") .into_db_result(db)?; diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index de78756..dcd8972 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -10,7 +10,8 @@ use sqlite::{ResultCode, Value}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::error::{PSResult, SQLiteError}; +use crate::error::SQLiteError; +use crate::migrations::powersync_migrate; use crate::util::quote_identifier; use crate::{create_auto_tx_function, create_sqlite_text_fn}; @@ -117,166 +118,32 @@ fn powersync_init_impl( ) -> Result { let local_db = ctx.db_handle(); - // language=SQLite - local_db.exec_safe( - "\ -CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)", - )?; - - // language=SQLite - let current_version_stmt = - local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; - let rc = current_version_stmt.step()?; - if rc != ResultCode::ROW { - return Err(SQLiteError::from(ResultCode::ABORT)); - } - - const CODE_VERSION: i32 = 4; - - let mut current_version = current_version_stmt.column_int(0)?; - - while current_version > CODE_VERSION { - // Run down migrations. - // This is rare, we don't worry about optimizing this. - - current_version_stmt.reset()?; - - let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?; - down_migrations_stmt.bind_int(1, CODE_VERSION)?; - - let mut down_sql: Vec = alloc::vec![]; - - while down_migrations_stmt.step()? 
== ResultCode::ROW { - let sql = down_migrations_stmt.column_text(0)?; - down_sql.push(sql.to_string()); - } - - for sql in down_sql { - let rs = local_db.exec_safe(&sql); - if let Err(code) = rs { - return Err(SQLiteError( - code, - Some(format!( - "Down migration failed for {:} {:}", - current_version, sql - )), - )); - } - } - - // Refresh the version - current_version_stmt.reset()?; - let rc = current_version_stmt.step()?; - if rc != ResultCode::ROW { - return Err(SQLiteError( - rc, - Some("Down migration failed - could not get version".to_string()), - )); - } - let new_version = current_version_stmt.column_int(0)?; - if new_version >= current_version { - // Database down from version $currentVersion to $version failed - version not updated after dow migration - return Err(SQLiteError( - ResultCode::ABORT, - Some(format!( - "Down migration failed - version not updated from {:}", - current_version - )), - )); - } - current_version = new_version; - } - - current_version_stmt.reset()?; - - if current_version < 1 { - // language=SQLite - local_db - .exec_safe( - " -CREATE TABLE ps_oplog( - bucket TEXT NOT NULL, - op_id INTEGER NOT NULL, - op INTEGER NOT NULL, - row_type TEXT, - row_id TEXT, - key TEXT, - data TEXT, - hash INTEGER NOT NULL, - superseded INTEGER NOT NULL); - -CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0; -CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id); -CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0; - -CREATE TABLE ps_buckets( - name TEXT PRIMARY KEY, - last_applied_op INTEGER NOT NULL DEFAULT 0, - last_op INTEGER NOT NULL DEFAULT 0, - target_op INTEGER NOT NULL DEFAULT 0, - add_checksum INTEGER NOT NULL DEFAULT 0, - pending_delete INTEGER NOT NULL DEFAULT 0 -); - -CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)); - -CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT); - -INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL); -", - ) - .into_db_result(local_db)?; - } - - if current_version < 2 { - // language=SQLite - local_db.exec_safe("\ -CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER); -INSERT INTO ps_tx(id, current_tx, next_tx) VALUES(1, NULL, 1); - -ALTER TABLE ps_crud ADD COLUMN tx_id INTEGER; - -INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array()))); -").into_db_result(local_db)?; - } - - if current_version < 3 { - // language=SQLite - local_db.exec_safe("\ -CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB); -INSERT INTO ps_kv(key, value) values('client_id', uuid()); - -INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 3'), json_object('sql', 'DROP TABLE ps_kv'))); - ").into_db_result(local_db)?; - } + setup_internal_views(local_db)?; - if current_version < 4 { - // language=SQLite - local_db.exec_safe("\ -ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; -ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0; + powersync_migrate(ctx, 5)?; -UPDATE ps_buckets SET op_checksum = ( - SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name -); + 
Ok(String::from("")) +} -INSERT INTO ps_migration(id, down_migrations) - VALUES(4, - json_array( - json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), - json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'), - json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations') - )); - ").into_db_result(local_db)?; - } +create_auto_tx_function!(powersync_init_tx, powersync_init_impl); +create_sqlite_text_fn!(powersync_init, powersync_init_tx, "powersync_init"); - setup_internal_views(local_db)?; +fn powersync_test_migration_impl( + ctx: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result { + let target_version = args[0].int(); + powersync_migrate(ctx, target_version)?; Ok(String::from("")) } -create_auto_tx_function!(powersync_init_tx, powersync_init_impl); -create_sqlite_text_fn!(powersync_init, powersync_init_tx, "powersync_init"); +create_auto_tx_function!(powersync_test_migration_tx, powersync_test_migration_impl); +create_sqlite_text_fn!( + powersync_test_migration, + powersync_test_migration_tx, + "powersync_test_migration" +); fn powersync_clear_impl( ctx: *mut sqlite::context, @@ -293,6 +160,7 @@ DELETE FROM ps_oplog; DELETE FROM ps_crud; DELETE FROM ps_buckets; DELETE FROM ps_untyped; +DELETE FROM ps_updated_rows; DELETE FROM ps_kv WHERE key != 'client_id'; ", )?; @@ -459,6 +327,17 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { None, )?; + db.create_function_v2( + "powersync_test_migration", + 1, + sqlite::UTF8, + None, + Some(powersync_test_migration), + None, + None, + None, + )?; + // Initialize the extension internal tables. db.create_function_v2( "powersync_clear", diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index 06af8db..ce188f6 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -10,7 +10,7 @@ use sqlite::{Connection, Context, ResultCode, Value}; use sqlite_nostd as sqlite; use crate::create_sqlite_text_fn; -use crate::error::{SQLiteError, PSResult}; +use crate::error::{PSResult, SQLiteError}; use crate::util::*; fn powersync_view_sql_impl( @@ -59,7 +59,11 @@ fn powersync_view_sql_impl( return Ok(view_statement); } -create_sqlite_text_fn!(powersync_view_sql, powersync_view_sql_impl, "powersync_view_sql"); +create_sqlite_text_fn!( + powersync_view_sql, + powersync_view_sql_impl, + "powersync_view_sql" +); fn powersync_trigger_delete_sql_impl( ctx: *mut sqlite::context, @@ -79,32 +83,31 @@ fn powersync_trigger_delete_sql_impl( let type_string = quote_string(name); return if !local_only && !insert_only { - let trigger = format!("\ + let trigger = format!( + "\ CREATE TRIGGER {:} INSTEAD OF DELETE ON {:} FOR EACH ROW BEGIN DELETE FROM {:} WHERE id = OLD.id; INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', {:}, 'id', OLD.id)); -INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) - SELECT '$local', - 1, - 'REMOVE', - {:}, - OLD.id, - 0, - 0; -INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, {:}); -END", trigger_name, quoted_name, internal_name, type_string, type_string, MAX_OP_ID); +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({:}, OLD.id); +INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {:}); +END", + trigger_name, quoted_name, internal_name, type_string, type_string, MAX_OP_ID + ); Ok(trigger) } else if local_only { - let trigger = format!("\ + let trigger = format!( + "\ CREATE TRIGGER {:} INSTEAD OF DELETE ON {:} 
FOR EACH ROW BEGIN DELETE FROM {:} WHERE id = OLD.id; -END", trigger_name, quoted_name, internal_name); +END", + trigger_name, quoted_name, internal_name + ); Ok(trigger) } else if insert_only { Ok(String::from("")) @@ -115,7 +118,8 @@ END", trigger_name, quoted_name, internal_name); create_sqlite_text_fn!( powersync_trigger_delete_sql, - powersync_trigger_delete_sql_impl, "powersync_trigger_delete_sql" + powersync_trigger_delete_sql_impl, + "powersync_trigger_delete_sql" ); fn powersync_trigger_insert_sql_impl( @@ -163,25 +167,21 @@ fn powersync_trigger_insert_sql_impl( INSERT INTO {:} SELECT NEW.id, json_object({:}); INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', {:}, 'id', NEW.id, 'data', json(powersync_diff('{{}}', json_object({:}))))); - INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) - SELECT '$local', - 1, - 'REMOVE', - {:}, - NEW.id, - 0, - 0; - INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, {:}); + INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({:}, NEW.id); + INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {:}); END", trigger_name, quoted_name, internal_name, json_fragment, type_string, json_fragment, type_string, MAX_OP_ID); Ok(trigger) } else if local_only { - let trigger = format!("\ + let trigger = format!( + "\ CREATE TRIGGER {:} INSTEAD OF INSERT ON {:} FOR EACH ROW BEGIN INSERT INTO {:} SELECT NEW.id, json_object({:}); - END", trigger_name, quoted_name, internal_name, json_fragment); + END", + trigger_name, quoted_name, internal_name, json_fragment + ); Ok(trigger) } else if insert_only { let trigger = format!("\ @@ -199,7 +199,8 @@ fn powersync_trigger_insert_sql_impl( create_sqlite_text_fn!( powersync_trigger_insert_sql, - powersync_trigger_insert_sql_impl, "powersync_trigger_insert_sql" + powersync_trigger_insert_sql_impl, + "powersync_trigger_insert_sql" ); fn powersync_trigger_update_sql_impl( @@ -252,19 +253,13 @@ BEGIN SET data = json_object({:}) WHERE id = NEW.id; INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PATCH', 'type', {:}, 'id', NEW.id, 'data', json(powersync_diff(json_object({:}), json_object({:}))))); - INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) - SELECT '$local', - 1, - 'REMOVE', - {:}, - NEW.id, - 0, - 0; - INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, {:}); + INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({:}, NEW.id); + INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {:}); END", trigger_name, quoted_name, internal_name, json_fragment_new, type_string, json_fragment_old, json_fragment_new, type_string, MAX_OP_ID); Ok(trigger) } else if local_only { - let trigger = format!("\ + let trigger = format!( + "\ CREATE TRIGGER {:} INSTEAD OF UPDATE ON {:} FOR EACH ROW @@ -276,7 +271,9 @@ BEGIN UPDATE {:} SET data = json_object({:}) WHERE id = NEW.id; -END", trigger_name, quoted_name, internal_name, json_fragment_new); +END", + trigger_name, quoted_name, internal_name, json_fragment_new + ); Ok(trigger) } else if insert_only { Ok(String::from("")) @@ -287,7 +284,8 @@ END", trigger_name, quoted_name, internal_name, json_fragment_new); create_sqlite_text_fn!( powersync_trigger_update_sql, - powersync_trigger_update_sql_impl, "powersync_trigger_update_sql" + powersync_trigger_update_sql_impl, + "powersync_trigger_update_sql" ); pub fn register(db: *mut 
sqlite::sqlite3) -> Result<(), ResultCode> { diff --git a/dart/.gitignore b/dart/.gitignore new file mode 100644 index 0000000..f46c1ec --- /dev/null +++ b/dart/.gitignore @@ -0,0 +1 @@ +.dart_tool diff --git a/dart/README.md b/dart/README.md new file mode 100644 index 0000000..2ff344c --- /dev/null +++ b/dart/README.md @@ -0,0 +1,15 @@ +# Dart Tests + +This folder contains tests written in Dart, as a convenient higher-level language. + +The tests loads the compiled debug library. Before testing, build first using: + +```sh +cargo build -p powersync_loadable +``` + +Then test here: + +```sh +dart test +``` diff --git a/dart/pubspec.lock b/dart/pubspec.lock new file mode 100644 index 0000000..53c0592 --- /dev/null +++ b/dart/pubspec.lock @@ -0,0 +1,389 @@ +# Generated by pub +# See https://dart.dev/tools/pub/glossary#lockfile +packages: + _fe_analyzer_shared: + dependency: transitive + description: + name: _fe_analyzer_shared + sha256: "0b2f2bd91ba804e53a61d757b986f89f1f9eaed5b11e4b2f5a2468d86d6c9fc7" + url: "https://pub.dev" + source: hosted + version: "67.0.0" + analyzer: + dependency: transitive + description: + name: analyzer + sha256: "37577842a27e4338429a1cbc32679d508836510b056f1eedf0c8d20e39c1383d" + url: "https://pub.dev" + source: hosted + version: "6.4.1" + args: + dependency: transitive + description: + name: args + sha256: "7cf60b9f0cc88203c5a190b4cd62a99feea42759a7fa695010eb5de1c0b2252a" + url: "https://pub.dev" + source: hosted + version: "2.5.0" + async: + dependency: transitive + description: + name: async + sha256: "947bfcf187f74dbc5e146c9eb9c0f10c9f8b30743e341481c1e2ed3ecc18c20c" + url: "https://pub.dev" + source: hosted + version: "2.11.0" + boolean_selector: + dependency: transitive + description: + name: boolean_selector + sha256: "6cfb5af12253eaf2b368f07bacc5a80d1301a071c73360d746b7f2e32d762c66" + url: "https://pub.dev" + source: hosted + version: "2.1.1" + collection: + dependency: transitive + description: + name: collection + sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a + url: "https://pub.dev" + source: hosted + version: "1.18.0" + convert: + dependency: transitive + description: + name: convert + sha256: "0f08b14755d163f6e2134cb58222dd25ea2a2ee8a195e53983d57c075324d592" + url: "https://pub.dev" + source: hosted + version: "3.1.1" + coverage: + dependency: transitive + description: + name: coverage + sha256: c1fb2dce3c0085f39dc72668e85f8e0210ec7de05345821ff58530567df345a5 + url: "https://pub.dev" + source: hosted + version: "1.9.2" + crypto: + dependency: transitive + description: + name: crypto + sha256: ec30d999af904f33454ba22ed9a86162b35e52b44ac4807d1d93c288041d7d27 + url: "https://pub.dev" + source: hosted + version: "3.0.5" + ffi: + dependency: transitive + description: + name: ffi + sha256: "16ed7b077ef01ad6170a3d0c57caa4a112a38d7a2ed5602e0aca9ca6f3d98da6" + url: "https://pub.dev" + source: hosted + version: "2.1.3" + file: + dependency: transitive + description: + name: file + sha256: "5fc22d7c25582e38ad9a8515372cd9a93834027aacf1801cf01164dac0ffa08c" + url: "https://pub.dev" + source: hosted + version: "7.0.0" + frontend_server_client: + dependency: transitive + description: + name: frontend_server_client + sha256: f64a0333a82f30b0cca061bc3d143813a486dc086b574bfb233b7c1372427694 + url: "https://pub.dev" + source: hosted + version: "4.0.0" + glob: + dependency: transitive + description: + name: glob + sha256: "0e7014b3b7d4dac1ca4d6114f82bf1782ee86745b9b42a92c9289c23d8a0ab63" + url: "https://pub.dev" + source: hosted + version: 
"2.1.2" + http_multi_server: + dependency: transitive + description: + name: http_multi_server + sha256: "97486f20f9c2f7be8f514851703d0119c3596d14ea63227af6f7a481ef2b2f8b" + url: "https://pub.dev" + source: hosted + version: "3.2.1" + http_parser: + dependency: transitive + description: + name: http_parser + sha256: "2aa08ce0341cc9b354a498388e30986515406668dbcc4f7c950c3e715496693b" + url: "https://pub.dev" + source: hosted + version: "4.0.2" + io: + dependency: transitive + description: + name: io + sha256: "2ec25704aba361659e10e3e5f5d672068d332fc8ac516421d483a11e5cbd061e" + url: "https://pub.dev" + source: hosted + version: "1.0.4" + js: + dependency: transitive + description: + name: js + sha256: c1b2e9b5ea78c45e1a0788d29606ba27dc5f71f019f32ca5140f61ef071838cf + url: "https://pub.dev" + source: hosted + version: "0.7.1" + logging: + dependency: transitive + description: + name: logging + sha256: "623a88c9594aa774443aa3eb2d41807a48486b5613e67599fb4c41c0ad47c340" + url: "https://pub.dev" + source: hosted + version: "1.2.0" + matcher: + dependency: transitive + description: + name: matcher + sha256: d2323aa2060500f906aa31a895b4030b6da3ebdcc5619d14ce1aada65cd161cb + url: "https://pub.dev" + source: hosted + version: "0.12.16+1" + meta: + dependency: transitive + description: + name: meta + sha256: "7687075e408b093f36e6bbf6c91878cc0d4cd10f409506f7bc996f68220b9136" + url: "https://pub.dev" + source: hosted + version: "1.12.0" + mime: + dependency: transitive + description: + name: mime + sha256: "801fd0b26f14a4a58ccb09d5892c3fbdeff209594300a542492cf13fba9d247a" + url: "https://pub.dev" + source: hosted + version: "1.0.6" + node_preamble: + dependency: transitive + description: + name: node_preamble + sha256: "6e7eac89047ab8a8d26cf16127b5ed26de65209847630400f9aefd7cd5c730db" + url: "https://pub.dev" + source: hosted + version: "2.0.2" + package_config: + dependency: transitive + description: + name: package_config + sha256: "1c5b77ccc91e4823a5af61ee74e6b972db1ef98c2ff5a18d3161c982a55448bd" + url: "https://pub.dev" + source: hosted + version: "2.1.0" + path: + dependency: transitive + description: + name: path + sha256: "087ce49c3f0dc39180befefc60fdb4acd8f8620e5682fe2476afd0b3688bb4af" + url: "https://pub.dev" + source: hosted + version: "1.9.0" + pool: + dependency: transitive + description: + name: pool + sha256: "20fe868b6314b322ea036ba325e6fc0711a22948856475e2c2b6306e8ab39c2a" + url: "https://pub.dev" + source: hosted + version: "1.5.1" + pub_semver: + dependency: transitive + description: + name: pub_semver + sha256: "40d3ab1bbd474c4c2328c91e3a7df8c6dd629b79ece4c4bd04bee496a224fb0c" + url: "https://pub.dev" + source: hosted + version: "2.1.4" + shelf: + dependency: transitive + description: + name: shelf + sha256: ad29c505aee705f41a4d8963641f91ac4cee3c8fad5947e033390a7bd8180fa4 + url: "https://pub.dev" + source: hosted + version: "1.4.1" + shelf_packages_handler: + dependency: transitive + description: + name: shelf_packages_handler + sha256: "89f967eca29607c933ba9571d838be31d67f53f6e4ee15147d5dc2934fee1b1e" + url: "https://pub.dev" + source: hosted + version: "3.0.2" + shelf_static: + dependency: transitive + description: + name: shelf_static + sha256: c87c3875f91262785dade62d135760c2c69cb217ac759485334c5857ad89f6e3 + url: "https://pub.dev" + source: hosted + version: "1.1.3" + shelf_web_socket: + dependency: transitive + description: + name: shelf_web_socket + sha256: "9ca081be41c60190ebcb4766b2486a7d50261db7bd0f5d9615f2d653637a84c1" + url: "https://pub.dev" + source: hosted + version: 
"1.0.4" + source_map_stack_trace: + dependency: transitive + description: + name: source_map_stack_trace + sha256: c0713a43e323c3302c2abe2a1cc89aa057a387101ebd280371d6a6c9fa68516b + url: "https://pub.dev" + source: hosted + version: "2.1.2" + source_maps: + dependency: transitive + description: + name: source_maps + sha256: "708b3f6b97248e5781f493b765c3337db11c5d2c81c3094f10904bfa8004c703" + url: "https://pub.dev" + source: hosted + version: "0.10.12" + source_span: + dependency: transitive + description: + name: source_span + sha256: "53e943d4206a5e30df338fd4c6e7a077e02254531b138a15aec3bd143c1a8b3c" + url: "https://pub.dev" + source: hosted + version: "1.10.0" + sqlite3: + dependency: "direct main" + description: + name: sqlite3 + sha256: "45f168ae2213201b54e09429ed0c593dc2c88c924a1488d6f9c523a255d567cb" + url: "https://pub.dev" + source: hosted + version: "2.4.6" + stack_trace: + dependency: transitive + description: + name: stack_trace + sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b" + url: "https://pub.dev" + source: hosted + version: "1.11.1" + stream_channel: + dependency: transitive + description: + name: stream_channel + sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 + url: "https://pub.dev" + source: hosted + version: "2.1.2" + string_scanner: + dependency: transitive + description: + name: string_scanner + sha256: "688af5ed3402a4bde5b3a6c15fd768dbf2621a614950b17f04626c431ab3c4c3" + url: "https://pub.dev" + source: hosted + version: "1.3.0" + term_glyph: + dependency: transitive + description: + name: term_glyph + sha256: a29248a84fbb7c79282b40b8c72a1209db169a2e0542bce341da992fe1bc7e84 + url: "https://pub.dev" + source: hosted + version: "1.2.1" + test: + dependency: "direct dev" + description: + name: test + sha256: "7ee446762c2c50b3bd4ea96fe13ffac69919352bd3b4b17bac3f3465edc58073" + url: "https://pub.dev" + source: hosted + version: "1.25.2" + test_api: + dependency: transitive + description: + name: test_api + sha256: "9955ae474176f7ac8ee4e989dadfb411a58c30415bcfb648fa04b2b8a03afa7f" + url: "https://pub.dev" + source: hosted + version: "0.7.0" + test_core: + dependency: transitive + description: + name: test_core + sha256: "2bc4b4ecddd75309300d8096f781c0e3280ca1ef85beda558d33fcbedc2eead4" + url: "https://pub.dev" + source: hosted + version: "0.6.0" + typed_data: + dependency: transitive + description: + name: typed_data + sha256: facc8d6582f16042dd49f2463ff1bd6e2c9ef9f3d5da3d9b087e244a7b564b3c + url: "https://pub.dev" + source: hosted + version: "1.3.2" + vm_service: + dependency: transitive + description: + name: vm_service + sha256: "5c5f338a667b4c644744b661f309fb8080bb94b18a7e91ef1dbd343bed00ed6d" + url: "https://pub.dev" + source: hosted + version: "14.2.5" + watcher: + dependency: transitive + description: + name: watcher + sha256: "3d2ad6751b3c16cf07c7fca317a1413b3f26530319181b37e3b9039b84fc01d8" + url: "https://pub.dev" + source: hosted + version: "1.1.0" + web: + dependency: transitive + description: + name: web + sha256: d43c1d6b787bf0afad444700ae7f4db8827f701bc61c255ac8d328c6f4d52062 + url: "https://pub.dev" + source: hosted + version: "1.0.0" + web_socket_channel: + dependency: transitive + description: + name: web_socket_channel + sha256: d88238e5eac9a42bb43ca4e721edba3c08c6354d4a53063afaa568516217621b + url: "https://pub.dev" + source: hosted + version: "2.4.0" + webkit_inspection_protocol: + dependency: transitive + description: + name: webkit_inspection_protocol + sha256: 
"87d3f2333bb240704cd3f1c6b5b7acd8a10e7f0bc28c28dcf14e782014f4a572" + url: "https://pub.dev" + source: hosted + version: "1.2.1" + yaml: + dependency: transitive + description: + name: yaml + sha256: "75769501ea3489fca56601ff33454fe45507ea3bfb014161abc3b43ae25989d5" + url: "https://pub.dev" + source: hosted + version: "3.1.2" +sdks: + dart: ">=3.4.0 <4.0.0" diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml new file mode 100644 index 0000000..f2cd1a3 --- /dev/null +++ b/dart/pubspec.yaml @@ -0,0 +1,10 @@ +name: powersync_sqlite_core_tests +publish_to: "none" +version: 0.0.1 +description: Tests for powersync-sqlite-core +environment: + sdk: ^3.4.0 +dependencies: + sqlite3: ^2.4.5 +dev_dependencies: + test: ^1.25.0 diff --git a/dart/test/migration_test.dart b/dart/test/migration_test.dart new file mode 100644 index 0000000..cf860b0 --- /dev/null +++ b/dart/test/migration_test.dart @@ -0,0 +1,179 @@ +import 'dart:convert'; + +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; +import 'utils/migration_fixtures.dart' as fixtures; +import 'utils/schema.dart'; + +void main() { + group('Migration Tests', () { + /// These tests test up and down migrations between various schema versions. + /// We hardcode the starting schema state of older library versions. + /// Note that down migrations are "simulated" using the current library version, + /// which could in theory be different from actually running the down + /// migrations on older library versions. + + late CommonDatabase db; + + setUp(() async { + db = openTestDatabase(); + }); + + tearDown(() { + db.dispose(); + }); + + /// This tests that the extension can load + test('extension setup', () async { + final row1 = db.select('select sqlite_version() as version').first; + print('sqlite ${row1['version']}'); + final row = db.select('select powersync_rs_version() as version').first; + print('powersync-sqlite-core ${row['version']}'); + }); + + /// This tests that the tests are setup correctly + test('match database version', () async { + expect( + fixtures.expectedState.keys.last, equals(fixtures.databaseVersion)); + }); + + /// Test the basic database setup (no migrations from other versions). + /// Get this test passing before any others below, since it tests the + /// finalState fixture, which is an input in other tests. + test('create database from scratch', () async { + db.select('select powersync_init()'); + final schema = '${getSchema(db)}\n${getMigrations(db)}'; + final expected = fixtures.finalState.trim(); + if (expected != schema) { + // This gives more usable output if the test fails + print('-- CURRENT SCHEMA:'); + print(schema); + } + expect(schema, equals(expected)); + }); + + // We test that we can _start_ at any state, and get to + // the same final state. We don't test individual target migrations. 
+ for (var startState = 2; + startState <= fixtures.databaseVersion; + startState++) { + /// This tests with just the base tables + test('migrate from $startState', () async { + db.execute(fixtures.expectedState[startState]!); + db.select('select powersync_init()'); + final schema = '${getSchema(db)}\n${getMigrations(db)}'; + expect(schema, equals(fixtures.finalState.trim())); + }); + + /// This tests with some data + test('migrate from $startState with data1', () async { + db.execute(fixtures.expectedState[startState]!); + db.execute(fixtures.data1[startState]!); + db.select('select powersync_init()'); + final data = getData(db); + expect(data, equals(fixtures.finalData1.trim())); + }); + } + + for (var endState = 2; endState < fixtures.databaseVersion; endState++) { + /// Test that we can _start_ at the final state, and down migrate to + /// any version. + + /// This tests with just the base tables + test('migrate down to $endState', () async { + db.execute(fixtures.finalState); + db.select('select powersync_test_migration(?)', [endState]); + final schema = '${getSchema(db)}\n${getMigrations(db)}'; + expect(schema, equals(fixtures.expectedState[endState]!.trim())); + }); + + /// This tests with some data + test('migrate down to $endState with data1', () async { + db.execute(fixtures.finalState); + db.execute(fixtures.data1[fixtures.databaseVersion]!); + db.select('select powersync_test_migration(?)', [endState]); + final data = getData(db); + expect(data, equals(fixtures.dataDown1[endState]!.trim())); + }); + } + + /// Here we apply a developer schema _after_ migrating + test('schema after migration', () async { + db.execute(fixtures.expectedState[2]!); + var tableSchema = { + 'tables': [ + { + 'name': 'lists', + 'columns': [ + {'name': 'description', 'type': 'TEXT'} + ] + } + ] + }; + db.select('select powersync_init()'); + db.select( + 'select powersync_replace_schema(?)', [jsonEncode(tableSchema)]); + + final schema = getSchema(db); + final expected = + '${fixtures.finalState.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schema5.trim()}'; + expect(schema, equals(expected)); + }); + + /// Here we start with a schema from fixtures on db version 3 + test('schema 3 -> 5', () async { + db.execute(fixtures.expectedState[3]!); + db.execute(fixtures.schema3); + var tableSchema = { + 'tables': [ + { + 'name': 'lists', + 'columns': [ + {'name': 'description', 'type': 'TEXT'} + ] + } + ] + }; + db.select('select powersync_init()'); + db.select( + 'select powersync_replace_schema(?)', [jsonEncode(tableSchema)]); + + final schema = getSchema(db); + final expected = + '${fixtures.finalState.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schema5.trim()}'; + expect(schema, equals(expected)); + }); + + /// Here we start with a schema from fixtures on db version 5, + /// and down migrate to version 4. + test('schema 5 -> 4', () async { + db.execute(fixtures.expectedState[5]!); + db.execute(fixtures.schema5); + db.select('select powersync_test_migration(4)'); + + final schema = getSchema(db); + // Note that this schema contains no views - those are deleted during the migration + final expected = + '${fixtures.expectedState[4]!.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schemaDown3.trim()}'; + expect(schema, equals(expected)); + }); + + /// Here we start with a schema from fixtures on db version 5, + /// and down migrate to version 3. 
+ /// While the schema for views and triggers is the same from version 2 -> 4, + /// some errors only occurred when going down two versions. + test('schema 5 -> 3', () async { + db.execute(fixtures.expectedState[5]!); + db.execute(fixtures.schema5); + db.select('select powersync_test_migration(3)'); + + final schema = getSchema(db); + // Note that this schema contains no views - those are deleted during the migration + final expected = + '${fixtures.expectedState[3]!.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schemaDown3.trim()}'; + expect(schema, equals(expected)); + }); + }); +} diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart new file mode 100644 index 0000000..0fa0740 --- /dev/null +++ b/dart/test/utils/migration_fixtures.dart @@ -0,0 +1,339 @@ +/// The current database version +const databaseVersion = 5; + +/// This is the base database state that we expect at various schema versions. +/// Generated by loading the specific library version, and exporting the schema. +const expectedState = { + 2: r''' +;CREATE TABLE ps_buckets( + name TEXT PRIMARY KEY, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket TEXT NOT NULL, + op_id INTEGER NOT NULL, + op INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL, + superseded INTEGER NOT NULL) +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0 +;CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0 +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +''', + 3: r''' +;CREATE TABLE ps_buckets( + name TEXT PRIMARY KEY, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket TEXT NOT NULL, + op_id INTEGER NOT NULL, + op INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL, + superseded INTEGER NOT NULL) +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0 +;CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE 
superseded = 0 +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +''', + 4: r''' +;CREATE TABLE ps_buckets( + name TEXT PRIMARY KEY, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0) +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket TEXT NOT NULL, + op_id INTEGER NOT NULL, + op INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL, + superseded INTEGER NOT NULL) +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0 +;CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0 +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +''', + 5: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT) STRICT +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) 
+;CREATE UNIQUE INDEX ps_updated_rows_row ON ps_updated_rows (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +''' +}; + +final finalState = expectedState[databaseVersion]!; + +/// data to test "up" migrations +const data1 = { + 2: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES + ('b1', 0, 0, 0, 0, 0), + ('b2', 0, 0, 0, 1000, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0), + ('b2', 4, 4, 'lists', 'l2', '', null, 5, 0) +''', + 3: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES + 
('b1', 0, 0, 0, 0, 0), + ('b2', 0, 0, 0, 1000, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0), + ('b2', 4, 4, 'lists', 'l2', '', null, 5, 0) +''', + 4: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + ('b1', 0, 0, 0, 0, 120, 0), + ('b2', 0, 0, 0, 1000, 8, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0), + ('b2', 4, 4, 'lists', 'l2', '', null, 5, 0) +''', + 5: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') +''' +}; + +/// data to test "down" migrations +/// This is slightly different from the above, +/// since we don't preserve all data in the migration process +const dataDown1 = { + 2: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES + ('$local', 0, 0, 9223372036854775807, 0, 1), + ('b1', 0, 0, 0, 0, 0), + ('b2', 0, 0, 0, 1005, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('$local', 1, 4, 'lists', 'l2', null, null, 0, 0), + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) +''', + 3: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES + ('$local', 0, 0, 9223372036854775807, 0, 1), + ('b1', 0, 0, 0, 0, 0), + ('b2', 0, 0, 0, 1005, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('$local', 1, 4, 'lists', 'l2', null, null, 0, 0), + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) +''', + 4: r''' +;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete, op_checksum, remove_operations) VALUES + ('$local', 0, 0, 9223372036854775807, 0, 1, 0, 0), + ('b1', 0, 0, 0, 0, 0, 120, 0), + ('b2', 0, 0, 0, 1005, 0, 3, 0) +;INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded) VALUES + ('$local', 1, 4, 'lists', 'l2', null, null, 0, 0), + ('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0), + ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), + ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) +''' +}; + +final finalData1 = data1[databaseVersion]!; + +/// Starting developer schema at database version 3. 
+const schema3 = r''' +;CREATE TABLE "ps_data__lists"(id TEXT PRIMARY KEY NOT NULL, data TEXT) +;CREATE VIEW "lists"("id", "description") AS SELECT id, CAST(json_extract(data, '$.description') as TEXT) FROM "ps_data__lists" -- powersync-auto-generated +;CREATE TRIGGER "ps_view_delete_lists" +INSTEAD OF DELETE ON "lists" +FOR EACH ROW +BEGIN +DELETE FROM "ps_data__lists" WHERE id = OLD.id; +INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', 'lists', 'id', OLD.id)); +INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) + SELECT '$local', + 1, + 'REMOVE', + 'lists', + OLD.id, + 0, + 0; +INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, 9223372036854775807); +END +;CREATE TRIGGER "ps_view_insert_lists" + INSTEAD OF INSERT ON "lists" + FOR EACH ROW + BEGIN + SELECT CASE + WHEN (NEW.id IS NULL) + THEN RAISE (FAIL, 'id is required') + END; + INSERT INTO "ps_data__lists" + SELECT NEW.id, json_object('description', NEW."description"); + INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff('{}', json_object('description', NEW."description"))))); + INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) + SELECT '$local', + 1, + 'REMOVE', + 'lists', + NEW.id, + 0, + 0; + INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, 9223372036854775807); + END +;CREATE TRIGGER "ps_view_update_lists" +INSTEAD OF UPDATE ON "lists" +FOR EACH ROW +BEGIN + SELECT CASE + WHEN (OLD.id != NEW.id) + THEN RAISE (FAIL, 'Cannot update id') + END; + UPDATE "ps_data__lists" + SET data = json_object('description', NEW."description") + WHERE id = NEW.id; + INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PATCH', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff(json_object('description', OLD."description"), json_object('description', NEW."description"))))); + INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded) + SELECT '$local', + 1, + 'REMOVE', + 'lists', + NEW.id, + 0, + 0; + INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, 9223372036854775807); +END +'''; + +/// Expected developer schema when migrating down from 5 -> 4 or 5 -> 3. +const schemaDown3 = r''' +;CREATE TABLE "ps_data__lists"(id TEXT PRIMARY KEY NOT NULL, data TEXT) +'''; + +/// Expected developer schema at database version 5. 
+const schema5 = r''' +;CREATE TABLE "ps_data__lists"(id TEXT PRIMARY KEY NOT NULL, data TEXT) +;CREATE VIEW "lists"("id", "description") AS SELECT id, CAST(json_extract(data, '$.description') as TEXT) FROM "ps_data__lists" -- powersync-auto-generated +;CREATE TRIGGER "ps_view_delete_lists" +INSTEAD OF DELETE ON "lists" +FOR EACH ROW +BEGIN +DELETE FROM "ps_data__lists" WHERE id = OLD.id; +INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', 'lists', 'id', OLD.id)); +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', OLD.id); +INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); +END +;CREATE TRIGGER "ps_view_insert_lists" + INSTEAD OF INSERT ON "lists" + FOR EACH ROW + BEGIN + SELECT CASE + WHEN (NEW.id IS NULL) + THEN RAISE (FAIL, 'id is required') + END; + INSERT INTO "ps_data__lists" + SELECT NEW.id, json_object('description', NEW."description"); + INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff('{}', json_object('description', NEW."description"))))); + INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', NEW.id); + INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); + END +;CREATE TRIGGER "ps_view_update_lists" +INSTEAD OF UPDATE ON "lists" +FOR EACH ROW +BEGIN + SELECT CASE + WHEN (OLD.id != NEW.id) + THEN RAISE (FAIL, 'Cannot update id') + END; + UPDATE "ps_data__lists" + SET data = json_object('description', NEW."description") + WHERE id = NEW.id; + INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PATCH', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff(json_object('description', OLD."description"), json_object('description', NEW."description"))))); + INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', NEW.id); + INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); +END +'''; diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart new file mode 100644 index 0000000..284142e --- /dev/null +++ b/dart/test/utils/native_test_utils.dart @@ -0,0 +1,50 @@ +import 'dart:ffi'; + +import 'package:sqlite3/common.dart'; +import 'package:sqlite3/open.dart' as sqlite_open; +import 'package:sqlite3/sqlite3.dart'; + +const defaultSqlitePath = 'libsqlite3.so.0'; + +const libPath = '../target/debug'; + +CommonDatabase openTestDatabase() { + sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.linux, () { + return DynamicLibrary.open('libsqlite3.so.0'); + }); + sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { + return DynamicLibrary.open('libsqlite3.dylib'); + }); + var lib = DynamicLibrary.open(getLibraryForPlatform(path: libPath)); + var extension = SqliteExtension.inLibrary(lib, 'sqlite3_powersync_init'); + sqlite3.ensureExtensionLoaded(extension); + return sqlite3.open(':memory:'); +} + +String getLibraryForPlatform({String? path = "."}) { + switch (Abi.current()) { + case Abi.androidArm: + case Abi.androidArm64: + case Abi.androidX64: + return '$path/libpowersync.so'; + case Abi.macosArm64: + case Abi.macosX64: + return '$path/libpowersync.dylib'; + case Abi.linuxX64: + case Abi.linuxArm64: + return '$path/libpowersync.so'; + case Abi.windowsX64: + return '$path/powersync.dll'; + case Abi.androidIA32: + throw ArgumentError( + 'Unsupported processor architecture. X86 Android emulators are not ' + 'supported. 
Please use an x86_64 emulator instead. All physical ' + 'Android devices are supported including 32bit ARM.', + ); + default: + throw ArgumentError( + 'Unsupported processor architecture "${Abi.current()}". ' + 'Please open an issue on GitHub to request it.', + ); + } +} diff --git a/dart/test/utils/schema.dart b/dart/test/utils/schema.dart new file mode 100644 index 0000000..2775032 --- /dev/null +++ b/dart/test/utils/schema.dart @@ -0,0 +1,110 @@ +import 'package:sqlite3/common.dart'; + +/// Utilities for getting the SQLite schema + +/// Get tables, indexes, views and triggers, as one big string +String getSchema(CommonDatabase db) { + final rows = db.select(""" +SELECT type, name, sql FROM sqlite_master ORDER BY + CASE + WHEN type = 'table' AND name LIKE 'ps_data_%' THEN 3 + WHEN type = 'table' THEN 1 + WHEN type = 'index' THEN 2 + WHEN type = 'view' THEN 4 + WHEN type = 'trigger' THEN 5 + END ASC, name ASC"""); + + List result = []; + for (var row in rows) { + if (row['name'].startsWith('__') || row['name'] == 'sqlite_sequence') { + // Internal SQLite tables. + continue; + } + if (row['sql'] != null) { + var sql = (row['sql'] as String).trim(); + // We put a semicolon before each statement instead of after, + // so that comments at the end of the statement are preserved. + result.add(';$sql'); + } + } + return result.join('\n'); +} + +/// Get data from the ps_migration table +String getMigrations(CommonDatabase db) { + List result = []; + var migrationRows = + db.select('SELECT id, down_migrations FROM ps_migration ORDER BY id ASC'); + + for (var row in migrationRows) { + var version = row['id']!; + var downMigrations = row['down_migrations']; + if (downMigrations == null) { + result.add( + ';INSERT INTO ps_migration(id, down_migrations) VALUES($version, null)'); + } else { + result.add( + ';INSERT INTO ps_migration(id, down_migrations) VALUES($version, ${escapeSqlString(downMigrations)})'); + } + } + return result.join('\n'); +} + +/// Get data from specific tables, as INSERT INTO statements. +String getData(CommonDatabase db) { + const queries = [ + {'table': 'ps_buckets', 'query': 'select * from ps_buckets order by name'}, + { + 'table': 'ps_oplog', + 'query': 'select * from ps_oplog order by bucket, op_id' + }, + { + 'table': 'ps_updated_rows', + 'query': 'select * from ps_updated_rows order by row_type, row_id' + } + ]; + List result = []; + for (var q in queries) { + try { + final rs = db.select(q['query']!); + if (rs.isEmpty) { + continue; + } + + result.add( + ';INSERT INTO ${q['table']}(${rs.columnNames.join(', ')}) VALUES'); + var values = rs.rows + .map((row) => + '(${row.map((column) => escapeSqlLiteral(column)).join(', ')})') + .join(',\n '); + result.add(' $values'); + } catch (e) { + if (e.toString().contains('no such table')) { + // Table doesn't exist - ignore + } else { + rethrow; + } + } + } + return result.join('\n'); +} + +/// Escape an integer, string or null value as a literal for a query. +String escapeSqlLiteral(dynamic value) { + if (value == null) { + return 'null'; + } else if (value is String) { + return escapeSqlString(value); + } else if (value is int) { + return '$value'; + } else { + throw ArgumentError('Unsupported value type: $value'); + } +} + +/// Quote a string for usage in a SQLite query. +/// +/// Not safe for general usage, but should be sufficient for these tests. +String escapeSqlString(String text) { + return """'${text.replaceAll(RegExp(r"'"), "''")}'"""; +}
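
For reference, the utilities introduced in this patch compose into a small standalone check. The sketch below is not part of the patch itself; it is a minimal, hypothetical example that assumes it is saved under dart/test/ so the relative utils/ imports resolve, and that a debug build of the loadable extension already exists in ../target/debug (from cargo build -p powersync_loadable). It performs the same round trip as the 'create database from scratch' test: load the extension, run powersync_init(), dump the schema with getSchema() and getMigrations(), and compare it to the pinned fixture.

import 'utils/migration_fixtures.dart' as fixtures;
import 'utils/native_test_utils.dart';
import 'utils/schema.dart';

void main() {
  // openTestDatabase() loads the powersync extension from ../target/debug
  // and opens an in-memory database with it (see native_test_utils.dart).
  final db = openTestDatabase();
  try {
    // powersync_init() runs the migrations up to the current databaseVersion.
    db.select('select powersync_init()');

    // Dump tables, indexes, views, triggers and the ps_migration rows in the
    // same normalized form the fixtures use.
    final schema = '${getSchema(db)}\n${getMigrations(db)}';

    if (schema == fixtures.finalState.trim()) {
      print('schema matches database version ${fixtures.databaseVersion}');
    } else {
      print('-- SCHEMA DRIFT --');
      print(schema);
    }
  } finally {
    db.dispose();
  }
}

Comparing against the fixture as one plain string keeps failures easy to debug: any drift prints the full dumped schema, which can be read directly against the expectedState fixture for the current version.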