@@ -250,6 +250,32 @@ def test_migrator_runs_all_migrations_file(logger: Logger) -> None:
250250 db .conn .close ()
251251
252252
253+ def test_migrator_backs_up_db (logger : Logger ) -> None :
254+ with TemporaryDirectory () as tempdir :
255+ original_db_path = Path (tempdir ) / "invokeai.db"
256+ db = SqliteDatabase (db_path = original_db_path , logger = logger , verbose = False )
257+ # Write some data to the db to test for successful backup
258+ temp_cursor = db .conn .cursor ()
259+ temp_cursor .execute ("CREATE TABLE test (id INTEGER PRIMARY KEY);" )
260+ db .conn .commit ()
261+ # Set up the migrator
262+ migrator = SqliteMigrator (db = db )
263+ migrations = [Migration (from_version = i , to_version = i + 1 , callback = create_migrate (i )) for i in range (0 , 3 )]
264+ for migration in migrations :
265+ migrator .register_migration (migration )
266+ migrator .run_migrations ()
267+ # Must manually close else we get an error on Windows
268+ db .conn .close ()
269+ assert original_db_path .exists ()
270+ # We should have a backup file when we migrated a file db
271+ assert migrator ._backup_path
272+ # Check that the test table exists as a proxy for successful backup
273+ with closing (sqlite3 .connect (migrator ._backup_path )) as backup_db_conn :
274+ backup_db_cursor = backup_db_conn .cursor ()
275+ backup_db_cursor .execute ("SELECT name FROM sqlite_master WHERE type='table' AND name='test';" )
276+ assert backup_db_cursor .fetchone () is not None
277+
278+
253279def test_migrator_makes_no_changes_on_failed_migration (
254280 migrator : SqliteMigrator , migration_no_op : Migration , failing_migrate_callback : MigrateCallback
255281) -> None :
0 commit comments