|
| 1 | +import logging |
| 2 | +import re |
| 3 | +import xml.etree.ElementTree as ET |
| 4 | +from dataclasses import dataclass |
| 5 | +from dataclasses import field |
| 6 | +from datetime import datetime |
| 7 | +from datetime import timezone |
| 8 | +from typing import List |
| 9 | +from typing import Tuple |
| 10 | + |
| 11 | +import requests |
| 12 | + |
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)

# Define the full S3 namespace URL string outside the class for readability
# (ElementTree searches need it in "{namespace}Tag" form).
S3_NS_URL: str = "http://s3.amazonaws.com/doc/2006-03-01/"
| 17 | + |
| 18 | + |
@dataclass
class SnapshotFile:
    """Parsed metadata for a single snapshot object in the S3 bucket."""

    key: str  # full S3 object key, e.g. "cardano-db-sync/13.2/....tgz"
    name: str  # basename of the key
    last_modified: datetime  # timezone-aware modification timestamp
    size: int  # object size in bytes
    size_gb: float = field(init=False)  # derived from `size` after init

    def __post_init__(self) -> None:
        # Derived field: express the raw byte count in gibibytes (2**30 bytes).
        bytes_per_gib = 1 << 30
        self.size_gb = self.size / bytes_per_gib
| 31 | + |
| 32 | + |
class DBSyncSnapshotService:
    """Service class to interact with the Cardano DB-Sync S3 repository.

    Listing is done through the anonymous S3 REST API (ListObjectsV2), so no
    AWS credentials or boto3 dependency are required.
    """

    BUCKET_URL: str = "https://update-cardano-mainnet.iohk.io"
    ROOT_PREFIX: str = "cardano-db-sync/"
    # Fail fast instead of hanging forever on an unresponsive endpoint.
    REQUEST_TIMEOUT: float = 30.0

    def _get_s3_objects(self, prefix: str = "", delimiter: str = "") -> bytes:
        """Fetch one page of ListObjectsV2 XML from the S3 bucket.

        Args:
            prefix: Key prefix to filter objects by (e.g. "cardano-db-sync/").
            delimiter: Set to "/" to group keys into CommonPrefixes (folders).

        Returns:
            The raw XML payload returned by S3.

        Raises:
            requests.HTTPError: If S3 responds with a non-2xx status.
        """
        params = {"list-type": "2", "prefix": prefix, "delimiter": delimiter}

        # NOTE: ListObjectsV2 returns at most 1000 keys per request; no
        # pagination is performed here (snapshot folders stay well below that).
        response = requests.get(
            self.BUCKET_URL, params=params, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.content

    def _parse_s3_xml(self, xml_content: bytes) -> Tuple[List[str], List[SnapshotFile]]:
        """Parse an S3 ListObjectsV2 XML response.

        Args:
            xml_content: Raw XML bytes as returned by ``_get_s3_objects``.

        Returns:
            A tuple ``(folders, files)``: ``folders`` holds the leaf names of
            the CommonPrefixes entries, ``files`` holds the Contents entries
            parsed into :class:`SnapshotFile` objects. Entries missing Key,
            LastModified, or Size are skipped with a warning.
        """
        root = ET.fromstring(xml_content)
        ns_tag = f"{{{S3_NS_URL}}}"

        # 1. Extract folders (CommonPrefixes).
        folders: List[str] = []
        for prefix in root.findall(f".//{ns_tag}CommonPrefixes"):
            prefix_tag = prefix.find(f"{ns_tag}Prefix")  # find() may return None
            if prefix_tag is not None and prefix_tag.text:
                folder_path = prefix_tag.text
                if folder_path.endswith("/"):
                    folders.append(folder_path.strip("/").split("/")[-1])

        # 2. Extract files (Contents).
        files: List[SnapshotFile] = []
        for content in root.findall(f".//{ns_tag}Contents"):
            key_tag = content.find(f"{ns_tag}Key")
            modified_tag = content.find(f"{ns_tag}LastModified")
            size_tag = content.find(f"{ns_tag}Size")

            # Single guard replacing the previously duplicated None/text checks:
            # every tag must exist AND carry non-empty text. The truthiness
            # test also narrows the types for static checkers.
            key_text = key_tag.text if key_tag is not None else None
            modified_text = modified_tag.text if modified_tag is not None else None
            size_text = size_tag.text if size_tag is not None else None
            if not (key_text and modified_text and size_text):
                logger.warning(
                    "Skipping malformed S3 object entry: Missing Key, LastModified, or Size."
                )
                continue  # Skip this entry if critical data is missing

            files.append(
                SnapshotFile(
                    key=key_text,
                    name=key_text.split("/")[-1],
                    last_modified=self._parse_last_modified(modified_text),
                    size=int(size_text),
                )
            )

        return folders, files

    @staticmethod
    def _parse_last_modified(timestamp: str) -> datetime:
        """Parse an S3 LastModified timestamp into an aware UTC datetime.

        S3 normally emits millisecond precision ("...T..:..:...000Z"); a plain
        "...Z" form without fractional seconds is accepted for robustness.

        Raises:
            ValueError: If the timestamp matches neither accepted format.
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
            try:
                return datetime.strptime(timestamp, fmt).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        raise ValueError(f"Unrecognized S3 LastModified timestamp: {timestamp!r}")

    def get_latest_version(self) -> str:
        """Find the numerically latest db-sync version folder (e.g. "13.2").

        Returns:
            The highest "<major>.<minor>" folder name under ROOT_PREFIX.

        Raises:
            RuntimeError: If no version-shaped folders are present.
        """
        xml_content = self._get_s3_objects(prefix=self.ROOT_PREFIX, delimiter="/")
        folders, _ = self._parse_s3_xml(xml_content)

        version_folders = [f for f in folders if re.match(r"^\d+\.\d+$", f)]

        if not version_folders:
            err_msg = "No version folders found in S3 response."
            raise RuntimeError(err_msg)

        # Numeric component sort: "13.10" must rank above "13.2", which a
        # plain lexicographic sort would get wrong.
        return max(version_folders, key=lambda v: [int(part) for part in v.split(".")])

    def get_latest_snapshot(self, version: str) -> SnapshotFile:
        """Find the most recently modified snapshot archive for *version*.

        Args:
            version: Version folder name, e.g. "13.2".

        Returns:
            The :class:`SnapshotFile` with the newest LastModified among files
            whose name ends in ".tgz" and contains "snapshot" (case-insensitive).

        Raises:
            RuntimeError: If no matching snapshot files are found.
        """
        version_prefix = f"{self.ROOT_PREFIX}{version}/"
        xml_content = self._get_s3_objects(prefix=version_prefix)
        _, files = self._parse_s3_xml(xml_content)

        # Filter: Revert to the original working filter (.tgz AND 'snapshot')
        snapshot_files = [
            f for f in files if f.name.endswith(".tgz") and "snapshot" in f.name.lower()
        ]

        if not snapshot_files:
            file_names = [f.name for f in files]
            # Lazy %-args so the message is only formatted when actually emitted.
            logger.warning(
                "Files found in S3 response for %s: %s", version_prefix, file_names
            )
            error_msg = (
                f"No snapshot files found for version {version}. Filtered files: {file_names}"
            )
            raise RuntimeError(error_msg)

        return max(snapshot_files, key=lambda x: x.last_modified)
0 commit comments