Skip to content

Commit b825be9

Browse files
committed
feat(testcontainers): add task manager db backup and restore
Signed-off-by: Fatih Acar <[email protected]>
1 parent 2ad70f9 commit b825be9

File tree

4 files changed

+227
-0
lines changed

4 files changed

+227
-0
lines changed

python_testcontainers/infrahub_testcontainers/container.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,111 @@ def database_restore_backup(self, backup_file: Path) -> None: # noqa: PLR0915
517517
)
518518
self.start()
519519
print("Database restored successfully")
520+
521+
def task_manager_create_backup(self, backup_name: str = "prefect.dump", dest_dir: Path | None = None) -> Path:
522+
"""Create a backup of the task manager PostgreSQL database using ``pg_dump``.
523+
524+
Args:
525+
backup_name: Name of the archive file to create. Defaults to ``prefect.dump``.
526+
dest_dir: Optional host directory where the backup should be copied after it is
527+
produced. When omitted, the backup remains in ``external_backup_dir``.
528+
529+
Returns:
530+
Path to the backup archive on the host filesystem.
531+
532+
Raises:
533+
FileNotFoundError: If the pg_dump command completes but no archive is produced.
534+
"""
535+
536+
service_name = "task-manager-db"
537+
538+
try:
539+
self.get_container(service_name=service_name)
540+
except ContainerIsNotRunning:
541+
self.start_container(service_name=service_name)
542+
543+
self.external_backup_dir.mkdir(parents=True, exist_ok=True)
544+
545+
internal_backup_path = self.internal_backup_dir / backup_name
546+
dump_command = [
547+
"pg_dump",
548+
"--format=custom",
549+
"--blobs",
550+
"--no-owner",
551+
"--no-privileges",
552+
"--dbname=postgresql://postgres:postgres@localhost:5432/prefect",
553+
f"--file={internal_backup_path}",
554+
]
555+
self.exec_in_container(command=dump_command, service_name=service_name)
556+
557+
source_path = self.external_backup_dir / backup_name
558+
if not source_path.exists():
559+
raise FileNotFoundError(f"Backup file {source_path} was not created")
560+
561+
final_path = source_path
562+
if dest_dir:
563+
dest_dir.mkdir(parents=True, exist_ok=True)
564+
if dest_dir.resolve() != self.external_backup_dir.resolve():
565+
final_path = dest_dir / backup_name
566+
shutil.copy(source_path, final_path)
567+
568+
return final_path
569+
570+
def task_manager_restore_backup(self, backup_file: Path) -> None:
571+
"""Restore the task manager PostgreSQL database from a ``pg_restore`` archive.
572+
573+
Args:
574+
backup_file: Path to the backup archive on the host filesystem.
575+
576+
Raises:
577+
FileNotFoundError: If the provided backup archive does not exist.
578+
"""
579+
580+
if not backup_file.exists():
581+
raise FileNotFoundError(f"Backup file {backup_file} does not exist")
582+
583+
service_name = "task-manager-db"
584+
585+
try:
586+
self.get_container(service_name=service_name)
587+
except ContainerIsNotRunning:
588+
self.start_container(service_name=service_name)
589+
590+
self.external_backup_dir.mkdir(parents=True, exist_ok=True)
591+
target_path = self.external_backup_dir / backup_file.name
592+
shutil.copy(backup_file, target_path)
593+
594+
admin_dsn = "postgresql://postgres:postgres@localhost:5432/postgres"
595+
prefect_dsn = "postgresql://postgres:postgres@localhost:5432/prefect"
596+
internal_backup_path = self.internal_backup_dir / backup_file.name
597+
598+
terminate_sessions_command = [
599+
"psql",
600+
f"--dbname={admin_dsn}",
601+
"--command",
602+
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = 'prefect';",
603+
]
604+
drop_database_command = [
605+
"psql",
606+
f"--dbname={admin_dsn}",
607+
"--command",
608+
"DROP DATABASE IF EXISTS prefect WITH (FORCE);",
609+
]
610+
create_database_command = [
611+
"psql",
612+
f"--dbname={admin_dsn}",
613+
"--command",
614+
"CREATE DATABASE prefect OWNER postgres;",
615+
]
616+
restore_command = [
617+
"pg_restore",
618+
"--no-owner",
619+
"--role=postgres",
620+
f"--dbname={prefect_dsn}",
621+
str(internal_backup_path),
622+
]
623+
624+
self.exec_in_container(command=terminate_sessions_command, service_name=service_name)
625+
self.exec_in_container(command=drop_database_command, service_name=service_name)
626+
self.exec_in_container(command=create_database_command, service_name=service_name)
627+
self.exec_in_container(command=restore_command, service_name=service_name)

python_testcontainers/infrahub_testcontainers/docker-compose-cluster.test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ services:
248248
- POSTGRES_DB=prefect
249249
volumes:
250250
- workflow_db:/var/lib/postgresql/data
251+
- "./${INFRAHUB_TESTING_LOCAL_DB_BACKUP_DIRECTORY}:${INFRAHUB_TESTING_INTERNAL_DB_BACKUP_DIRECTORY}"
251252
healthcheck:
252253
test: ["CMD-SHELL", "pg_isready"]
253254
interval: 10s

python_testcontainers/infrahub_testcontainers/docker-compose.test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ services:
159159
- POSTGRES_DB=prefect
160160
volumes:
161161
- workflow_db:/var/lib/postgresql/data
162+
- "./${INFRAHUB_TESTING_LOCAL_DB_BACKUP_DIRECTORY}:${INFRAHUB_TESTING_INTERNAL_DB_BACKUP_DIRECTORY}"
162163
healthcheck:
163164
test: ["CMD-SHELL", "pg_isready"]
164165
interval: 10s
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
from typing import Callable
5+
6+
import pytest
7+
from infrahub_testcontainers.container import InfrahubDockerCompose
8+
from testcontainers.core.exceptions import ContainerIsNotRunning
9+
10+
11+
@pytest.fixture(name="compose_factory")
12+
def fixture_compose_factory(tmp_path: Path) -> Callable[[], InfrahubDockerCompose]:
13+
"""Provide a factory that yields an isolated ``InfrahubDockerCompose`` test double."""
14+
15+
def _factory() -> InfrahubDockerCompose:
16+
instance = InfrahubDockerCompose.__new__(InfrahubDockerCompose)
17+
instance.context = tmp_path
18+
instance.env_vars = {
19+
"INFRAHUB_TESTING_LOCAL_DB_BACKUP_DIRECTORY": "backups",
20+
"INFRAHUB_TESTING_INTERNAL_DB_BACKUP_DIRECTORY": "/backups",
21+
"NEO4J_DOCKER_IMAGE": "neo4j:2025.03.0-enterprise",
22+
}
23+
instance.services = []
24+
instance.pull = False
25+
instance.wait = False
26+
instance.compose_file_name = None
27+
instance.docker_command_path = None
28+
instance.project_name = None
29+
instance.env_file = None
30+
31+
def _noop_service(service_name: str) -> None:
32+
"""Consume a service name without performing any action."""
33+
34+
_ = service_name
35+
36+
instance.get_container = _noop_service # type: ignore[assignment]
37+
instance.start_container = _noop_service # type: ignore[assignment]
38+
instance.exec_calls: list[list[str]] = []
39+
40+
def _exec(command: list[str], service_name: str) -> tuple[str, str, int]:
41+
_ = service_name
42+
instance.exec_calls.append(command)
43+
if command and command[0] == "pg_dump":
44+
file_arg = next(arg for arg in command if arg.startswith("--file="))
45+
container_path = Path(file_arg.split("=", 1)[1])
46+
host_path = instance.external_backup_dir / container_path.name
47+
host_path.parent.mkdir(parents=True, exist_ok=True)
48+
host_path.write_bytes(b"backup-bytes")
49+
return "", "", 0
50+
51+
instance.exec_in_container = _exec # type: ignore[assignment]
52+
return instance
53+
54+
return _factory
55+
56+
57+
def test_task_manager_db_create_backup_creates_archive(compose_factory: Callable[[], InfrahubDockerCompose]) -> None:
58+
compose = compose_factory()
59+
60+
result = compose.task_manager_db_create_backup()
61+
62+
assert result.name == "prefect.dump"
63+
assert result.exists()
64+
assert compose.exec_calls[0][0] == "pg_dump"
65+
66+
67+
def test_task_manager_db_create_backup_respects_destination(
68+
compose_factory: Callable[[], InfrahubDockerCompose], tmp_path: Path
69+
) -> None:
70+
compose = compose_factory()
71+
destination = tmp_path / "exports"
72+
73+
result = compose.task_manager_db_create_backup(dest_dir=destination)
74+
75+
assert result.parent == destination
76+
assert result.exists()
77+
assert (compose.external_backup_dir / "prefect.dump").exists()
78+
79+
80+
def test_task_manager_db_restore_backup_runs_expected_commands(
81+
compose_factory: Callable[[], InfrahubDockerCompose], tmp_path: Path
82+
) -> None:
83+
compose = compose_factory()
84+
85+
def _raise(service_name: str) -> None:
86+
_ = service_name
87+
raise ContainerIsNotRunning("task-manager-db")
88+
89+
compose.get_container = _raise # type: ignore[assignment]
90+
started_services: list[str] = []
91+
92+
def _record_start(service_name: str) -> None:
93+
started_services.append(service_name)
94+
95+
compose.start_container = _record_start # type: ignore[assignment]
96+
recorded_commands: list[list[str]] = []
97+
98+
def _exec(command: list[str], service_name: str) -> tuple[str, str, int]:
99+
_ = service_name
100+
recorded_commands.append(command)
101+
return "", "", 0
102+
103+
compose.exec_in_container = _exec # type: ignore[assignment]
104+
105+
backup_file = tmp_path / "prefect.dump"
106+
backup_file.write_bytes(b"backup")
107+
108+
compose.task_manager_db_restore_backup(backup_file)
109+
110+
assert started_services == ["task-manager-db"]
111+
assert (compose.external_backup_dir / backup_file.name).exists()
112+
assert [command[0] for command in recorded_commands] == [
113+
"psql",
114+
"psql",
115+
"psql",
116+
"pg_restore",
117+
]

0 commit comments

Comments
 (0)