diff --git a/CHANGELOG.md b/CHANGELOG.md
index 42fdafc1..8080749a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ Don't forget to remove deprecated code on each major release!

 ### Fixed

+- Prevent restoring a backup from a different database connector (e.g. Postgres backup to SQLite) by adding an additional metadata file to all new backups.
 - Fixed compressed media backup restoration by using `utils.uncompress_file()` instead of relying on tarfile's built-in gzip decompression. This aligns the behavior with database restore and ensures reliable decompression with all file-like objects.

 ## [5.0.1] - 2025-11-07
diff --git a/dbbackup/checks.py b/dbbackup/checks.py
index 490892f6..d6ecf2ee 100644
--- a/dbbackup/checks.py
+++ b/dbbackup/checks.py
@@ -1,7 +1,7 @@
 import re
 from datetime import datetime

-from django.core.checks import Tags, Warning, register
+from django.core.checks import Tags, Warning, register  # noqa: A004

 from dbbackup import settings
diff --git a/dbbackup/management/commands/dbbackup.py b/dbbackup/management/commands/dbbackup.py
index cd6c78ad..7f551345 100644
--- a/dbbackup/management/commands/dbbackup.py
+++ b/dbbackup/management/commands/dbbackup.py
@@ -2,6 +2,9 @@
 Command for backup database.
 """

+import json
+
+from django.core.files.base import ContentFile
 from django.core.management.base import CommandError

 from dbbackup import settings, utils
@@ -89,6 +92,25 @@ def _get_database_keys(self):
             return [key.strip() for key in self.database.split(",") if key.strip()]
         return settings.DATABASES

+    def _save_metadata(self, filename, local=False):
+        """
+        Save metadata file for the backup.
+        """
+        metadata = {
+            "engine": self.connector.connection.settings_dict["ENGINE"],
+            "connector": f"{self.connector.__module__}.{self.connector.__class__.__name__}",
+        }
+        metadata_filename = f"{filename}.metadata"
+        metadata_content = json.dumps(metadata)
+
+        if local:
+            self.logger.info("Writing metadata file to %s", metadata_filename)
+            with open(metadata_filename, "w") as fd:
+                fd.write(metadata_content)
+        else:
+            metadata_file = ContentFile(metadata_content)
+            self.write_to_storage(metadata_file, metadata_filename)
+
     def _save_new_backup(self, database):
         """
         Save a new backup file.
@@ -129,11 +151,14 @@ def _save_new_backup(self, database):

         if self.path is None:
             self.write_to_storage(outputfile, filename)
+            self._save_metadata(filename)
         elif self.path.startswith("s3://"):
             # Handle S3 URIs through storage backend
             self.write_to_storage(outputfile, self.path)
+            self._save_metadata(self.path)
         else:
             self.write_local_file(outputfile, self.path)
+            self._save_metadata(self.path, local=True)

         # Send post_backup signal
         post_backup.send(
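As an aside, the sidecar written by `_save_metadata()` is a tiny JSON document. A rough sketch of its content for a default SQLite project backed up with the built-in `SqliteConnector` (the connector path here is illustrative; it reflects whichever connector actually produced the backup):

```python
import json

# Illustrative content of "<backup filename>.metadata" for a SQLite database
# backed up with dbbackup's SqliteConnector (example values, not fixed ones).
metadata = {
    "engine": "django.db.backends.sqlite3",             # connection.settings_dict["ENGINE"]
    "connector": "dbbackup.db.sqlite.SqliteConnector",  # f"{module}.{class name}" of the connector
}
print(json.dumps(metadata))
# {"engine": "django.db.backends.sqlite3", "connector": "dbbackup.db.sqlite.SqliteConnector"}
```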
diff --git a/dbbackup/management/commands/dbrestore.py b/dbbackup/management/commands/dbrestore.py
index 64fe00e6..5ba4f1ad 100644
--- a/dbbackup/management/commands/dbrestore.py
+++ b/dbbackup/management/commands/dbrestore.py
@@ -3,6 +3,10 @@
 """

 import io
+import json
+import os
+import sys
+from importlib import import_module

 from django.conf import settings
 from django.core.management.base import CommandError
@@ -98,6 +102,59 @@ def _get_database(self, database_name: str):
             raise CommandError(msg)
         return database_name, settings.DATABASES[database_name]

+    def _check_metadata(self, filename):
+        """
+        Check whether the backup has a metadata file and, if so, whether it matches the target database.
+        """
+        metadata_filename = f"{filename}.metadata"
+        metadata = None
+
+        if self.path:
+            # Local file: self.path is the full path to the backup file
+            metadata_path = f"{self.path}.metadata"
+            if os.path.exists(metadata_path):
+                with open(metadata_path) as fd:
+                    metadata = json.load(fd)
+        else:
+            # Storage file: there is no cheap existence check across storage
+            # backends, so try to read the metadata file and treat any failure
+            # as "no metadata available".
+            try:
+                metadata_file = self.storage.read_file(metadata_filename)
+            except Exception:
+                self.logger.debug("No metadata file found for '%s'", filename)
+                return None
+
+            # Read and parse metadata
+            try:
+                metadata = json.load(metadata_file)
+            except Exception:
+                self.logger.warning(
+                    "Malformed metadata file for '%s'; dbbackup will ignore it.", filename
+                )
+                return None
+
+        if not metadata:
+            return None
+
+        backup_engine = metadata.get("engine")
+        current_engine = settings.DATABASES[self.database_name]["ENGINE"]
+        backup_connector = metadata.get("connector")
+
+        if backup_engine != current_engine and backup_connector != "dbbackup.db.django.DjangoConnector":
+            msg = (
+                f"Backup file '{filename}' was created with database engine '{backup_engine}', "
+                f"but you are restoring to a database using '{current_engine}'. "
+                "Restoring to a different database engine is not supported."
+            )
+            raise CommandError(msg)
+
+        return metadata
+
     def _restore_backup(self):
         """Restore the specified database."""
         input_filename, input_file = self._get_backup_file(
@@ -115,6 +172,8 @@ def _restore_backup(self):

         self.logger.info(f"Restoring: {input_filename}")  # noqa: G004

+        metadata = self._check_metadata(input_filename)
+
         # Send pre_restore signal
         pre_restore.send(
             sender=self.__class__,
@@ -154,7 +213,33 @@ def _restore_backup(self):
             self._ask_confirmation()

         input_file.seek(0)
-        self.connector = get_connector(self.database_name)
+
+        # Try to use the connector recorded in the backup metadata, if available
+        self.connector = None
+        if metadata and "connector" in metadata:
+            connector_path = metadata["connector"]
+            try:
+                module_name = ".".join(connector_path.split(".")[:-1])
+                class_name = connector_path.split(".")[-1]
+                module = import_module(module_name)
+                connector_class = getattr(module, class_name)
+                self.connector = connector_class(self.database_name)
+                self.logger.info("Using connector from metadata: '%s'", connector_path)
+            except (ImportError, AttributeError):
+                self.logger.warning(
+                    "Connector '%s' from metadata could not be imported! Falling back to the connector from your Django settings.",
+                    connector_path,
+                )
+                if self.interactive:
+                    answer = input("Do you want to continue with the connector defined in your Django settings? [Y/n] ")
+                    if answer and not answer.lower().startswith("y"):
+                        self.logger.info("Quitting")
+                        sys.exit(0)
+
+        # Fall back to the connector from Django settings and/or the default connector map.
+        if not self.connector:
+            self.connector = get_connector(self.database_name)
+
         if self.schemas:
             self.connector.schemas = self.schemas
         self.connector.drop = not self.no_drop
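The connector override above is plain dotted-path resolution; a self-contained sketch of the same lookup, independent of the command class (the example path and the `get_connector` fallback usage are illustrative):

```python
from importlib import import_module


def load_connector_class(dotted_path):
    """Resolve a dotted path such as 'dbbackup.db.sqlite.SqliteConnector' to a class."""
    module_name, _, class_name = dotted_path.rpartition(".")
    module = import_module(module_name)  # ImportError if the module no longer exists
    return getattr(module, class_name)   # AttributeError if the class was renamed or removed


# Usage sketch, mirroring the restore command's fallback-on-failure behaviour:
# try:
#     connector = load_connector_class(metadata["connector"])("default")
# except (ImportError, AttributeError):
#     connector = get_connector("default")
```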
diff --git a/dbbackup/storage.py b/dbbackup/storage.py
index 1a558dfc..90a60378 100644
--- a/dbbackup/storage.py
+++ b/dbbackup/storage.py
@@ -126,6 +126,8 @@ def list_backups(
             raise TypeError(msg)
         # TODO: Make better filter for include only backups
         files = [f for f in self.list_directory() if utils.filename_to_datestring(f)]
+        # Exclude metadata files
+        files = [f for f in files if not f.endswith(".metadata")]
         if encrypted is not None:
             files = [f for f in files if (".gpg" in f) == encrypted]
         if compressed is not None:
diff --git a/dbbackup/utils.py b/dbbackup/utils.py
index 56865745..d6c5f761 100644
--- a/dbbackup/utils.py
+++ b/dbbackup/utils.py
@@ -143,7 +143,7 @@ def create_spooled_temporary_file(filepath=None, fileobj=None):
     """
     spooled_file = tempfile.SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE, dir=settings.TMP_DIR)
     if filepath:
-        fileobj = open(filepath, "r+b")  # noqa: SIM115
+        fileobj = open(filepath, "r+b")
     if fileobj is not None:
         fileobj.seek(0)
     copyfileobj(fileobj, spooled_file, settings.TMP_FILE_READ_SIZE)
diff --git a/pyproject.toml b/pyproject.toml
index fcf80c07..fa0cc1ce 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -209,6 +209,8 @@ lint.extend-ignore = [
     "S101",    # Use of assert detected.
     "S105",    # Use of hardcoded password
     "PLC0415", # Import should be at top-level of a file
+    "SIM115",  # Use a context manager to open files
+    "BLE001",  # Do not catch generic `Exception`
 ]

 [tool.pytest.ini_options]
diff --git a/scripts/postgres_live_test.py b/scripts/postgres_live_test.py
index d60e4d12..48dff4c8 100755
--- a/scripts/postgres_live_test.py
+++ b/scripts/postgres_live_test.py
@@ -1,4 +1,4 @@
-# ruff: noqa: TRY301, BLE001, TRY300
+# ruff: noqa: TRY301, TRY300
 """PostgreSQL Live Functional Test Script for django-dbbackup

 Usage:
@@ -33,11 +33,10 @@
 SYMBOL_SKIP = _SYMS["SKIP"]


-class SkipTest(Exception):
+class SkippedTestError(Exception):
     """Exception raised when a test should be skipped."""

-
 # Available PostgreSQL connectors
 POSTGRES_CONNECTORS = [
     "PgDumpConnector",
@@ -122,7 +121,7 @@ def setup_postgres(self):
                 "and ensure pg_dump and psql are in your PATH."
) msg = f"PostgreSQL client tools (pg_dump, psql, etc) are not installed!{install_instructions}" - raise SkipTest(msg) + raise SkippedTestError(msg) self._log("Setting up test database...") self.temp_dir = tempfile.mkdtemp(prefix="dbbackup_postgres_") @@ -364,7 +363,7 @@ def run_backup_restore_test(self): self._log(f"{SYMBOL_PASS} {self.connector_name} backup/restore test PASSED") return 0 - except SkipTest as e: + except SkippedTestError as e: self._log(f"{SYMBOL_SKIP} {self.connector_name} backup/restore test SKIPPED: {e}") return 77 @@ -408,7 +407,7 @@ def _run_subprocess(): # local helper kept simple; not used as target process = _run_subprocess() if process.exitcode is None: return 1 - if process.exitcode != 0 and process.exitcode != 77 and os.name == "nt": # Fallback path on Windows + if process.exitcode not in {0, 77} and os.name == "nt": # Fallback path on Windows # Retry in-process so at least we capture a meaningful failure message test_runner = PostgreSQLLiveTest(connector_name, verbose) return test_runner.run_backup_restore_test() diff --git a/scripts/sqlite_live_test.py b/scripts/sqlite_live_test.py index f9f77b53..9a143b4f 100644 --- a/scripts/sqlite_live_test.py +++ b/scripts/sqlite_live_test.py @@ -1,4 +1,4 @@ -# ruff: noqa: TRY300, BLE001 +# ruff: noqa: TRY300 """SQLite Live Functional Test Script for django-dbbackup Usage: @@ -152,6 +152,8 @@ def main() -> int: # (complexity acceptable for test harness) run_management_command(["", "dbbackup", "--noinput"], verbose=verbose) post_existing = set(os.listdir(backups_dir)) new_files = sorted(post_existing - pre_existing) + # Filter out metadata files + new_files = [f for f in new_files if not f.endswith(".metadata")] latest_backup = new_files[-1] if new_files else None log(f"Database backup completed (file: {latest_backup})", verbose=verbose) diff --git a/tests/commands/test_dbbackup.py b/tests/commands/test_dbbackup.py index df9f519f..2960a06f 100644 --- a/tests/commands/test_dbbackup.py +++ b/tests/commands/test_dbbackup.py @@ -14,13 +14,20 @@ from dbbackup.db.base import get_connector from dbbackup.management.commands.dbbackup import Command as DbbackupCommand from dbbackup.storage import get_storage -from tests.utils import DEV_NULL, TEST_DATABASE, add_public_gpg, clean_gpg_keys +from tests.utils import ( + DEV_NULL, + HANDLED_FILES, + TEST_DATABASE, + add_public_gpg, + clean_gpg_keys, +) @patch("dbbackup.settings.GPG_RECIPIENT", "test@test") @patch("sys.stdout", DEV_NULL) class DbbackupCommandSaveNewBackupTest(TestCase): def setUp(self): + HANDLED_FILES.clean() self.command = DbbackupCommand() self.command.servername = "foo-server" self.command.encrypt = False @@ -42,6 +49,9 @@ def test_func(self): def test_compress(self): self.command.compress = True self.command._save_new_backup(TEST_DATABASE) + assert len(HANDLED_FILES["written_files"]) == 2 + assert HANDLED_FILES["written_files"][0][0].endswith(".gz") + assert HANDLED_FILES["written_files"][1][0].endswith(".gz.metadata") def test_encrypt(self): if not GPG_AVAILABLE: @@ -49,6 +59,9 @@ def test_encrypt(self): add_public_gpg() self.command.encrypt = True self.command._save_new_backup(TEST_DATABASE) + assert len(HANDLED_FILES["written_files"]) == 2 + assert HANDLED_FILES["written_files"][0][0].endswith(".gpg") + assert HANDLED_FILES["written_files"][1][0].endswith(".gpg.metadata") def test_path(self): local_tmp = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "tmp") @@ -56,8 +69,10 @@ def test_path(self): self.command.path = os.path.join(local_tmp, 
"foo.bak") self.command._save_new_backup(TEST_DATABASE) assert os.path.exists(self.command.path) + assert os.path.exists(f"{self.command.path}.metadata") # tearDown os.remove(self.command.path) + os.remove(f"{self.command.path}.metadata") def test_schema(self): self.command.schemas = ["public"] @@ -72,7 +87,8 @@ def test_path_s3_uri(self, mock_write_to_storage): self.command._save_new_backup(TEST_DATABASE) assert mock_write_to_storage.called # Verify the S3 path was passed correctly to write_to_storage - args, kwargs = mock_write_to_storage.call_args + # The first call should be the backup file + args, _kwargs = mock_write_to_storage.call_args_list[0] assert args[1] == "s3://mybucket/backups/db.bak" @patch("dbbackup.management.commands._base.BaseDbBackupCommand.write_to_storage") @@ -91,7 +107,8 @@ def test_path_s3_uri_variants(self, mock_write_to_storage): self.command.path = s3_uri self.command._save_new_backup(TEST_DATABASE) assert mock_write_to_storage.called - args, kwargs = mock_write_to_storage.call_args + # The first call should be the backup file + args, _kwargs = mock_write_to_storage.call_args_list[0] assert args[1] == s3_uri def test_path_local_file_still_works(self): @@ -106,9 +123,11 @@ def test_path_local_file_still_works(self): # Verify the file was created (meaning write_local_file was used) assert os.path.exists(local_path) + assert os.path.exists(f"{local_path}.metadata") # Cleanup os.remove(local_path) + os.remove(f"{local_path}.metadata") # Test that paths containing 's3' but not starting with 's3://' are treated as local with patch("dbbackup.management.commands._base.BaseDbBackupCommand.write_local_file") as mock_write_local_file: @@ -129,7 +148,7 @@ def test_path_local_file_still_works(self): self.command._save_new_backup(TEST_DATABASE) # Verify write_local_file was called assert mock_write_local_file.called - args, kwargs = mock_write_local_file.call_args + args, _kwargs = mock_write_local_file.call_args assert args[1] == local_path @patch("dbbackup.settings.DATABASES", ["db-from-settings"]) diff --git a/tests/commands/test_dbrestore_metadata.py b/tests/commands/test_dbrestore_metadata.py new file mode 100644 index 00000000..02c5f75d --- /dev/null +++ b/tests/commands/test_dbrestore_metadata.py @@ -0,0 +1,199 @@ +import json +from unittest.mock import Mock, patch + +import pytest +from django.conf import settings +from django.core.management.base import CommandError +from django.test import TestCase + +from dbbackup.management.commands.dbrestore import Command as DbrestoreCommand + + +class DbrestoreMetadataTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + self.command.database_name = "default" + self.command.logger = Mock() + self.command.storage = Mock() + self.command.path = None + + def test_metadata_match(self): + # Setup metadata + metadata = {"engine": settings.DATABASES["default"]["ENGINE"]} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Should not raise + self.command._check_metadata("backup.dump") + + def test_metadata_mismatch(self): + # Setup metadata with different engine + metadata = {"engine": "django.db.backends.postgresql"} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Should raise + with pytest.raises(CommandError) as cm: + self.command._check_metadata("backup.dump") + + assert "Restoring to a different database engine is not supported" in str(cm.value) + + def test_no_metadata(self): + # Setup storage to raise exception when 
reading metadata + self.command.storage.read_file.side_effect = Exception("File not found") + + # Should not raise (backwards compatibility) + self.command._check_metadata("backup.dump") + + def test_local_file_metadata_match(self): + self.command.path = "local_backup.dump" + metadata = {"engine": settings.DATABASES["default"]["ENGINE"]} + + with patch("os.path.exists", return_value=True), patch("builtins.open", new_callable=Mock) as mock_open: + # Configure the mock to behave like a file object + file_mock = Mock() + file_mock.read.return_value = json.dumps(metadata) + # Set up the context manager + mock_open.return_value.__enter__ = Mock(return_value=file_mock) + mock_open.return_value.__exit__ = Mock(return_value=None) + + self.command._check_metadata("local_backup.dump") + + def test_local_file_metadata_mismatch(self): + self.command.path = "local_backup.dump" + metadata = {"engine": "django.db.backends.postgresql"} + + with patch("os.path.exists", return_value=True), patch("builtins.open", new_callable=Mock) as mock_open: + # Configure the mock to behave like a file object + file_mock = Mock() + file_mock.read.return_value = json.dumps(metadata) + # Set up the context manager + mock_open.return_value.__enter__ = Mock(return_value=file_mock) + mock_open.return_value.__exit__ = Mock(return_value=None) + + with pytest.raises(CommandError): + self.command._check_metadata("local_backup.dump") + + def test_django_connector_mismatch_allowed(self): + # Setup metadata with different engine but DjangoConnector + metadata = { + "engine": "django.db.backends.postgresql", + "connector": "dbbackup.db.django.DjangoConnector", + } + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Should not raise + self.command._check_metadata("backup.dump") + + +class DbrestoreConnectorOverrideTest(TestCase): + def setUp(self): + self.command = DbrestoreCommand() + self.command.database_name = "default" + self.command.logger = Mock() + self.command.storage = Mock() + self.command.path = None + self.command.interactive = False + self.command.decrypt = False + self.command.uncompress = False + self.command.schemas = [] + self.command.no_drop = False + self.command.pg_options = "" + self.command.servername = "testserver" + self.command.input_database_name = "default" + self.command.database = settings.DATABASES["default"] + + # Mock _get_backup_file + mock_file = Mock() + mock_file.fileno.return_value = 1 + mock_file.size = 1024 + self.command._get_backup_file = Mock(return_value=("backup.dump", mock_file)) + + # Mock _ask_confirmation + self.command._ask_confirmation = Mock() + + @patch("dbbackup.management.commands.dbrestore.get_connector") + @patch("dbbackup.management.commands.dbrestore.import_module") + def test_connector_override(self, mock_import_module, mock_get_connector): + # Setup metadata with a specific connector + metadata = {"engine": settings.DATABASES["default"]["ENGINE"], "connector": "my.custom.Connector"} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Mock the custom connector class + mock_module = Mock() + mock_connector_class = Mock() + mock_connector_instance = Mock() + mock_connector_class.return_value = mock_connector_instance + + mock_import_module.return_value = mock_module + mock_module.Connector = mock_connector_class + + # Run restore + self.command._restore_backup() + + # Verify import_module was called with "my.custom" + mock_import_module.assert_called_with("my.custom") + + # Verify connector was 
instantiated + mock_connector_class.assert_called_with("default") + + # Verify self.command.connector is the custom one + assert self.command.connector == mock_connector_instance + + # Verify get_connector was NOT called + mock_get_connector.assert_not_called() + + @patch("dbbackup.management.commands.dbrestore.get_connector") + def test_connector_fallback_on_import_error(self, mock_get_connector): + # Setup metadata with a specific connector that fails to import + metadata = {"engine": settings.DATABASES["default"]["ENGINE"], "connector": "my.broken.Connector"} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Mock default connector + mock_default_connector = Mock() + mock_get_connector.return_value = mock_default_connector + + # We don't mock import_module, so it will raise ImportError (or we can mock it to raise) + with patch("dbbackup.management.commands.dbrestore.import_module", side_effect=ImportError): + self.command._restore_backup() + + # Verify get_connector WAS called + mock_get_connector.assert_called_with("default") + + # Verify self.command.connector is the default one + assert self.command.connector == mock_default_connector + + @patch("dbbackup.management.commands.dbrestore.get_connector") + @patch("builtins.input", return_value="y") + def test_connector_fallback_interactive_yes(self, mock_input, mock_get_connector): + self.command.interactive = True + # Setup metadata with a specific connector that fails to import + metadata = {"engine": settings.DATABASES["default"]["ENGINE"], "connector": "my.broken.Connector"} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + # Mock default connector + mock_default_connector = Mock() + mock_get_connector.return_value = mock_default_connector + + with patch("dbbackup.management.commands.dbrestore.import_module", side_effect=ImportError): + self.command._restore_backup() + + # Verify input was called + mock_input.assert_called() + # Verify get_connector WAS called + mock_get_connector.assert_called_with("default") + + @patch("dbbackup.management.commands.dbrestore.get_connector") + @patch("builtins.input", return_value="n") + def test_connector_fallback_interactive_no(self, mock_input, mock_get_connector): + self.command.interactive = True + # Setup metadata with a specific connector that fails to import + metadata = {"engine": settings.DATABASES["default"]["ENGINE"], "connector": "my.broken.Connector"} + self.command.storage.read_file.return_value = Mock(read=lambda: json.dumps(metadata)) + + with patch("dbbackup.management.commands.dbrestore.import_module", side_effect=ImportError): + with pytest.raises(SystemExit): + self.command._restore_backup() + + # Verify input was called + mock_input.assert_called() + # Verify get_connector was NOT called + mock_get_connector.assert_not_called() diff --git a/tests/commands/test_mediabackup.py b/tests/commands/test_mediabackup.py index 7a0ef18b..f737ca16 100644 --- a/tests/commands/test_mediabackup.py +++ b/tests/commands/test_mediabackup.py @@ -90,7 +90,7 @@ def test_s3_uri_output_path(self): # Verify write_to_storage was called with the S3 path assert mock_write_to_storage.called - args, kwargs = mock_write_to_storage.call_args + args, _kwargs = mock_write_to_storage.call_args assert args[1] == "s3://mybucket/media/backup.tar" # Verify no files were written to local storage diff --git a/tests/functional/test_commands.py b/tests/functional/test_commands.py index ac8ff22c..14eb2777 100644 --- 
a/tests/functional/test_commands.py +++ b/tests/functional/test_commands.py @@ -36,8 +36,8 @@ def tearDown(self): def test_database(self): argv = ["", "dbbackup", "--database=default"] execute_from_command_line(argv) - assert len(HANDLED_FILES["written_files"]) == 1 - filename, outputfile = HANDLED_FILES["written_files"][0] + assert len(HANDLED_FILES["written_files"]) == 2 + _filename, outputfile = HANDLED_FILES["written_files"][0] # Test file content outputfile.seek(0) assert outputfile.read() @@ -45,7 +45,7 @@ def test_database(self): def test_encrypt(self): argv = ["", "dbbackup", "--encrypt"] execute_from_command_line(argv) - assert len(HANDLED_FILES["written_files"]) == 1 + assert len(HANDLED_FILES["written_files"]) == 2 filename, outputfile = HANDLED_FILES["written_files"][0] assert filename.endswith(".gpg") # Test file content @@ -56,14 +56,14 @@ def test_encrypt(self): def test_compress(self): argv = ["", "dbbackup", "--compress"] execute_from_command_line(argv) - assert len(HANDLED_FILES["written_files"]) == 1 - filename, outputfile = HANDLED_FILES["written_files"][0] + assert len(HANDLED_FILES["written_files"]) == 2 + filename, _outputfile = HANDLED_FILES["written_files"][0] assert filename.endswith(".gz") def test_compress_and_encrypt(self): argv = ["", "dbbackup", "--compress", "--encrypt"] execute_from_command_line(argv) - assert len(HANDLED_FILES["written_files"]) == 1 + assert len(HANDLED_FILES["written_files"]) == 2 filename, outputfile = HANDLED_FILES["written_files"][0] assert filename.endswith(".gz.gpg") # Test file content @@ -189,7 +189,7 @@ def test_compress(self): argv = ["", "mediabackup", "--compress"] execute_from_command_line(argv) assert len(HANDLED_FILES["written_files"]) == 1 - filename, outputfile = HANDLED_FILES["written_files"][0] + filename, _outputfile = HANDLED_FILES["written_files"][0] assert ".gz" in filename @patch("dbbackup.utils.getpass", return_value=None) diff --git a/tests/test_connectors/test_base.py b/tests/test_connectors/test_base.py index 67725dfb..3b05fe68 100644 --- a/tests/test_connectors/test_base.py +++ b/tests/test_connectors/test_base.py @@ -132,40 +132,40 @@ def test_run_command_stdin(self): def test_run_command_with_env(self): connector = BaseCommandDBConnector() # Empty env - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert stdout.read() # env from self.env connector.env = {"foo": "bar"} - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert b"foo=bar\n" in stdout.read() # method override global env - stdout, stderr = connector.run_command("env", env={"foo": "ham"}) + stdout, _stderr = connector.run_command("env", env={"foo": "ham"}) assert b"foo=ham\n" in stdout.read() # get a var from parent env os.environ["BAR"] = "foo" - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert b"bar=foo\n" in stdout.read() # Conf overrides parendt env connector.env = {"bar": "bar"} - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert b"bar=bar\n" in stdout.read() # method overrides all - stdout, stderr = connector.run_command("env", env={"bar": "ham"}) + stdout, _stderr = connector.run_command("env", env={"bar": "ham"}) assert b"bar=ham\n" in stdout.read() def test_run_command_with_parent_env(self): connector = BaseCommandDBConnector(use_parent_env=False) # Empty env - stdout, stderr = connector.run_command("env") + stdout, _stderr = 
connector.run_command("env") assert not stdout.read() # env from self.env connector.env = {"foo": "bar"} - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert stdout.read() == b"foo=bar\n" # method override global env - stdout, stderr = connector.run_command("env", env={"foo": "ham"}) + stdout, _stderr = connector.run_command("env", env={"foo": "ham"}) assert stdout.read() == b"foo=ham\n" # no var from parent env os.environ["BAR"] = "foo" - stdout, stderr = connector.run_command("env") + stdout, _stderr = connector.run_command("env") assert b"bar=foo\n" not in stdout.read() diff --git a/tests/test_settings.py b/tests/test_settings.py index 6c134660..51cdea8a 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -1,5 +1,6 @@ """Tests for dbbackup.settings module.""" +import pytest from django.test import TestCase, override_settings @@ -27,9 +28,8 @@ def test_deprecated_dbbackup_storage_raises(self): del sys.modules["dbbackup.settings"] try: - with self.assertRaises(RuntimeError): - with override_settings(DBBACKUP_STORAGE="some.storage.Backend"): - importlib.import_module("dbbackup.settings") + with pytest.raises(RuntimeError), override_settings(DBBACKUP_STORAGE="some.storage.Backend"): + importlib.import_module("dbbackup.settings") finally: sys.modules["dbbackup.settings"] = original_settings @@ -43,8 +43,7 @@ def test_deprecated_dbbackup_storage_options_raises(self): del sys.modules["dbbackup.settings"] try: - with self.assertRaises(RuntimeError): - with override_settings(DBBACKUP_STORAGE_OPTIONS={"option": True}): - importlib.import_module("dbbackup.settings") + with pytest.raises(RuntimeError), override_settings(DBBACKUP_STORAGE_OPTIONS={"option": True}): + importlib.import_module("dbbackup.settings") finally: sys.modules["dbbackup.settings"] = original_settings diff --git a/tests/test_signals.py b/tests/test_signals.py index 8c8997fd..d06448ea 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -40,6 +40,7 @@ def test_pre_backup_signal_sent(self): # Mock the connector and its methods mock_connector = Mock() mock_connector.generate_filename.return_value = "test_backup.sql" + mock_connector.connection.settings_dict = {"ENGINE": "django.db.backends.sqlite3"} # Create a proper mock for the file object mock_file = Mock() @@ -84,6 +85,7 @@ def test_post_backup_signal_sent(self): # Mock the connector and its methods mock_connector = Mock() mock_connector.generate_filename.return_value = "test_backup.sql" + mock_connector.connection.settings_dict = {"ENGINE": "django.db.backends.sqlite3"} # Create a proper mock for the file object mock_file = Mock() diff --git a/tests/test_utils.py b/tests/test_utils.py index 942b6f7a..4f9288ac 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -125,7 +125,7 @@ def tearDown(self): def test_func(self, *args): with open(self.path, mode="rb") as fd: - encrypted_file, filename = utils.encrypt_file(inputfile=fd, filename="foo.txt") + encrypted_file, _filename = utils.encrypt_file(inputfile=fd, filename="foo.txt") encrypted_file.seek(0) assert encrypted_file.read() @@ -161,7 +161,7 @@ def tearDown(self): @patch("dbbackup.utils.getpass", return_value=None) def test_unencrypt(self, *args): with open(ENCRYPTED_FILE, "r+b") as inputfile: - uncryptfile, filename = utils.unencrypt_file(inputfile, "foofile.gpg") + uncryptfile, _filename = utils.unencrypt_file(inputfile, "foofile.gpg") uncryptfile.seek(0) assert uncryptfile.read() == b"foo\n" @@ -178,7 +178,7 @@ def tearDown(self): def 
test_func(self, *args): with open(self.path, mode="rb") as fd: - compressed_file, filename = utils.encrypt_file(inputfile=fd, filename="foo.txt") + _compressed_file, _filename = utils.encrypt_file(inputfile=fd, filename="foo.txt") @unittest.skipIf(not GPG_AVAILABLE, "gpg executable not available") diff --git a/tests/utils.py b/tests/utils.py index 5ea35178..b1ca845e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -38,7 +38,7 @@ GPG_PRIVATE_PATH = os.path.join(settings.BLOB_DIR, "gpg/secring.gpg") GPG_PUBLIC_PATH = os.path.join(settings.BLOB_DIR, "gpg/pubring.gpg") GPG_FINGERPRINT = "7438 8D4E 02AF C011 4E2F 1E79 F7D1 BBF0 1F63 FDE9" -DEV_NULL = open(os.devnull, "w") # noqa +DEV_NULL = open(os.devnull, "w") class HandledFiles(dict):
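Taken together, the compatibility rule the new tests exercise boils down to one predicate; a minimal sketch, assuming only the metadata keys and the `DjangoConnector` path shown in the diff (the Postgres connector path below is just an example):

```python
DJANGO_CONNECTOR = "dbbackup.db.django.DjangoConnector"


def is_restorable(metadata: dict, current_engine: str) -> bool:
    """A mismatched engine is only tolerated for backups made with the engine-agnostic DjangoConnector."""
    return metadata.get("engine") == current_engine or metadata.get("connector") == DJANGO_CONNECTOR


# A pg_dump backup cannot be restored into SQLite...
assert not is_restorable(
    {"engine": "django.db.backends.postgresql", "connector": "dbbackup.db.postgresql.PgDumpConnector"},
    "django.db.backends.sqlite3",
)
# ...but a serialized (DjangoConnector) backup can.
assert is_restorable(
    {"engine": "django.db.backends.postgresql", "connector": DJANGO_CONNECTOR},
    "django.db.backends.sqlite3",
)
```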