Commit a8f523c

feat(dbsync): Introduce DBSyncSnapshotService and snapshot freshness test
Introduces new components to cleanly implement the Cardano DB-Sync snapshot freshness check using the S3 REST API.

* **DBSyncSnapshotService:** A new service class responsible for interacting with the IOHK S3 repository, encapsulating API calls and robust XML parsing logic (including handling S3 namespaces and date formats).
* **Clarity:** Uses `dataclass` for snapshot metadata and standard Pytest best practices.
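
The S3 namespace and date handling mentioned above can be seen in isolation in the sketch below. The XML document is an invented, minimal ListObjectsV2-style response (the prefix, key, date, and size values are made up); only the namespace URL and the timestamp format mirror what the new service actually parses:

# Minimal sketch of the namespace and date handling used by DBSyncSnapshotService.
# The XML content below is invented for illustration; only the structure,
# the namespace URL, and the timestamp format match the real S3 response.
import xml.etree.ElementTree as ET
from datetime import datetime
from datetime import timezone

SAMPLE_XML = b"""<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <CommonPrefixes><Prefix>cardano-db-sync/13.6/</Prefix></CommonPrefixes>
  <Contents>
    <Key>cardano-db-sync/13.6/db-sync-snapshot-example.tgz</Key>
    <LastModified>2024-01-02T03:04:05.000Z</LastModified>
    <Size>12345678901</Size>
  </Contents>
</ListBucketResult>"""

# The document declares a default namespace, so bare tag names like "Contents"
# would not match -- every ElementTree lookup has to carry the namespace prefix.
ns = "{http://s3.amazonaws.com/doc/2006-03-01/}"
root = ET.fromstring(SAMPLE_XML)

key = root.find(f".//{ns}Contents/{ns}Key").text
modified = root.find(f".//{ns}Contents/{ns}LastModified").text

# S3 timestamps carry fractional seconds and a literal "Z", hence this format string.
parsed = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
print(key, parsed.isoformat())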
1 parent d2ffdf3 commit a8f523c

File tree

2 files changed: +205 -0 lines changed

cardano_node_tests/tests/test_dbsync.py

Lines changed: 51 additions & 0 deletions

@@ -3,6 +3,9 @@
 import logging
 import time
 import typing as tp
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone

 import allure
 import pytest
@@ -18,6 +21,8 @@
 from cardano_node_tests.utils import dbsync_utils
 from cardano_node_tests.utils import helpers
 from cardano_node_tests.utils import logfiles
+from cardano_node_tests.utils.dbsync_snapshot_service import DBSyncSnapshotService
+from cardano_node_tests.utils.dbsync_snapshot_service import SnapshotFile
 from cardano_node_tests.utils.versions import VERSIONS

 LOGGER = logging.getLogger(__name__)
@@ -381,3 +386,49 @@ def test_epoch(self, cluster: clusterlib.ClusterLib):
         assert blocks_data_tx_count == epoch_data_tx_count, (
             f"Transactions count don't match between tables for epoch {epoch}"
         )
+
+
+class TestDBSyncSnapshot:
+    """Tests for db-sync snapshot availability and freshness."""
+
+    @pytest.fixture()
+    def db_sync_snapshots(
+        self,
+    ) -> DBSyncSnapshotService | None:
+        """Create DBSyncSnapshotService client."""
+        snapshot_service = DBSyncSnapshotService()
+        if snapshot_service is None:
+            pytest.skip("DBSyncSnapshotService is not available.")
+        return snapshot_service
+
+    @allure.link(helpers.get_vcs_link())
+    @pytest.mark.smoke
+    def test_latest_snapshot_freshness(self, db_sync_snapshots: DBSyncSnapshotService):
+        """
+        Check that the latest db-sync snapshot is not older than 5 days.
+
+        This test uses the S3 REST API to query the Cardano mainnet snapshot repository
+        and verifies that the most recent snapshot is fresh.
+        """
+        # 1. Find latest version
+        latest_version = db_sync_snapshots.get_latest_version()
+        LOGGER.info(f"Latest db-sync version: {latest_version}")
+
+        # 2. Get latest snapshot for that version
+        latest_snapshot: SnapshotFile = db_sync_snapshots.get_latest_snapshot(latest_version)
+
+        LOGGER.info(f"Latest snapshot: {latest_snapshot.name}")
+        LOGGER.info(f"Snapshot date: {latest_snapshot.last_modified.isoformat()}")
+        LOGGER.info(f"Snapshot size: {latest_snapshot.size_gb:.2f} GB")
+
+        # 3. Perform freshness check
+        five_days_ago = datetime.now(timezone.utc) - timedelta(days=5)
+
+        assert latest_snapshot.last_modified >= five_days_ago, (
+            f"The latest snapshot is too old. "
+            f"Age: {(datetime.now(timezone.utc) - latest_snapshot.last_modified).days} days. "
+            f"Snapshot date: {latest_snapshot.last_modified.strftime('%Y-%m-%d %H:%M:%S UTC')}, "
+            f"Limit: 5 days ago ({five_days_ago.strftime('%Y-%m-%d %H:%M:%S UTC')})."
+        )
+
+        LOGGER.info("Success: The latest snapshot is recent (within 5-day limit).")

cardano_node_tests/utils/dbsync_snapshot_service.py

Lines changed: 154 additions & 0 deletions

@@ -0,0 +1,154 @@
+import logging
+import re
+import xml.etree.ElementTree as ET
+from dataclasses import dataclass
+from dataclasses import field
+from datetime import datetime
+from datetime import timezone
+from typing import List
+from typing import Tuple
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+# Full S3 namespace URL string
+S3_NS_URL = "http://s3.amazonaws.com/doc/2006-03-01/"
+
+
+@dataclass
+class SnapshotFile:
+    """Dataclass to hold parsed snapshot file information."""
+
+    key: str
+    name: str
+    last_modified: datetime  # Timezone-aware datetime object
+    size: int
+    size_gb: float = field(init=False)
+
+    def __post_init__(self) -> None:
+        self.size_gb = self.size / (1024**3)
+
+
+class DBSyncSnapshotService:
+    """Service class to interact with DB-Sync S3 Snapshots repository."""
+
+    BUCKET_URL: str = "https://update-cardano-mainnet.iohk.io"
+    ROOT_PREFIX: str = "cardano-db-sync/"
+
+    def _get_s3_objects(self, prefix: str = "", delimiter: str = "") -> bytes:
+        """Fetch XML content from the S3 bucket using REST API."""
+        params = {"list-type": "2", "prefix": prefix, "delimiter": delimiter}
+
+        response = requests.get(self.BUCKET_URL, params=params)
+        response.raise_for_status()
+        return response.content
+
+    def _parse_s3_xml(self, xml_content: bytes) -> Tuple[List[str], List[SnapshotFile]]:
+        """Parse S3 XML response using exact namespace search paths with None checks."""
+        root = ET.fromstring(xml_content)
+        ns_tag = f"{{{S3_NS_URL}}}"
+
+        # 1. Extract folders (CommonPrefixes)
+        folders = []
+        for prefix in root.findall(f".//{ns_tag}CommonPrefixes"):
+            prefix_tag = prefix.find(f"{ns_tag}Prefix")
+            if prefix_tag is not None and prefix_tag.text:
+                folder_path = prefix_tag.text
+                if folder_path.endswith("/"):
+                    folder_name = folder_path.strip("/").split("/")[-1]
+                    folders.append(folder_name)
+
+        # 2. Extract files (Contents)
+        files = []
+        for content in root.findall(f".//{ns_tag}Contents"):
+            key_tag = content.find(f"{ns_tag}Key")
+            modified_tag = content.find(f"{ns_tag}LastModified")
+            size_tag = content.find(f"{ns_tag}Size")
+
+            if not all(
+                [
+                    key_tag is not None and key_tag.text,
+                    modified_tag is not None and modified_tag.text,
+                    size_tag is not None and size_tag.text,
+                ]
+            ):
+                logger.warning(
+                    "Skipping malformed S3 object entry: Missing Key, LastModified, or Size."
+                )
+                continue  # Skip this entry if critical data is missing
+
+            # Use explicit variables to store the text content only if it exists
+            key_text = key_tag.text if key_tag is not None else None
+            modified_text = modified_tag.text if modified_tag is not None else None
+            size_text = size_tag.text if size_tag is not None else None
+
+            # Ensure all three critical tags and their text content exist
+            if not all([key_text, modified_text, size_text]):
+                logger.warning(
+                    "Skipping malformed S3 object entry: Missing Key, LastModified, or Size."
+                )
+                continue  # Skip this entry if critical data is missing
+
+            key = key_text
+            last_modified_str = modified_text
+            size_str = size_text
+
+            if last_modified_str is None:
+                continue
+
+            if key is None:
+                continue
+
+            file_date = datetime.strptime(last_modified_str, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
+                tzinfo=timezone.utc
+            )
+
+            files.append(
+                SnapshotFile(
+                    key=key,
+                    name=key.split("/")[-1],
+                    last_modified=file_date,
+                    size=int(size_str) if size_str else 0,
+                )
+            )
+
+        return folders, files
+
+    def get_latest_version(self) -> str:
+        """Find the numerically latest db-sync version folder."""
+        xml_content = self._get_s3_objects(prefix=self.ROOT_PREFIX, delimiter="/")
+        folders, _ = self._parse_s3_xml(xml_content)
+
+        version_folders = [f for f in folders if re.match(r"^\d+\.\d+$", f)]
+
+        if not version_folders:
+            err_msg = "No version folders found in S3 response."
+            raise RuntimeError(err_msg)
+
+        latest_version = sorted(
+            version_folders, key=lambda v: [int(part) for part in v.split(".")]
+        )[-1]
+        return latest_version
+
+    def get_latest_snapshot(self, version: str) -> SnapshotFile:
+        """Find the latest snapshot file for a given version."""
+        version_prefix = f"{self.ROOT_PREFIX}{version}/"
+        xml_content = self._get_s3_objects(prefix=version_prefix)
+        _, files = self._parse_s3_xml(xml_content)
+
+        # Keep only snapshot archives (.tgz files with "snapshot" in the name)
+        snapshot_files = [
+            f for f in files if f.name.endswith(".tgz") and "snapshot" in f.name.lower()
+        ]
+
+        if not snapshot_files:
+            file_names = [f.name for f in files]
+            logger.warning(f"Files found in S3 response for {version_prefix}: {file_names}")
+            error_msg = (
+                f"No snapshot files found for version {version}. Files found: {file_names}"
+            )
+            raise RuntimeError(error_msg)
+
+        latest_snapshot = max(snapshot_files, key=lambda x: x.last_modified)
+        return latest_snapshot
