Skip to content

Commit 8c47bae

Browse files
author
Lucas Gameiro
authored
[DPE-4462] Add Incremental+Differential backup support (#479)
* add diff+incr backups * fix integration test * remove await from get_unit_address
1 parent fabce25 commit 8c47bae

File tree

7 files changed

+197
-32
lines changed

7 files changed

+197
-32
lines changed

actions.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33

44
create-backup:
55
description: Creates a backup to s3 storage.
6+
params:
7+
type:
8+
type: string
9+
description: The backup type, the default value is 'full'.
10+
Full backup is a full copy of all data.
11+
Differential backup is a copy only of changed data since the last full backup.
12+
Incremental backup is a copy only of changed data since the last backup (any type).
13+
Possible values - full, differential, incremental.
614
get-primary:
715
description: Get the unit which is the primary/leader in the replication.
816
get-password:

src/backups.py

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
from constants import (
3131
BACKUP_ID_FORMAT,
32+
BACKUP_TYPE_OVERRIDES,
3233
BACKUP_USER,
3334
PATRONI_CONF_PATH,
3435
PGBACKREST_BACKUP_ID_FORMAT,
@@ -331,22 +332,20 @@ def _generate_backup_list_output(self) -> str:
331332

332333
backups = json.loads(output)[0]["backup"]
333334
for backup in backups:
334-
backup_id = datetime.strftime(
335-
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
336-
BACKUP_ID_FORMAT,
337-
)
335+
backup_id, backup_type = self._parse_backup_id(backup["label"])
338336
error = backup["error"]
339337
backup_status = "finished"
340338
if error:
341339
backup_status = f"failed: {error}"
342-
backup_list.append((backup_id, "physical", backup_status))
340+
backup_list.append((backup_id, backup_type, backup_status))
343341
return self._format_backup_list(backup_list)
344342

345-
def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
343+
def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
346344
"""Retrieve the list of backups.
347345
348346
Args:
349347
show_failed: whether to also return the failed backups.
348+
parse: whether to convert backup labels to their IDs or not.
350349
351350
Returns:
352351
a dict of previously created backups (id + stanza name) or an empty list
@@ -371,16 +370,35 @@ def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
371370
stanza_name = repository_info["name"]
372371
return OrderedDict[str, str](
373372
(
374-
datetime.strftime(
375-
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
376-
BACKUP_ID_FORMAT,
377-
),
373+
self._parse_backup_id(backup["label"])[0] if parse else backup["label"],
378374
stanza_name,
379375
)
380376
for backup in backups
381377
if show_failed or not backup["error"]
382378
)
383379

380+
def _parse_backup_id(self, label) -> Tuple[str, str]:
381+
"""Parse backup ID as a timestamp."""
382+
if label[-1] == "F":
383+
timestamp = label
384+
backup_type = "full"
385+
elif label[-1] == "D":
386+
timestamp = label.split("_")[1]
387+
backup_type = "differential"
388+
elif label[-1] == "I":
389+
timestamp = label.split("_")[1]
390+
backup_type = "incremental"
391+
else:
392+
raise ValueError("Unknown label format for backup ID: %s", label)
393+
394+
return (
395+
datetime.strftime(
396+
datetime.strptime(timestamp[:-1], PGBACKREST_BACKUP_ID_FORMAT),
397+
BACKUP_ID_FORMAT,
398+
),
399+
backup_type,
400+
)
401+
384402
def _initialise_stanza(self) -> None:
385403
"""Initialize the stanza.
386404
@@ -557,8 +575,16 @@ def _on_s3_credential_gone(self, _) -> None:
557575
if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
558576
self.charm.unit.status = ActiveStatus()
559577

560-
def _on_create_backup_action(self, event) -> None:
578+
def _on_create_backup_action(self, event) -> None: # noqa: C901
561579
"""Request that pgBackRest creates a backup."""
580+
backup_type = event.params.get("type", "full")
581+
if backup_type not in BACKUP_TYPE_OVERRIDES:
582+
error_message = f"Invalid backup type: {backup_type}. Possible values: {', '.join(BACKUP_TYPE_OVERRIDES.keys())}."
583+
logger.error(f"Backup failed: {error_message}")
584+
event.fail(error_message)
585+
return
586+
587+
logger.info(f"A {backup_type} backup has been requested on unit")
562588
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
563589
if not can_unit_perform_backup:
564590
logger.error(f"Backup failed: {validation_message}")
@@ -600,7 +626,7 @@ def _on_create_backup_action(self, event) -> None:
600626
# (reference: https://github.com/pgbackrest/pgbackrest/issues/2007)
601627
self.charm.update_config(is_creating_backup=True)
602628

603-
self._run_backup(event, s3_parameters, datetime_backup_requested)
629+
self._run_backup(event, s3_parameters, datetime_backup_requested, backup_type)
604630

605631
if not self.charm.is_primary:
606632
# Remove the rule that marks the cluster as in a creating backup state
@@ -611,14 +637,18 @@ def _on_create_backup_action(self, event) -> None:
611637
self.charm.unit.status = ActiveStatus()
612638

613639
def _run_backup(
614-
self, event: ActionEvent, s3_parameters: Dict, datetime_backup_requested: str
640+
self,
641+
event: ActionEvent,
642+
s3_parameters: Dict,
643+
datetime_backup_requested: str,
644+
backup_type: str,
615645
) -> None:
616646
command = [
617647
PGBACKREST_EXECUTABLE,
618648
PGBACKREST_CONFIGURATION_FILE,
619649
f"--stanza={self.stanza_name}",
620650
"--log-level-console=debug",
621-
"--type=full",
651+
f"--type={BACKUP_TYPE_OVERRIDES[backup_type]}",
622652
"backup",
623653
]
624654
if self.charm.is_primary:
@@ -638,7 +668,7 @@ def _run_backup(
638668
else:
639669
# Generate a backup id from the current date and time if the backup failed before
640670
# generating the backup label (our backup id).
641-
backup_id = datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
671+
backup_id = self._generate_fake_backup_id(backup_type)
642672

643673
# Upload the logs to S3.
644674
logs = f"""Stdout:
@@ -750,7 +780,7 @@ def _on_restore_action(self, event):
750780
# Mark the cluster as in a restoring backup state and update the Patroni configuration.
751781
logger.info("Configuring Patroni to restore the backup")
752782
self.charm.app_peer_data.update({
753-
"restoring-backup": f"{datetime.strftime(datetime.strptime(backup_id, BACKUP_ID_FORMAT), PGBACKREST_BACKUP_ID_FORMAT)}F",
783+
"restoring-backup": self._fetch_backup_from_id(backup_id),
754784
"restore-stanza": backups[backup_id],
755785
})
756786
self.charm.update_config()
@@ -780,6 +810,37 @@ def _on_restore_action(self, event):
780810

781811
event.set_results({"restore-status": "restore started"})
782812

813+
def _generate_fake_backup_id(self, backup_type: str) -> str:
814+
"""Creates a backup id for failed backup operations (to store log file)."""
815+
if backup_type == "full":
816+
return datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
817+
if backup_type == "differential":
818+
backups = self._list_backups(show_failed=False, parse=False).keys()
819+
last_full_backup = None
820+
for label in backups[::-1]:
821+
if label.endswith("F"):
822+
last_full_backup = label
823+
break
824+
825+
if last_full_backup is None:
826+
raise TypeError("Differential backup requested but no previous full backup")
827+
return f'{last_full_backup}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SD")}'
828+
if backup_type == "incremental":
829+
backups = self._list_backups(show_failed=False, parse=False).keys()
830+
if not backups:
831+
raise TypeError("Incremental backup requested but no previous successful backup")
832+
return f'{backups[-1]}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SI")}'
833+
834+
def _fetch_backup_from_id(self, backup_id: str) -> str:
835+
"""Fetches backup's pgbackrest label from backup id."""
836+
timestamp = f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}'
837+
backups = self._list_backups(show_failed=False, parse=False).keys()
838+
for label in backups:
839+
if timestamp in label:
840+
return label
841+
842+
return None
843+
783844
def _pre_restore_checks(self, event: ActionEvent) -> bool:
784845
"""Run some checks before starting the restore.
785846

src/charm.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ def __init__(self, *args):
145145
self.framework.observe(self.on.get_primary_action, self._on_get_primary)
146146
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
147147
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
148-
self.framework.observe(self.on.secret_remove, self._on_peer_relation_changed)
149148
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
150149
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
151150
self.framework.observe(self.on.start, self._on_start)

src/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,4 @@
7272
ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE = (
7373
"Please choose one endpoint to use. No need to relate all of them simultaneously!"
7474
)
75+
BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"}

templates/pgbackrest.conf.j2

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ lock-path=/tmp
44
log-path={{ log_path }}
55
repo1-retention-full-type=time
66
repo1-retention-full={{ retention_full }}
7+
repo1-retention-history=365
78
repo1-type=s3
89
repo1-path={{ path }}
910
repo1-s3-region={{ region }}
@@ -12,6 +13,8 @@ repo1-s3-bucket={{ bucket }}
1213
repo1-s3-uri-style={{ s3_uri_style }}
1314
repo1-s3-key={{ access_key }}
1415
repo1-s3-key-secret={{ secret_key }}
16+
repo1-block=y
17+
repo1-bundle=y
1518
start-fast=y
1619
{%- if enable_tls %}
1720
tls-server-address=*

tests/integration/test_backups.py

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
165165
action = await ops_test.model.units.get(replica).run_action("list-backups")
166166
await action.wait()
167167
backups = action.results.get("backups")
168-
assert backups, "backups not outputted"
168+
# 2 lines for header output, 1 backup line ==> 3 total lines
169+
assert len(backups.split("\n")) == 3, "full backup is not outputted"
169170
await ops_test.model.wait_for_idle(status="active", timeout=1000)
170171

171172
# Write some data.
@@ -175,6 +176,32 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
175176
connection.cursor().execute("CREATE TABLE backup_table_2 (test_collumn INT );")
176177
connection.close()
177178

179+
# Run the "create backup" action.
180+
logger.info("creating a backup")
181+
action = await ops_test.model.units.get(replica).run_action(
182+
"create-backup", **{"type": "differential"}
183+
)
184+
await action.wait()
185+
backup_status = action.results.get("backup-status")
186+
assert backup_status, "backup hasn't succeeded"
187+
async with ops_test.fast_forward():
188+
await ops_test.model.wait_for_idle(status="active", timeout=1000)
189+
190+
# Run the "list backups" action.
191+
logger.info("listing the available backups")
192+
action = await ops_test.model.units.get(replica).run_action("list-backups")
193+
await action.wait()
194+
backups = action.results.get("backups")
195+
# 2 lines for header output, 2 backup lines ==> 4 total lines
196+
assert len(backups.split("\n")) == 4, "differential backup is not outputted"
197+
await ops_test.model.wait_for_idle(status="active", timeout=1000)
198+
199+
# Write some data.
200+
logger.info("creating a second table in the database")
201+
with db_connect(host=address, password=password) as connection:
202+
connection.autocommit = True
203+
connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );")
204+
connection.close()
178205
# Scale down to be able to restore.
179206
async with ops_test.fast_forward():
180207
await ops_test.model.destroy_unit(replica)
@@ -186,14 +213,61 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
186213
remaining_unit = unit
187214
break
188215

189-
# Run the "restore backup" action.
216+
# Run the "restore backup" action for differential backup.
190217
for attempt in Retrying(
191218
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
192219
):
193220
with attempt:
194221
logger.info("restoring the backup")
195-
most_recent_backup = backups.split("\n")[-1]
196-
backup_id = most_recent_backup.split()[0]
222+
last_diff_backup = backups.split("\n")[-1]
223+
backup_id = last_diff_backup.split()[0]
224+
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
225+
await action.wait()
226+
restore_status = action.results.get("restore-status")
227+
assert restore_status, "restore hasn't succeeded"
228+
229+
# Wait for the restore to complete.
230+
async with ops_test.fast_forward():
231+
await ops_test.model.wait_for_idle(status="active", timeout=1000)
232+
233+
# Check that the differential backup was correctly restored by having only the first two created tables.
234+
logger.info("checking that the backup was correctly restored")
235+
primary = await get_primary(ops_test, remaining_unit.name)
236+
address = get_unit_address(ops_test, primary)
237+
with db_connect(
238+
host=address, password=password
239+
) as connection, connection.cursor() as cursor:
240+
cursor.execute(
241+
"SELECT EXISTS (SELECT FROM information_schema.tables"
242+
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
243+
)
244+
assert cursor.fetchone()[
245+
0
246+
], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist"
247+
cursor.execute(
248+
"SELECT EXISTS (SELECT FROM information_schema.tables"
249+
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
250+
)
251+
assert cursor.fetchone()[
252+
0
253+
], "backup wasn't correctly restored: table 'backup_table_2' doesn't exist"
254+
cursor.execute(
255+
"SELECT EXISTS (SELECT FROM information_schema.tables"
256+
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
257+
)
258+
assert not cursor.fetchone()[
259+
0
260+
], "backup wasn't correctly restored: table 'backup_table_3' exists"
261+
connection.close()
262+
263+
# Run the "restore backup" action for full backup.
264+
for attempt in Retrying(
265+
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
266+
):
267+
with attempt:
268+
logger.info("restoring the backup")
269+
last_full_backup = backups.split("\n")[-2]
270+
backup_id = last_full_backup.split()[0]
197271
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
198272
await action.wait()
199273
restore_status = action.results.get("restore-status")
@@ -224,6 +298,13 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
224298
assert not cursor.fetchone()[
225299
0
226300
], "backup wasn't correctly restored: table 'backup_table_2' exists"
301+
cursor.execute(
302+
"SELECT EXISTS (SELECT FROM information_schema.tables"
303+
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
304+
)
305+
assert not cursor.fetchone()[
306+
0
307+
], "backup wasn't correctly restored: table 'backup_table_3' exists"
227308
connection.close()
228309

229310
# Run the following steps only in one cloud (it's enough for those checks).

0 commit comments

Comments
 (0)