diff --git a/README.md b/README.md
index 12cd571e..e7ccc74e 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,7 @@ This would stop developer 1's migration from ever running if you were using cont
refinery works by creating a table that keeps all the applied migrations' versions and their metadata. When you [run](https://docs.rs/refinery/latest/refinery/struct.Runner.html#method.run) the migrations `Runner`, refinery compares the applied migrations with the ones to be applied, checking for [divergent](https://docs.rs/refinery/latest/refinery/struct.Runner.html#method.set_abort_divergent) and [missing](https://docs.rs/refinery/latest/refinery/struct.Runner.html#method.set_abort_missing) and executing unapplied migrations.\
By default, refinery runs each migration in a single transaction. Alternatively, you can also configure refinery to wrap the entire execution of all migrations in a single transaction by setting [set_grouped](https://docs.rs/refinery/latest/refinery/struct.Runner.html#method.set_grouped) to true.
+The `-- +refinery NO TRANSACTION` directive can be used to opt a migration out of running inside a transaction. [!IMPORTANT]: `set_grouped` is incompatible with the no-transaction directive.
The rust crate intentionally ignores new migration files until your sourcecode is rebuild. This prevents accidental migrations and altering the database schema without any code changes. We can also bake the migrations into the binary, so no additional files are needed when deployed.
### Rollback
diff --git a/examples/migrations/V4__insert_entries_to_cars.sql b/examples/migrations/V4__insert_entries_to_cars.sql
new file mode 100644
index 00000000..51ae57b3
--- /dev/null
+++ b/examples/migrations/V4__insert_entries_to_cars.sql
@@ -0,0 +1,4 @@
+-- +refinery NO TRANSACTION
+BEGIN;
+INSERT INTO cars(id, name, brand) VALUES (2, "muscle", "toyota");
+COMMIT;
diff --git a/refinery/src/lib.rs b/refinery/src/lib.rs
index 945303cc..915d5930 100644
--- a/refinery/src/lib.rs
+++ b/refinery/src/lib.rs
@@ -33,7 +33,8 @@ for more examples refer to the [examples](https://github.com/rust-db/refinery/tr
pub use refinery_core::config;
pub use refinery_core::{
-    error, load_sql_migrations, Error, Migration, Report, Runner, SchemaVersion, Target,
+    error, load_sql_migrations, Error, Migration, MigrationFlags, Report, Runner, SchemaVersion,
+    Target,
};
#[doc(hidden)]
pub use refinery_core::{AsyncMigrate, Migrate};
diff --git a/refinery/tests/mysql.rs b/refinery/tests/mysql.rs
index de2a25d6..b1979c88 100644
--- a/refinery/tests/mysql.rs
+++ b/refinery/tests/mysql.rs
@@ -35,30 +35,38 @@ mod mysql {
    fn get_migrations() -> Vec<Migration> {
        embed_migrations!("./tests/migrations");
-        let migration1 =
-            Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap();
+        let migration1 = Migration::unapplied(
+            "V1__initial.rs",
+            &migrations::V1__initial::migration(),
+            Default::default(),
+        )
+        .unwrap();
        let migration2 = Migration::unapplied(
            "V2__add_cars_and_motos_table.sql",
            include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"),
+            Default::default(),
        )
        .unwrap();
        let migration3 = Migration::unapplied(
            "V3__add_brand_to_cars_table",
            include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"),
+            Default::default(),
        )
        .unwrap();
        let migration4 = Migration::unapplied(
            "V4__add_year_to_motos_table.rs",
            &migrations::V4__add_year_to_motos_table::migration(),
+            Default::default(),
        )
        .unwrap();
        let migration5 = Migration::unapplied(
            "V5__add_year_field_to_cars",
            "ALTER TABLE cars ADD year INTEGER;",
+
Default::default(), ) .unwrap(); @@ -471,6 +479,7 @@ mod mysql { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = conn @@ -507,6 +516,7 @@ mod mysql { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = conn @@ -550,12 +560,14 @@ mod mysql { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = conn diff --git a/refinery/tests/mysql_async.rs b/refinery/tests/mysql_async.rs index a9069b7c..5333e17b 100644 --- a/refinery/tests/mysql_async.rs +++ b/refinery/tests/mysql_async.rs @@ -19,30 +19,38 @@ mod mysql_async { fn get_migrations() -> Vec { embed_migrations!("./tests/migrations"); - let migration1 = - Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap(); + let migration1 = Migration::unapplied( + "V1__initial.rs", + &migrations::V1__initial::migration(), + Default::default(), + ) + .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_and_motos_table.sql", include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"), + Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_to_motos_table.rs", &migrations::V4__add_year_to_motos_table::migration(), + Default::default(), ) .unwrap(); let migration5 = Migration::unapplied( "V5__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -481,6 +489,7 @@ mod mysql_async { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -526,6 +535,7 @@ mod mysql_async { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -573,12 +583,14 @@ mod mysql_async { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = pool diff --git a/refinery/tests/postgres.rs b/refinery/tests/postgres.rs index 00793694..53fe3420 100644 --- a/refinery/tests/postgres.rs +++ b/refinery/tests/postgres.rs @@ -6,7 +6,8 @@ mod postgres { use predicates::str::contains; use refinery::config::ConfigDbType; use refinery::{ - config::Config, embed_migrations, error::Kind, Migrate, Migration, Runner, Target, + config::Config, embed_migrations, error::Kind, Migrate, Migration, MigrationFlags, Runner, + Target, }; use refinery_core::postgres::{Client, NoTls}; use std::process::Command; @@ -43,34 +44,53 @@ mod postgres { fn get_migrations() -> Vec { embed_migrations!("./tests/migrations"); - let migration1 = - Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap(); + let migration1 = Migration::unapplied( + "V1__initial.rs", + &migrations::V1__initial::migration(), + Default::default(), + ) + .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_and_motos_table.sql", include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"), + 
Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_to_motos_table.rs", &migrations::V4__add_year_to_motos_table::migration(), + Default::default(), ) .unwrap(); let migration5 = Migration::unapplied( "V5__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), + ) + .unwrap(); + + let migration6 = Migration::unapplied( + "V6__index_motos_table_concurrently", + "CREATE INDEX CONCURRENTLY motos_name ON motos(name);", + MigrationFlags { + run_in_transaction: false, + }, ) .unwrap(); - vec![migration1, migration2, migration3, migration4, migration5] + vec![ + migration1, migration2, migration3, migration4, migration5, migration6, + ] } fn prep_database() { @@ -385,7 +405,7 @@ mod postgres { let migrations = get_migrations(); - let mchecksum = migrations[4].checksum(); + let mchecksum = migrations[5].checksum(); client .migrate( &migrations, @@ -402,11 +422,42 @@ mod postgres { .unwrap() .unwrap(); - assert_eq!(5, current.version()); + assert_eq!(6, current.version()); assert_eq!(mchecksum, current.checksum()); }); } + #[test] + fn no_transaction_fails_in_set_grouped() { + run_test(|| { + let mut client = Client::connect(&db_uri(), NoTls).unwrap(); + + embedded::migrations::runner().run(&mut client).unwrap(); + + let migrations = get_migrations(); + + let mchecksum = migrations[5].checksum(); + let err = client + .migrate( + &migrations, + true, + true, + true, + Target::Latest, + DEFAULT_TABLE_NAME, + ) + .unwrap_err(); + + match err.kind() { + Kind::NoTransactionGroupedMigration(last) => { + assert_eq!(6, last.version()); + assert_eq!(mchecksum, last.checksum()); + } + _ => panic!("failed test"), + } + }); + } + #[test] fn migrates_to_target_migration() { run_test(|| { @@ -488,6 +539,7 @@ mod postgres { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = client @@ -521,6 +573,7 @@ mod postgres { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = client @@ -561,12 +614,14 @@ mod postgres { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = client @@ -603,25 +658,28 @@ mod postgres { runner.run(&mut config).unwrap(); let applied_migrations = runner.get_applied_migrations(&mut config).unwrap(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), 
applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); }) } @@ -639,25 +697,28 @@ mod postgres { let report = runner.run(&mut config).unwrap(); let applied_migrations = report.applied_migrations(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); }) } @@ -678,11 +739,11 @@ mod postgres { .get_last_applied_migration(&mut config) .unwrap() .unwrap(); - assert_eq!(5, applied_migration.version()); + assert_eq!(6, applied_migration.version()); - assert_eq!(migrations[4].version(), applied_migration.version()); - assert_eq!(migrations[4].name(), applied_migration.name()); - assert_eq!(migrations[4].checksum(), applied_migration.checksum()); + assert_eq!(migrations[5].version(), applied_migration.version()); + assert_eq!(migrations[5].name(), applied_migration.name()); + assert_eq!(migrations[5].checksum(), applied_migration.checksum()); }) } @@ -791,16 +852,16 @@ mod postgres { let report = runner.run(&mut config).unwrap(); let applied_migrations = report.applied_migrations(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); let last_migration = runner .get_last_applied_migration(&mut config) .unwrap() .unwrap(); - assert_eq!(5, last_migration.version()); - assert_eq!(migrations[4].name(), last_migration.name()); - assert_eq!(migrations[4].checksum(), last_migration.checksum()); + assert_eq!(6, last_migration.version()); + assert_eq!(migrations[5].name(), last_migration.name()); + assert_eq!(migrations[5].checksum(), last_migration.checksum()); assert!(config.use_tls()); }); diff --git a/refinery/tests/rusqlite.rs b/refinery/tests/rusqlite.rs index f7807d69..8215d1fa 100644 --- a/refinery/tests/rusqlite.rs +++ b/refinery/tests/rusqlite.rs @@ -8,7 +8,7 @@ mod rusqlite { config::{Config, ConfigDbType}, embed_migrations, error::Kind, - Migrate, 
Migration, Runner, Target, + Migrate, Migration, MigrationFlags, Runner, Target, }; use refinery_core::rusqlite::Error; use refinery_core::rusqlite::{Connection, OptionalExtension}; @@ -50,34 +50,55 @@ mod rusqlite { fn get_migrations() -> Vec { embed_migrations!("./tests/migrations"); - let migration1 = - Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap(); + let migration1 = Migration::unapplied( + "V1__initial.rs", + &migrations::V1__initial::migration(), + Default::default(), + ) + .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_and_motos_table.sql", include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"), + Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_to_motos_table.rs", &migrations::V4__add_year_to_motos_table::migration(), + Default::default(), ) .unwrap(); let migration5 = Migration::unapplied( "V5__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), + ) + .unwrap(); + + let migration6 = Migration::unapplied( + "V6__add_entries_to_cars", + r#"BEGIN; + INSERT INTO cars(id, name, brand) VALUES (2, "muscle", "toyota"); + COMMIT;"#, + MigrationFlags { + run_in_transaction: false, + }, ) .unwrap(); - vec![migration1, migration2, migration3, migration4, migration5] + vec![ + migration1, migration2, migration3, migration4, migration5, migration6, + ] } #[test] @@ -438,7 +459,7 @@ mod rusqlite { let migrations = get_migrations(); - let mchecksum = migrations[4].checksum(); + let mchecksum = migrations[5].checksum(); conn.migrate( &migrations, true, @@ -454,7 +475,7 @@ mod rusqlite { .unwrap() .unwrap(); - assert_eq!(5, current.version()); + assert_eq!(6, current.version()); assert_eq!(mchecksum, current.checksum()); } @@ -570,6 +591,7 @@ mod rusqlite { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = conn @@ -601,6 +623,7 @@ mod rusqlite { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = conn @@ -639,12 +662,14 @@ mod rusqlite { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = conn @@ -680,25 +705,28 @@ mod rusqlite { runner.run(&mut config).unwrap(); let applied_migrations = runner.get_applied_migrations(&mut config).unwrap(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), 
applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); } #[test] @@ -715,25 +743,28 @@ mod rusqlite { let report = runner.run(&mut config).unwrap(); let applied_migrations = report.applied_migrations(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); } #[test] @@ -753,11 +784,11 @@ mod rusqlite { .get_last_applied_migration(&mut config) .unwrap() .unwrap(); - assert_eq!(5, applied_migration.version()); + assert_eq!(6, applied_migration.version()); - assert_eq!(migrations[4].version(), applied_migration.version()); - assert_eq!(migrations[4].name(), applied_migration.name()); - assert_eq!(migrations[4].checksum(), applied_migration.checksum()); + assert_eq!(migrations[5].version(), applied_migration.version()); + assert_eq!(migrations[5].name(), applied_migration.name()); + assert_eq!(migrations[5].checksum(), applied_migration.checksum()); } #[test] diff --git a/refinery/tests/tiberius.rs b/refinery/tests/tiberius.rs index e6dcb8d4..23243cc4 100644 --- a/refinery/tests/tiberius.rs +++ b/refinery/tests/tiberius.rs @@ -22,30 +22,38 @@ mod tiberius { fn get_migrations() -> Vec { embed_migrations!("./tests/migrations"); - let migration1 = - Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap(); + let migration1 = Migration::unapplied( + "V1__initial.rs", + &migrations::V1__initial::migration(), + Default::default(), + ) + .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_and_motos_table.sql", include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"), + Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_to_motos_table.rs", 
&migrations::V4__add_year_to_motos_table::migration(), + Default::default(), ) .unwrap(); let migration5 = Migration::unapplied( "V5__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -128,6 +136,7 @@ mod tiberius { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = client @@ -179,6 +188,7 @@ mod tiberius { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = client @@ -237,12 +247,14 @@ mod tiberius { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = client diff --git a/refinery/tests/tokio_postgres.rs b/refinery/tests/tokio_postgres.rs index 304c85e7..c7a86c5f 100644 --- a/refinery/tests/tokio_postgres.rs +++ b/refinery/tests/tokio_postgres.rs @@ -7,7 +7,7 @@ mod tokio_postgres { config::{Config, ConfigDbType}, embed_migrations, error::Kind, - AsyncMigrate, Migration, Runner, Target, + AsyncMigrate, Migration, MigrationFlags, Runner, Target, }; use refinery_core::tokio_postgres; use refinery_core::tokio_postgres::NoTls; @@ -20,34 +20,53 @@ mod tokio_postgres { fn get_migrations() -> Vec { embed_migrations!("./tests/migrations"); - let migration1 = - Migration::unapplied("V1__initial.rs", &migrations::V1__initial::migration()).unwrap(); + let migration1 = Migration::unapplied( + "V1__initial.rs", + &migrations::V1__initial::migration(), + Default::default(), + ) + .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_and_motos_table.sql", include_str!("./migrations/V1-2/V2__add_cars_and_motos_table.sql"), + Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("./migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_to_motos_table.rs", &migrations::V4__add_year_to_motos_table::migration(), + Default::default(), ) .unwrap(); let migration5 = Migration::unapplied( "V5__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), + ) + .unwrap(); + + let migration6 = Migration::unapplied( + "V6__index_motos_table_concurrently", + "CREATE INDEX CONCURRENTLY motos_name ON motos(name);", + MigrationFlags { + run_in_transaction: false, + }, ) .unwrap(); - vec![migration1, migration2, migration3, migration4, migration5] + vec![ + migration1, migration2, migration3, migration4, migration5, migration6, + ] } mod embedded { @@ -484,7 +503,7 @@ mod tokio_postgres { .unwrap(); let migrations = get_migrations(); - let mchecksum = migrations[4].checksum(); + let mchecksum = migrations[5].checksum(); client .migrate( @@ -503,7 +522,7 @@ mod tokio_postgres { .await .unwrap() .unwrap(); - assert_eq!(5, current.version()); + assert_eq!(6, current.version()); assert_eq!(mchecksum, current.checksum()); }) .await; @@ -620,6 +639,7 @@ mod tokio_postgres { let migration = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); let err = client @@ -665,6 +685,7 @@ mod tokio_postgres { let migration = Migration::unapplied( "V2__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -718,12 +739,14 @@ mod 
tokio_postgres { "city varchar(255)", ");" ), + Default::default(), ) .unwrap(); let migration2 = Migration::unapplied( "V2__add_cars_table", include_str!("./migrations_missing/V2__add_cars_table.sql"), + Default::default(), ) .unwrap(); let err = client @@ -770,25 +793,28 @@ mod tokio_postgres { .get_applied_migrations_async(&mut config) .await .unwrap(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); }) .await; } @@ -811,25 +837,28 @@ mod tokio_postgres { let report = runner.run_async(&mut config).await.unwrap(); let applied_migrations = report.applied_migrations(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); assert_eq!(migrations[0].version(), applied_migrations[0].version()); assert_eq!(migrations[1].version(), applied_migrations[1].version()); assert_eq!(migrations[2].version(), applied_migrations[2].version()); assert_eq!(migrations[3].version(), applied_migrations[3].version()); assert_eq!(migrations[4].version(), applied_migrations[4].version()); + assert_eq!(migrations[5].version(), applied_migrations[5].version()); assert_eq!(migrations[0].name(), migrations[0].name()); assert_eq!(migrations[1].name(), applied_migrations[1].name()); assert_eq!(migrations[2].name(), applied_migrations[2].name()); assert_eq!(migrations[3].name(), applied_migrations[3].name()); assert_eq!(migrations[4].name(), applied_migrations[4].name()); + assert_eq!(migrations[5].name(), applied_migrations[5].name()); assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + assert_eq!(migrations[5].checksum(), applied_migrations[5].checksum()); }) .await; } @@ -856,11 +885,11 @@ mod tokio_postgres { .await .unwrap() .unwrap(); - assert_eq!(5, applied_migration.version()); + assert_eq!(6, applied_migration.version()); - assert_eq!(migrations[4].version(), applied_migration.version()); - assert_eq!(migrations[4].name(), applied_migration.name()); - assert_eq!(migrations[4].checksum(), applied_migration.checksum()); + 
assert_eq!(migrations[5].version(), applied_migration.version()); + assert_eq!(migrations[5].name(), applied_migration.name()); + assert_eq!(migrations[5].checksum(), applied_migration.checksum()); }) .await; } @@ -969,7 +998,7 @@ mod tokio_postgres { let report = runner.run_async(&mut config).await.unwrap(); let applied_migrations = report.applied_migrations(); - assert_eq!(5, applied_migrations.len()); + assert_eq!(6, applied_migrations.len()); let last_migration = runner .get_last_applied_migration_async(&mut config) @@ -977,9 +1006,9 @@ mod tokio_postgres { .unwrap() .unwrap(); - assert_eq!(5, last_migration.version()); - assert_eq!(migrations[4].name(), last_migration.name()); - assert_eq!(migrations[4].checksum(), last_migration.checksum()); + assert_eq!(6, last_migration.version()); + assert_eq!(migrations[5].name(), last_migration.name()); + assert_eq!(migrations[5].checksum(), last_migration.checksum()); assert!(config.use_tls()); }) diff --git a/refinery_cli/src/migrate.rs b/refinery_cli/src/migrate.rs index 77deae02..4303f276 100644 --- a/refinery_cli/src/migrate.rs +++ b/refinery_cli/src/migrate.rs @@ -3,7 +3,7 @@ use std::path::Path; use anyhow::Context; use refinery_core::{ config::{Config, ConfigDbType}, - find_migration_files, Migration, MigrationType, Runner, SchemaVersion, Target, + find_migration_files, parse_flags, Migration, MigrationType, Runner, SchemaVersion, Target, }; use crate::cli::MigrateArgs; @@ -47,7 +47,8 @@ fn run_migrations( .and_then(|file| file.to_os_string().into_string().ok()) .unwrap(); - let migration = Migration::unapplied(&filename, &sql) + let flags = parse_flags(&sql, MigrationType::Sql); + let migration = Migration::unapplied(&filename, &sql, flags) .with_context(|| format!("could not read migration file name {}", path.display()))?; migrations.push(migration); } diff --git a/refinery_core/src/drivers/config.rs b/refinery_core/src/drivers/config.rs index 9a9067ba..32bf68df 100644 --- a/refinery_core/src/drivers/config.rs +++ b/refinery_core/src/drivers/config.rs @@ -7,15 +7,15 @@ use crate::config::build_db_url; use crate::config::{Config, ConfigDbType}; use crate::error::WrapMigrationError; -use crate::traits::r#async::{AsyncQuery, AsyncTransaction}; -use crate::traits::sync::{Query, Transaction}; +use crate::traits::r#async::{AsyncExecutor, AsyncQuery}; +use crate::traits::sync::{Executor, Query}; use crate::traits::{GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY}; -use crate::{Error, Migration, Report, Target}; +use crate::{Error, Migration, MigrationFlags, Report, Target}; use async_trait::async_trait; use std::convert::Infallible; // we impl all the dependent traits as noop's and then override the methods that call them on Migrate and AsyncMigrate -impl Transaction for Config { +impl Executor for Config { type Error = Infallible; fn execute<'a, T: Iterator>( @@ -24,6 +24,15 @@ impl Transaction for Config { ) -> Result { Ok(0) } + + fn execute_single( + &mut self, + _query: &str, + _update_query: &str, + _flags: &MigrationFlags, + ) -> Result { + Ok(0) + } } impl Query> for Config { @@ -33,7 +42,7 @@ impl Query> for Config { } #[async_trait] -impl AsyncTransaction for Config { +impl AsyncExecutor for Config { type Error = Infallible; async fn execute<'a, T: Iterator + Send>( @@ -42,6 +51,15 @@ impl AsyncTransaction for Config { ) -> Result { Ok(0) } + + async fn execute_single( + &mut self, + _query: &str, + _update_query: &str, + _flags: &MigrationFlags, + ) -> Result { + Ok(0) + } } #[async_trait] @@ -49,7 +67,7 @@ impl 
AsyncQuery> for Config { async fn query( &mut self, _query: &str, - ) -> Result, ::Error> { + ) -> Result, ::Error> { Ok(Vec::new()) } } @@ -188,7 +206,7 @@ macro_rules! with_connection_async { } } -// rewrite all the default methods as we overrode Transaction and Query +// rewrite all the default methods as we overrode Executor and Query #[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))] impl crate::Migrate for Config { fn get_last_applied_migration( diff --git a/refinery_core/src/drivers/mysql.rs b/refinery_core/src/drivers/mysql.rs index c2947152..6055a2f9 100644 --- a/refinery_core/src/drivers/mysql.rs +++ b/refinery_core/src/drivers/mysql.rs @@ -1,5 +1,5 @@ -use crate::traits::sync::{Migrate, Query, Transaction}; -use crate::Migration; +use crate::traits::sync::{Executor, Migrate, Query}; +use crate::{Migration, MigrationFlags}; use mysql::{ error::Error as MError, prelude::Queryable, Conn, IsolationLevel, PooledConn, Transaction as MTransaction, TxOpts, @@ -40,7 +40,7 @@ fn query_applied_migrations( Ok(applied) } -impl Transaction for Conn { +impl Executor for Conn { type Error = MError; fn execute<'a, T: Iterator>( @@ -56,9 +56,27 @@ impl Transaction for Conn { transaction.commit()?; Ok(count as usize) } + + fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + Executor::execute(self, [query, update_query].into_iter()) + } else { + self.query_iter(query)?; + if let Err(e) = self.query_iter(update_query) { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } -impl Transaction for PooledConn { +impl Executor for PooledConn { type Error = MError; fn execute<'a, T: Iterator>( @@ -75,6 +93,24 @@ impl Transaction for PooledConn { transaction.commit()?; Ok(count as usize) } + + fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + Executor::execute(self, [query, update_query].into_iter()) + } else { + self.query_iter(query)?; + if let Err(e) = self.query_iter(update_query) { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } impl Query> for Conn { diff --git a/refinery_core/src/drivers/mysql_async.rs b/refinery_core/src/drivers/mysql_async.rs index 675b96f9..8dcbcc34 100644 --- a/refinery_core/src/drivers/mysql_async.rs +++ b/refinery_core/src/drivers/mysql_async.rs @@ -1,6 +1,6 @@ -use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; +use crate::traits::r#async::{AsyncExecutor, AsyncMigrate, AsyncQuery}; use crate::util::SchemaVersion; -use crate::Migration; +use crate::{Migration, MigrationFlags}; use async_trait::async_trait; use mysql_async::{ prelude::Queryable, Error as MError, IsolationLevel, Pool, Transaction as MTransaction, TxOpts, @@ -37,7 +37,7 @@ async fn query_applied_migrations<'a>( } #[async_trait] -impl AsyncTransaction for Pool { +impl AsyncExecutor for Pool { type Error = MError; async fn execute<'a, T: Iterator + Send>( @@ -57,6 +57,24 @@ impl AsyncTransaction for Pool { transaction.commit().await?; Ok(count as usize) } + + async fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + AsyncExecutor::execute(self, [query, update_query].into_iter()).await + } else { + self.query(query).await?; + if let Err(e) = 
self.query(update_query).await { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } #[async_trait] @@ -64,7 +82,7 @@ impl AsyncQuery> for Pool { async fn query( &mut self, query: &str, - ) -> Result, ::Error> { + ) -> Result, ::Error> { let mut conn = self.get_conn().await?; let mut options = TxOpts::new(); options.with_isolation_level(Some(IsolationLevel::ReadCommitted)); diff --git a/refinery_core/src/drivers/postgres.rs b/refinery_core/src/drivers/postgres.rs index fa4d5b9e..59d24008 100644 --- a/refinery_core/src/drivers/postgres.rs +++ b/refinery_core/src/drivers/postgres.rs @@ -1,5 +1,5 @@ -use crate::traits::sync::{Migrate, Query, Transaction}; -use crate::Migration; +use crate::traits::sync::{Executor, Migrate, Query}; +use crate::{Migration, MigrationFlags}; use postgres::{Client as PgClient, Error as PgError, Transaction as PgTransaction}; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; @@ -30,7 +30,7 @@ fn query_applied_migrations( Ok(applied) } -impl Transaction for PgClient { +impl Executor for PgClient { type Error = PgError; fn execute<'a, T: Iterator>( @@ -46,6 +46,24 @@ impl Transaction for PgClient { transaction.commit()?; Ok(count as usize) } + + fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + Executor::execute(self, [query, update_query].into_iter()) + } else { + self.simple_query(query)?; + if let Err(e) = self.simple_query(update_query) { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } impl Query> for PgClient { diff --git a/refinery_core/src/drivers/rusqlite.rs b/refinery_core/src/drivers/rusqlite.rs index 9ee4ced9..18f9bc36 100644 --- a/refinery_core/src/drivers/rusqlite.rs +++ b/refinery_core/src/drivers/rusqlite.rs @@ -1,5 +1,5 @@ -use crate::traits::sync::{Migrate, Query, Transaction}; -use crate::Migration; +use crate::traits::sync::{Executor, Migrate, Query}; +use crate::{Migration, MigrationFlags}; use rusqlite::{Connection as RqlConnection, Error as RqlError}; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; @@ -30,7 +30,7 @@ fn query_applied_migrations( Ok(applied) } -impl Transaction for RqlConnection { +impl Executor for RqlConnection { type Error = RqlError; fn execute<'a, T: Iterator>( &mut self, @@ -45,6 +45,24 @@ impl Transaction for RqlConnection { transaction.commit()?; Ok(count) } + + fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + Executor::execute(self, [query, update_query].into_iter()) + } else { + self.execute_batch(query)?; + if let Err(e) = self.execute_batch(update_query) { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } impl Query> for RqlConnection { diff --git a/refinery_core/src/drivers/tiberius.rs b/refinery_core/src/drivers/tiberius.rs index 4095f1b7..062ef272 100644 --- a/refinery_core/src/drivers/tiberius.rs +++ b/refinery_core/src/drivers/tiberius.rs @@ -1,6 +1,6 @@ -use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; +use crate::traits::r#async::{AsyncExecutor, AsyncMigrate, AsyncQuery}; use crate::util::SchemaVersion; -use crate::Migration; +use crate::{Migration, MigrationFlags}; use async_trait::async_trait; use futures::{ @@ -41,7 +41,7 @@ async fn 
query_applied_migrations( } #[async_trait] -impl AsyncTransaction for Client +impl AsyncExecutor for Client where S: AsyncRead + AsyncWrite + Unpin + Send, { @@ -67,6 +67,24 @@ where self.simple_query("COMMIT TRAN T1").await?; Ok(count as usize) } + + async fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + AsyncExecutor::execute(self, [query, update_query].into_iter()).await + } else { + self.simple_query(query).await?; + if let Err(e) = self.simple_query(update_query).await { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } #[async_trait] @@ -77,7 +95,7 @@ where async fn query( &mut self, query: &str, - ) -> Result, ::Error> { + ) -> Result, ::Error> { let applied = query_applied_migrations(self, query).await?; Ok(applied) } diff --git a/refinery_core/src/drivers/tokio_postgres.rs b/refinery_core/src/drivers/tokio_postgres.rs index 346cfd7c..a2872390 100644 --- a/refinery_core/src/drivers/tokio_postgres.rs +++ b/refinery_core/src/drivers/tokio_postgres.rs @@ -1,5 +1,5 @@ -use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; -use crate::Migration; +use crate::traits::r#async::{AsyncExecutor, AsyncMigrate, AsyncQuery}; +use crate::{Migration, MigrationFlags}; use async_trait::async_trait; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; @@ -32,7 +32,7 @@ async fn query_applied_migrations( } #[async_trait] -impl AsyncTransaction for Client { +impl AsyncExecutor for Client { type Error = PgError; async fn execute<'a, T: Iterator + Send>( @@ -48,6 +48,24 @@ impl AsyncTransaction for Client { transaction.commit().await?; Ok(count as usize) } + + async fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result { + if flags.run_in_transaction { + AsyncExecutor::execute(self, [query, update_query].into_iter()).await + } else { + self.simple_query(query).await?; + if let Err(e) = self.simple_query(update_query).await { + log::error!("applied migration but schema history table update failed"); + return Err(e); + } + Ok(2) + } + } } #[async_trait] @@ -55,7 +73,7 @@ impl AsyncQuery> for Client { async fn query( &mut self, query: &str, - ) -> Result, ::Error> { + ) -> Result, ::Error> { let transaction = self.transaction().await?; let applied = query_applied_migrations(&transaction, query).await?; transaction.commit().await?; diff --git a/refinery_core/src/error.rs b/refinery_core/src/error.rs index 2b1b4661..c4935243 100644 --- a/refinery_core/src/error.rs +++ b/refinery_core/src/error.rs @@ -57,6 +57,9 @@ pub enum Kind { /// An Error from an divergent version, the applied version is different to the filesystem one #[error("applied migration {0} is different than filesystem one {1}")] DivergentVersion(Migration, Migration), + /// An Error from running in grouped mode with a migration that opts out of transactions + #[error("migration {0} opts out of transactions, cannot run with `set-grouped`")] + NoTransactionGroupedMigration(Migration), /// An Error from an divergent version, the applied version is missing on the filesystem #[error("migration {0} is missing from the filesystem")] MissingVersion(Migration), diff --git a/refinery_core/src/lib.rs b/refinery_core/src/lib.rs index 2d20865a..16844e6a 100644 --- a/refinery_core/src/lib.rs +++ b/refinery_core/src/lib.rs @@ -6,11 +6,12 @@ pub mod traits; mod util; pub use crate::error::Error; -pub use 
crate::runner::{Migration, Report, Runner, Target}; +pub use crate::runner::{Migration, MigrationFlags, Report, Runner, Target}; pub use crate::traits::r#async::AsyncMigrate; pub use crate::traits::sync::Migrate; pub use crate::util::{ - find_migration_files, load_sql_migrations, parse_migration_name, MigrationType, SchemaVersion, + find_migration_files, load_sql_migrations, parse_flags, parse_migration_name, MigrationType, + SchemaVersion, }; #[cfg(feature = "rusqlite")] diff --git a/refinery_core/src/runner.rs b/refinery_core/src/runner.rs index 97921e81..c6d6125c 100644 --- a/refinery_core/src/runner.rs +++ b/refinery_core/src/runner.rs @@ -70,12 +70,32 @@ pub struct Migration { prefix: Type, sql: Option, applied_on: Option, + flags: MigrationFlags, +} + +#[derive(Clone, Debug)] +pub struct MigrationFlags { + // Migrations by default run in transaction except explicitly specified by the + // `-- +refinery NO TRANSACTION` directive + pub run_in_transaction: bool, +} + +impl Default for MigrationFlags { + fn default() -> Self { + Self { + run_in_transaction: true, + } + } } impl Migration { /// Create an unapplied migration, name and version are parsed from the input_name, /// which must be named in the format (U|V){1}__{2}.rs where {1} represents the migration version and {2} the name. - pub fn unapplied(input_name: &str, sql: &str) -> Result { + pub fn unapplied( + input_name: &str, + sql: &str, + flags: MigrationFlags, + ) -> Result { let (prefix, version, name) = parse_migration_name(input_name)?; // Previously, `std::collections::hash_map::DefaultHasher` was used @@ -100,6 +120,7 @@ impl Migration { sql: Some(sql.into()), applied_on: None, checksum, + flags, }) } @@ -119,6 +140,9 @@ impl Migration { prefix: Type::Versioned, sql: None, applied_on: Some(applied_on), + flags: MigrationFlags { + run_in_transaction: true, + }, } } @@ -133,6 +157,10 @@ impl Migration { self.sql.as_deref() } + pub fn flags(&self) -> &MigrationFlags { + &self.flags + } + /// Get the Migration version pub fn version(&self) -> SchemaVersion { self.version diff --git a/refinery_core/src/traits/async.rs b/refinery_core/src/traits/async.rs index a0305d2e..2ad84654 100644 --- a/refinery_core/src/traits/async.rs +++ b/refinery_core/src/traits/async.rs @@ -1,30 +1,40 @@ -use crate::error::WrapMigrationError; +use crate::error::{Kind, WrapMigrationError}; use crate::traits::{ insert_migration_query, verify_migrations, GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY, }; -use crate::{Error, Migration, Report, Target}; +use crate::{Error, Migration, MigrationFlags, Report, Target}; use async_trait::async_trait; use std::string::ToString; #[async_trait] -pub trait AsyncTransaction { +pub trait AsyncExecutor { type Error: std::error::Error + Send + Sync + 'static; + // Run multiple queries implicitly in a transaction async fn execute<'a, T: Iterator + Send>( &mut self, - queries: T, + query: T, + ) -> Result; + + // Run single query along with a query to update the migration table. 
+ // Offers more granular control via MigrationFlags + async fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, ) -> Result; } #[async_trait] -pub trait AsyncQuery: AsyncTransaction { +pub trait AsyncQuery: AsyncExecutor { async fn query(&mut self, query: &str) -> Result; } -async fn migrate( - transaction: &mut T, +async fn migrate_individual( + executor: &mut T, migrations: Vec, target: Target, migration_table_name: &str, @@ -45,13 +55,11 @@ async fn migrate( log::info!("applying migration: {}", migration); migration.set_applied(); let update_query = insert_migration_query(&migration, migration_table_name); - transaction - .execute( - [ - migration.sql().as_ref().expect("sql must be Some!"), - update_query.as_str(), - ] - .into_iter(), + executor + .execute_single( + &migration.sql().as_ref().expect("sql must be Some!"), + &update_query, + migration.flags(), ) .await .migration_err( @@ -63,8 +71,8 @@ async fn migrate( Ok(Report::new(applied_migrations)) } -async fn migrate_grouped( - transaction: &mut T, +async fn migrate_grouped( + executor: &mut T, migrations: Vec, target: Target, migration_table_name: &str, @@ -78,6 +86,12 @@ async fn migrate_grouped( break; } } + if !migration.flags().run_in_transaction { + return Err(Error::new( + Kind::NoTransactionGroupedMigration(migration), + None, + )); + } migration.set_applied(); let query = insert_migration_query(&migration, migration_table_name); @@ -115,7 +129,9 @@ async fn migrate_grouped( ); } - transaction + let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect(); + + executor .execute(grouped_migrations.iter().map(AsRef::as_ref)) .await .migration_err("error applying migrations", None)?; @@ -199,7 +215,7 @@ where if grouped || matches!(target, Target::Fake | Target::FakeVersion(_)) { migrate_grouped(self, migrations, target, migration_table_name).await } else { - migrate(self, migrations, target, migration_table_name).await + migrate_individual(self, migrations, target, migration_table_name).await } } } diff --git a/refinery_core/src/traits/mod.rs b/refinery_core/src/traits/mod.rs index 6bda4142..b990e33f 100644 --- a/refinery_core/src/traits/mod.rs +++ b/refinery_core/src/traits/mod.rs @@ -141,6 +141,7 @@ mod tests { let migration1 = Migration::unapplied( "V1__initial.sql", "CREATE TABLE persons (id int, name varchar(255), city varchar(255));", + Default::default(), ) .unwrap(); @@ -149,18 +150,21 @@ mod tests { include_str!( "../../../refinery/tests/migrations/V1-2/V2__add_cars_and_motos_table.sql" ), + Default::default(), ) .unwrap(); let migration3 = Migration::unapplied( "V3__add_brand_to_cars_table", include_str!("../../../refinery/tests/migrations/V3/V3__add_brand_to_cars_table.sql"), + Default::default(), ) .unwrap(); let migration4 = Migration::unapplied( "V4__add_year_field_to_cars", "ALTER TABLE cars ADD year INTEGER;", + Default::default(), ) .unwrap(); @@ -199,6 +203,7 @@ mod tests { include_str!( "../../../refinery/tests/migrations/V3/V3__add_brand_to_cars_table.sql" ), + Default::default(), ) .unwrap(), ]; @@ -225,6 +230,7 @@ mod tests { include_str!( "../../../refinery/tests/migrations/V3/V3__add_brand_to_cars_table.sql" ), + Default::default(), ) .unwrap(), ]; @@ -297,6 +303,7 @@ mod tests { include_str!( "../../../refinery/tests/migrations_unversioned/U0__merge_out_of_order.sql" ), + Default::default(), ) .unwrap(), ); diff --git a/refinery_core/src/traits/sync.rs b/refinery_core/src/traits/sync.rs index f7ad2a65..00228bd3 100644 --- 
a/refinery_core/src/traits/sync.rs +++ b/refinery_core/src/traits/sync.rs @@ -1,107 +1,148 @@ -use crate::error::WrapMigrationError; +use crate::error::{Kind, WrapMigrationError}; use crate::traits::{ insert_migration_query, verify_migrations, GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY, }; -use crate::{Error, Migration, Report, Target}; +use crate::{Error, Migration, MigrationFlags, Report, Target}; -pub trait Transaction { +pub trait Executor { type Error: std::error::Error + Send + Sync + 'static; + // Run multiple queries implicitly in a transaction fn execute<'a, T: Iterator>( &mut self, queries: T, ) -> Result; + + // Run single query along with a query to update the migration table. + // Offers more granular control via MigrationFlags + fn execute_single( + &mut self, + query: &str, + update_query: &str, + flags: &MigrationFlags, + ) -> Result; } -pub trait Query: Transaction { +pub trait Query: Executor { fn query(&mut self, query: &str) -> Result; } -pub fn migrate( - transaction: &mut T, +fn migrate_grouped( + executor: &mut T, migrations: Vec, target: Target, migration_table_name: &str, - batched: bool, ) -> Result { - let mut migration_batch = Vec::new(); + let mut grouped_migrations = Vec::new(); let mut applied_migrations = Vec::new(); for mut migration in migrations.into_iter() { if let Target::Version(input_target) | Target::FakeVersion(input_target) = target { if input_target < migration.version() { - log::info!( - "stopping at migration: {}, due to user option", - input_target - ); break; } } + if !migration.flags().run_in_transaction { + return Err(Error::new( + Kind::NoTransactionGroupedMigration(migration), + None, + )); + } + migration.set_applied(); - let insert_migration = insert_migration_query(&migration, migration_table_name); - let migration_sql = migration.sql().expect("sql must be Some!").to_string(); + let query = insert_migration_query(&migration, migration_table_name); + + let sql = migration.sql().expect("sql must be Some!").to_owned(); // If Target is Fake, we only update schema migrations table if !matches!(target, Target::Fake | Target::FakeVersion(_)) { applied_migrations.push(migration); - migration_batch.push(migration_sql); + grouped_migrations.push(sql); } - migration_batch.push(insert_migration); + grouped_migrations.push(query); } - match (target, batched) { - (Target::Fake | Target::FakeVersion(_), _) => { - log::info!("not going to apply any migration as fake flag is enabled."); + match target { + Target::Fake | Target::FakeVersion(_) => { + log::info!("not going to apply any migration as fake flag is enabled"); } - (Target::Latest | Target::Version(_), true) => { + Target::Latest | Target::Version(_) => { log::info!( "going to batch apply {} migrations in single transaction.", applied_migrations.len() ); } - (Target::Latest | Target::Version(_), false) => { - log::info!( - "going to apply {} migrations in multiple transactions.", - applied_migrations.len(), - ); - } }; - let refs = migration_batch.iter().map(AsRef::as_ref); + if let Target::Version(input_target) = target { + log::info!( + "stopping at migration: {}, due to user option", + input_target + ); + } + + let refs = grouped_migrations.iter().map(AsRef::as_ref); - if batched { - let migrations_display = applied_migrations - .iter() - .map(ToString::to_string) - .collect::>() - .join("\n"); - log::info!("going to apply batch migrations in single transaction:\n{migrations_display}"); - transaction - .execute(refs) - .migration_err("error applying migrations", None)?; - 
} else { - for (i, update) in refs.enumerate() { - // first iteration is pair so we know the following even in the iteration index - // marks the previous (pair) migration as completed. - let applying_migration = i % 2 == 0; - let current_migration = &applied_migrations[i / 2]; - if applying_migration { - log::info!("applying migration: {current_migration} ..."); - } else { - // Writing the migration state to the db. - log::debug!("applied migration: {current_migration} writing state to db."); + executor + .execute(refs) + .migration_err("error applying migrations", None)?; + + Ok(Report::new(applied_migrations)) +} + +fn migrate_individual( + executor: &mut T, + migrations: Vec, + target: Target, + migration_table_name: &str, +) -> Result { + let mut applied_migrations = vec![]; + + for mut migration in migrations.into_iter() { + if let Target::Version(input_target) = target { + if input_target < migration.version() { + log::info!( + "stopping at migration: {}, due to user option", + input_target + ); + break; } - transaction - .execute([update].into_iter()) - .migration_err("error applying update", Some(&applied_migrations[0..i / 2]))?; } - } + log::info!("applying migration: {}", migration); + migration.set_applied(); + let update_query = insert_migration_query(&migration, migration_table_name); + executor + .execute_single( + &migration.sql().expect("migration has no content"), + &update_query, + migration.flags(), + ) + .migration_err( + &format!("error applying migration {}", migration), + Some(&applied_migrations), + )?; + applied_migrations.push(migration); + } Ok(Report::new(applied_migrations)) } +pub fn migrate( + executor: &mut T, + migrations: Vec, + target: Target, + migration_table_name: &str, + batched: bool, +) -> Result { + if batched { + migrate_grouped(executor, migrations, target, migration_table_name) + } else { + migrate_individual(executor, migrations, target, migration_table_name) + } +} + pub trait Migrate: Query> where Self: Sized, diff --git a/refinery_core/src/util.rs b/refinery_core/src/util.rs index 64c43889..1d5e77ee 100644 --- a/refinery_core/src/util.rs +++ b/refinery_core/src/util.rs @@ -1,5 +1,5 @@ use crate::error::{Error, Kind}; -use crate::runner::Type; +use crate::runner::{MigrationFlags, Type}; use crate::Migration; use regex::Regex; use std::ffi::OsStr; @@ -32,6 +32,26 @@ fn file_re_all() -> &'static Regex { RE.get_or_init(|| Regex::new([STEM_RE, r"\.(rs|sql)$"].concat().as_str()).unwrap()) } +/// Matches the annotation `-- +refinery NO TRANSACTION` at the start of a +/// commented line of a .sql file, implying that the query should ran outside +/// of a transaction. +fn query_no_transaction_re_sql() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| { + Regex::new(r"^[-]{2,}[\s]?(\+refinery[\s]+NO[\s]+TRANSACTION[\s]?)$").unwrap() + }) +} + +/// Matches the annotation `// +refinery NO TRANSACTION` at the start of a +/// commented line of a .rs|.sql file, implying that the query should ran outside +/// of a transaction. 
+fn query_no_transaction_re_all() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| { + Regex::new(r"^[-|\/]{2,}[\s]?(\+refinery[\s]+NO[\s]+TRANSACTION[\s]?)$").unwrap() + }) +} + /// enum containing the migration types used to search for migrations /// either just .sql files or both .sql and .rs pub enum MigrationType { @@ -46,6 +66,13 @@ impl MigrationType { MigrationType::Sql => file_re_sql(), } } + + fn no_transaction_re(&self) -> &'static Regex { + match self { + MigrationType::All => query_no_transaction_re_all(), + MigrationType::Sql => query_no_transaction_re_sql(), + } + } } /// Parse a migration filename stem into a prefix, version, and name. @@ -68,6 +95,19 @@ pub fn parse_migration_name(name: &str) -> Result<(Type, SchemaVersion, String), Ok((prefix, version, name)) } +pub fn parse_flags(file_content: &str, migration_type: MigrationType) -> MigrationFlags { + let mut default_flags = MigrationFlags::default(); + // TODO: Keep behind a flag as it could be slow + let no_tx_re = migration_type.no_transaction_re(); + for line in file_content.lines() { + if no_tx_re.is_match(line) { + default_flags.run_in_transaction = false; + break; + } + } + default_flags +} + /// find migrations on file system recursively across directories given a location and [MigrationType] pub fn find_migration_files( location: impl AsRef, @@ -128,7 +168,8 @@ pub fn load_sql_migrations(location: impl AsRef) -> Result, .and_then(|file| file.to_os_string().into_string().ok()) .unwrap(); - let migration = Migration::unapplied(&filename, &sql)?; + let flags = parse_flags(&sql, MigrationType::Sql); + let migration = Migration::unapplied(&filename, &sql, flags)?; migrations.push(migration); } diff --git a/refinery_macros/src/lib.rs b/refinery_macros/src/lib.rs index cd790cc1..550f1a6e 100644 --- a/refinery_macros/src/lib.rs +++ b/refinery_macros/src/lib.rs @@ -7,6 +7,7 @@ use proc_macro::TokenStream; use proc_macro2::{Span as Span2, TokenStream as TokenStream2}; use quote::quote; use quote::ToTokens; +use refinery_core::parse_flags; use refinery_core::{find_migration_files, MigrationType}; use std::path::PathBuf; use std::{env, fs}; @@ -20,12 +21,15 @@ pub(crate) fn crate_root() -> PathBuf { fn migration_fn_quoted(_migrations: Vec) -> TokenStream2 { let result = quote! { - use refinery::{Migration, Runner, SchemaVersion}; + use refinery::{Migration, MigrationFlags, Runner, SchemaVersion}; pub fn runner() -> Runner { - let quoted_migrations: Vec<(&str, String)> = vec![#(#_migrations),*]; + let quoted_migrations: Vec<(&str, String, bool)> = vec![#(#_migrations),*]; let mut migrations: Vec = Vec::new(); for module in quoted_migrations.into_iter() { - migrations.push(Migration::unapplied(module.0, &module.1).unwrap()); + let flags = MigrationFlags { + run_in_transaction: module.2, + }; + migrations.push(Migration::unapplied(module.0, &module.1, flags).unwrap()); } Runner::new(&migrations) } @@ -119,9 +123,13 @@ pub fn embed_migrations(input: TokenStream) -> TokenStream { let path = migration.display().to_string(); let extension = migration.extension().unwrap(); migration_filenames.push(filename.clone()); + let content = fs::read_to_string(&path).unwrap(); + let flags = parse_flags(&content, MigrationType::All); + let run_in_transaction = flags.run_in_transaction; if extension == "sql" { - _migrations.push(quote! {(#filename, include_str!(#path).to_string())}); + _migrations + .push(quote! 
{(#filename, include_str!(#path).to_string(), #run_in_transaction)}); } else if extension == "rs" { let rs_content = fs::read_to_string(&path) .unwrap() @@ -133,7 +141,7 @@ pub fn embed_migrations(input: TokenStream) -> TokenStream { // also include the file as str so we trigger recompilation if it changes const _RECOMPILE_IF_CHANGED: &str = include_str!(#path); }}; - _migrations.push(quote! {(#filename, #ident::migration())}); + _migrations.push(quote! {(#filename, #ident::migration(), #run_in_transaction)}); migrations_mods.push(mig_mod); } } @@ -206,12 +214,15 @@ mod tests { fn test_quote_fn() { let migs = vec![quote!("V1__first", "valid_sql_file")]; let expected = concat! { - "use refinery :: { Migration , Runner , SchemaVersion } ; ", + "use refinery :: { Migration , MigrationFlags , Runner , SchemaVersion } ; ", "pub fn runner () -> Runner { ", - "let quoted_migrations : Vec < (& str , String) > = vec ! [\"V1__first\" , \"valid_sql_file\"] ; ", + "let quoted_migrations : Vec < (& str , String , bool) > = vec ! [\"V1__first\" , \"valid_sql_file\"] ; ", "let mut migrations : Vec < Migration > = Vec :: new () ; ", "for module in quoted_migrations . into_iter () { ", - "migrations . push (Migration :: unapplied (module . 0 , & module . 1) . unwrap ()) ; ", + "let flags = MigrationFlags {", + " run_in_transaction : module . 2 , ", + "} ; ", + "migrations . push (Migration :: unapplied (module . 0 , & module . 1 , flags) . unwrap ()) ; ", "} ", "Runner :: new (& migrations) }" };