1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ Don't forget to remove deprecated code on each major release!

### Fixed

- Prevent restoring a backup from a different database connector (e.g. restoring a Postgres backup into SQLite) by adding an additional metadata file to all new backups.
- Fixed compressed media backup restoration by using `utils.uncompress_file()` instead of relying on tarfile's built-in gzip decompression. This aligns the behavior with database restore and ensures reliable decompression with all file-like objects.

## [5.0.1] - 2025-11-07
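As a point of reference for the changelog entry above, here is a minimal sketch of what such a metadata sidecar might contain. The filename, engine, and connector values are illustrative only; the two keys come from the `_save_metadata()` change further down.

```python
# Hypothetical contents of "default-2025-11-07-120000.dump.metadata"
# (a plain JSON object; all values here are examples, not real output):
{
    "engine": "django.db.backends.postgresql",
    "connector": "dbbackup.db.postgresql.PgDumpConnector",
}
```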
2 changes: 1 addition & 1 deletion dbbackup/checks.py
@@ -1,7 +1,7 @@
import re
from datetime import datetime

from django.core.checks import Tags, Warning, register
from django.core.checks import Tags, Warning, register # noqa: A004

from dbbackup import settings

25 changes: 25 additions & 0 deletions dbbackup/management/commands/dbbackup.py
@@ -2,6 +2,9 @@
Command for backup database.
"""

import json

from django.core.files.base import ContentFile
from django.core.management.base import CommandError

from dbbackup import settings, utils
@@ -89,6 +92,25 @@ def _get_database_keys(self):
return [key.strip() for key in self.database.split(",") if key.strip()]
return settings.DATABASES

def _save_metadata(self, filename, local=False):
"""
Save metadata file for the backup.
"""
metadata = {
"engine": self.connector.connection.settings_dict["ENGINE"],
"connector": f"{self.connector.__module__}.{self.connector.__class__.__name__}",
}
metadata_filename = f"{filename}.metadata"
metadata_content = json.dumps(metadata)

if local:
self.logger.info("Writing metadata file to %s", metadata_filename)
with open(metadata_filename, "w") as fd:
fd.write(metadata_content)
else:
metadata_file = ContentFile(metadata_content)
self.write_to_storage(metadata_file, metadata_filename)

def _save_new_backup(self, database):
"""
Save a new backup file.
@@ -129,11 +151,14 @@ def _save_new_backup(self, database):

if self.path is None:
self.write_to_storage(outputfile, filename)
self._save_metadata(filename)
elif self.path.startswith("s3://"):
# Handle S3 URIs through storage backend
self.write_to_storage(outputfile, self.path)
self._save_metadata(self.path)
else:
self.write_local_file(outputfile, self.path)
self._save_metadata(self.path, local=True)

# Send post_backup signal
post_backup.send(
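To make the effect of the three branches above concrete, a rough sketch of what a backup run now leaves behind (names invented; the two-writes expectation mirrors the updated tests further down):

```python
# Rough expectation: every branch writes the backup and then a sidecar named
# "<backup filename>.metadata".
filename = "default-myhost-2025-11-07-120000.dump"   # hypothetical backup name
metadata_filename = f"{filename}.metadata"            # what _save_metadata() writes

# e.g. a storage-backed run ends up with both objects in storage:
#   default-myhost-2025-11-07-120000.dump
#   default-myhost-2025-11-07-120000.dump.metadata
# while a local-path run writes the sidecar next to the file on disk.
```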
87 changes: 86 additions & 1 deletion dbbackup/management/commands/dbrestore.py
@@ -3,6 +3,10 @@
"""

import io
import json
import os
import sys
from importlib import import_module

from django.conf import settings
from django.core.management.base import CommandError
@@ -98,6 +102,59 @@ def _get_database(self, database_name: str):
raise CommandError(msg)
return database_name, settings.DATABASES[database_name]

def _check_metadata(self, filename):
"""
Check if the backup file has metadata and if it matches the current database.
"""
metadata_filename = f"{filename}.metadata"
metadata = None

if self.path:
# Local file
# self.path is the full path to the backup file
metadata_path = f"{self.path}.metadata"
if os.path.exists(metadata_path):
with open(metadata_path) as fd:
metadata = json.load(fd)
else:
# Storage file
try:
# Storage backends expose no cheap existence check, so attempt to read
# the metadata file directly and treat any failure as "no metadata".
metadata_file = self.storage.read_file(metadata_filename)
except Exception:
self.logger.debug("No metadata file found for '%s'", filename)
return None

# Read and parse metadata
try:
metadata = json.load(metadata_file)
except Exception:
self.logger.warning(
"Malformed metadata file for '%s'; dbbackup will ignore it.", filename
)
return None

if not metadata:
return None

backup_engine = metadata.get("engine")
current_engine = settings.DATABASES[self.database_name]["ENGINE"]
backup_connector = metadata.get("connector")

if backup_engine != current_engine and backup_connector != "dbbackup.db.django.DjangoConnector":
msg = (
f"Backup file '{filename}' was created with database engine '{backup_engine}', "
f"but you are restoring to a database using '{current_engine}'. "
"Restoring to a different database engine is not supported."
)
raise CommandError(msg)

return metadata

def _restore_backup(self):
"""Restore the specified database."""
input_filename, input_file = self._get_backup_file(
@@ -115,6 +172,8 @@ def _restore_backup(self):

self.logger.info(f"Restoring: {input_filename}") # noqa: G004

metadata = self._check_metadata(input_filename)

# Send pre_restore signal
pre_restore.send(
sender=self.__class__,
@@ -154,7 +213,33 @@ def _restore_backup(self):
self._ask_confirmation()

input_file.seek(0)
self.connector = get_connector(self.database_name)

# Try to use connector from metadata if available
self.connector = None
if metadata and "connector" in metadata:
connector_path = metadata["connector"]
try:
module_name = ".".join(connector_path.split(".")[:-1])
class_name = connector_path.split(".")[-1]
module = import_module(module_name)
connector_class = getattr(module, class_name)
self.connector = connector_class(self.database_name)
self.logger.info("Using connector from metadata: '%s'", connector_path)
except (ImportError, AttributeError):
self.logger.warning(
"Connector '%s' from metadata not found!!! Falling back to the connector in your Django settings.",
connector_path,
)
if self.interactive:
answer = input("Do you want to continue with the connector defined in your Django settings? [Y/n] ")
if not answer.lower().startswith("y"):
self.logger.info("Quitting")
sys.exit(0)

# Fallback to a connector from Django settings and/or our default connector map.
if not self.connector:
self.connector = get_connector(self.database_name)

if self.schemas:
self.connector.schemas = self.schemas
self.connector.drop = not self.no_drop
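Condensed into a standalone sketch (not the command's actual code), the connector selection above boils down to: resolve the dotted path stored in the metadata if possible, otherwise fall back to `get_connector()` from the Django settings.

```python
from importlib import import_module


def resolve_connector_class(connector_path):
    """Resolve a dotted path such as 'dbbackup.db.django.DjangoConnector' to a
    class, or return None so the caller can fall back to get_connector()."""
    module_name, _, class_name = connector_path.rpartition(".")
    try:
        return getattr(import_module(module_name), class_name)
    except (ImportError, AttributeError):
        return None
```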
2 changes: 2 additions & 0 deletions dbbackup/storage.py
@@ -126,6 +126,8 @@ def list_backups(
raise TypeError(msg)
# TODO: Make better filter for include only backups
files = [f for f in self.list_directory() if utils.filename_to_datestring(f)]
# Exclude metadata files
files = [f for f in files if not f.endswith(".metadata")]
if encrypted is not None:
files = [f for f in files if (".gpg" in f) == encrypted]
if compressed is not None:
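A tiny self-contained illustration of the new exclusion (filenames made up): because a sidecar's name is the backup name plus `.metadata`, it carries the same date string and would otherwise pass the first filter.

```python
files = [
    "default-2025-11-07-120000.dump",
    "default-2025-11-07-120000.dump.metadata",
]
files = [f for f in files if not f.endswith(".metadata")]
assert files == ["default-2025-11-07-120000.dump"]
```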
2 changes: 1 addition & 1 deletion dbbackup/utils.py
@@ -143,7 +143,7 @@ def create_spooled_temporary_file(filepath=None, fileobj=None):
"""
spooled_file = tempfile.SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE, dir=settings.TMP_DIR)
if filepath:
fileobj = open(filepath, "r+b") # noqa: SIM115
fileobj = open(filepath, "r+b")
if fileobj is not None:
fileobj.seek(0)
copyfileobj(fileobj, spooled_file, settings.TMP_FILE_READ_SIZE)
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -209,6 +209,8 @@ lint.extend-ignore = [
"S101", # Use of assert detected.
"S105", # Use of hardcoded password
"PLC0415", # Import should be at top-level of a file
"SIM115", # Use a context manager to open files
"BLE001", # Do not catch generic `Exception`
]

[tool.pytest.ini_options]
11 changes: 5 additions & 6 deletions scripts/postgres_live_test.py
@@ -1,4 +1,4 @@
# ruff: noqa: TRY301, BLE001, TRY300
# ruff: noqa: TRY301, TRY300
"""PostgreSQL Live Functional Test Script for django-dbbackup

Usage:
@@ -33,11 +33,10 @@
SYMBOL_SKIP = _SYMS["SKIP"]


class SkipTest(Exception):
class SkippedTestError(Exception):
"""Exception raised when a test should be skipped."""



# Available PostgreSQL connectors
POSTGRES_CONNECTORS = [
"PgDumpConnector",
@@ -122,7 +121,7 @@ def setup_postgres(self):
"and ensure pg_dump and psql are in your PATH."
)
msg = f"PostgreSQL client tools (pg_dump, psql, etc) are not installed!{install_instructions}"
raise SkipTest(msg)
raise SkippedTestError(msg)

self._log("Setting up test database...")
self.temp_dir = tempfile.mkdtemp(prefix="dbbackup_postgres_")
@@ -364,7 +363,7 @@ def run_backup_restore_test(self):
self._log(f"{SYMBOL_PASS} {self.connector_name} backup/restore test PASSED")
return 0

except SkipTest as e:
except SkippedTestError as e:
self._log(f"{SYMBOL_SKIP} {self.connector_name} backup/restore test SKIPPED: {e}")
return 77

@@ -408,7 +407,7 @@ def _run_subprocess(): # local helper kept simple; not used as target
process = _run_subprocess()
if process.exitcode is None:
return 1
if process.exitcode != 0 and process.exitcode != 77 and os.name == "nt": # Fallback path on Windows
if process.exitcode not in {0, 77} and os.name == "nt": # Fallback path on Windows
# Retry in-process so at least we capture a meaningful failure message
test_runner = PostgreSQLLiveTest(connector_name, verbose)
return test_runner.run_backup_restore_test()
4 changes: 3 additions & 1 deletion scripts/sqlite_live_test.py
@@ -1,4 +1,4 @@
# ruff: noqa: TRY300, BLE001
# ruff: noqa: TRY300
"""SQLite Live Functional Test Script for django-dbbackup

Usage:
@@ -152,6 +152,8 @@ def main() -> int: # (complexity acceptable for test harness)
run_management_command(["", "dbbackup", "--noinput"], verbose=verbose)
post_existing = set(os.listdir(backups_dir))
new_files = sorted(post_existing - pre_existing)
# Filter out metadata files
new_files = [f for f in new_files if not f.endswith(".metadata")]
latest_backup = new_files[-1] if new_files else None
log(f"Database backup completed (file: {latest_backup})", verbose=verbose)

27 changes: 23 additions & 4 deletions tests/commands/test_dbbackup.py
@@ -14,13 +14,20 @@
from dbbackup.db.base import get_connector
from dbbackup.management.commands.dbbackup import Command as DbbackupCommand
from dbbackup.storage import get_storage
from tests.utils import DEV_NULL, TEST_DATABASE, add_public_gpg, clean_gpg_keys
from tests.utils import (
DEV_NULL,
HANDLED_FILES,
TEST_DATABASE,
add_public_gpg,
clean_gpg_keys,
)


@patch("dbbackup.settings.GPG_RECIPIENT", "test@test")
@patch("sys.stdout", DEV_NULL)
class DbbackupCommandSaveNewBackupTest(TestCase):
def setUp(self):
HANDLED_FILES.clean()
self.command = DbbackupCommand()
self.command.servername = "foo-server"
self.command.encrypt = False
@@ -42,22 +49,30 @@ def test_func(self):
def test_compress(self):
self.command.compress = True
self.command._save_new_backup(TEST_DATABASE)
assert len(HANDLED_FILES["written_files"]) == 2
assert HANDLED_FILES["written_files"][0][0].endswith(".gz")
assert HANDLED_FILES["written_files"][1][0].endswith(".gz.metadata")

def test_encrypt(self):
if not GPG_AVAILABLE:
self.skipTest("gpg executable not available")
add_public_gpg()
self.command.encrypt = True
self.command._save_new_backup(TEST_DATABASE)
assert len(HANDLED_FILES["written_files"]) == 2
assert HANDLED_FILES["written_files"][0][0].endswith(".gpg")
assert HANDLED_FILES["written_files"][1][0].endswith(".gpg.metadata")

def test_path(self):
local_tmp = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "tmp")
os.makedirs(local_tmp, exist_ok=True)
self.command.path = os.path.join(local_tmp, "foo.bak")
self.command._save_new_backup(TEST_DATABASE)
assert os.path.exists(self.command.path)
assert os.path.exists(f"{self.command.path}.metadata")
# tearDown
os.remove(self.command.path)
os.remove(f"{self.command.path}.metadata")

def test_schema(self):
self.command.schemas = ["public"]
@@ -72,7 +87,8 @@ def test_path_s3_uri(self, mock_write_to_storage):
self.command._save_new_backup(TEST_DATABASE)
assert mock_write_to_storage.called
# Verify the S3 path was passed correctly to write_to_storage
args, kwargs = mock_write_to_storage.call_args
# The first call should be the backup file
args, _kwargs = mock_write_to_storage.call_args_list[0]
assert args[1] == "s3://mybucket/backups/db.bak"

@patch("dbbackup.management.commands._base.BaseDbBackupCommand.write_to_storage")
@@ -91,7 +107,8 @@ def test_path_s3_uri_variants(self, mock_write_to_storage):
self.command.path = s3_uri
self.command._save_new_backup(TEST_DATABASE)
assert mock_write_to_storage.called
args, kwargs = mock_write_to_storage.call_args
# The first call should be the backup file
args, _kwargs = mock_write_to_storage.call_args_list[0]
assert args[1] == s3_uri

def test_path_local_file_still_works(self):
@@ -106,9 +123,11 @@ def test_path_local_file_still_works(self):

# Verify the file was created (meaning write_local_file was used)
assert os.path.exists(local_path)
assert os.path.exists(f"{local_path}.metadata")

# Cleanup
os.remove(local_path)
os.remove(f"{local_path}.metadata")

# Test that paths containing 's3' but not starting with 's3://' are treated as local
with patch("dbbackup.management.commands._base.BaseDbBackupCommand.write_local_file") as mock_write_local_file:
@@ -129,7 +148,7 @@ def test_path_local_file_still_works(self):
self.command._save_new_backup(TEST_DATABASE)
# Verify write_local_file was called
assert mock_write_local_file.called
args, kwargs = mock_write_local_file.call_args
args, _kwargs = mock_write_local_file.call_args
assert args[1] == local_path

@patch("dbbackup.settings.DATABASES", ["db-from-settings"])