1+ import logging
2+ import re
3+ import xml .etree .ElementTree as ET
4+ from dataclasses import dataclass
5+ from dataclasses import field
6+ from datetime import datetime
7+ from datetime import timezone
8+ from typing import List
9+ from typing import Tuple
10+
11+ import requests
12+
13+ logger = logging .getLogger (__name__ )
14+
15+ # Define the full S3 namespace URL string outside the class for readability
16+ S3_NS_URL = "http://s3.amazonaws.com/doc/2006-03-01/"
17+
@dataclass
class SnapshotFile:
    """Parsed metadata for a single snapshot object from the S3 listing."""

    key: str                # full S3 object key, e.g. "cardano-db-sync/13.2/db-sync-snapshot-....tgz"
    name: str               # basename of the key
    last_modified: datetime # timezone-aware (UTC) modification timestamp
    size: int               # object size in bytes
    size_gb: float = field(init=False)  # derived: size expressed in GiB

    def __post_init__(self) -> None:
        # 1 GiB == 2**30 bytes.
        self.size_gb = self.size / 2**30
29+
class DBSyncSnapshotService:
    """Service class to interact with the Cardano DB-Sync S3 repository.

    Talks to the bucket's S3 REST API (ListObjectsV2) directly and parses
    the XML responses; no AWS SDK is required.
    """

    BUCKET_URL: str = "https://update-cardano-mainnet.iohk.io"
    ROOT_PREFIX: str = "cardano-db-sync/"
    # Per-request timeout (seconds) so a stalled connection cannot hang callers.
    REQUEST_TIMEOUT: float = 30.0

    def _get_s3_objects(self, prefix: str = "", delimiter: str = "") -> bytes:
        """Fetch a ListObjectsV2 XML listing from the S3 bucket.

        Args:
            prefix: Key prefix to filter objects by.
            delimiter: Delimiter used to roll keys up into CommonPrefixes
                (pass "/" to list "folders").

        Returns:
            The raw XML response body.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        params = {"list-type": "2", "prefix": prefix, "delimiter": delimiter}

        # Timeout prevents an unresponsive endpoint from blocking forever.
        response = requests.get(self.BUCKET_URL, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.content

    def _parse_s3_xml(self, xml_content: bytes) -> Tuple[List[str], List[SnapshotFile]]:
        """Parse an S3 ListObjectsV2 XML response.

        Returns:
            A ``(folders, files)`` tuple: ``folders`` holds the last path
            component of each ``<CommonPrefixes>`` entry, ``files`` holds a
            SnapshotFile per ``<Contents>`` entry. Malformed entries are
            logged and skipped rather than raising.
        """
        root = ET.fromstring(xml_content)
        # Clark-notation namespace prefix, e.g. "{http://...}Key".
        # NOTE: no whitespace may appear inside or after the braces —
        # ElementTree matches tag names exactly.
        ns = f"{{{S3_NS_URL}}}"

        # 1. Extract folders (CommonPrefixes).
        folders: List[str] = []
        for common_prefix in root.findall(f".//{ns}CommonPrefixes"):
            prefix_tag = common_prefix.find(f"{ns}Prefix")
            # find() can return None; text can be empty on malformed entries.
            if prefix_tag is None or not prefix_tag.text:
                continue
            folder_path = prefix_tag.text
            if folder_path.endswith("/"):
                folders.append(folder_path.strip("/").split("/")[-1])

        # 2. Extract files (Contents).
        files: List[SnapshotFile] = []
        for content in root.findall(f".//{ns}Contents"):
            key_tag = content.find(f"{ns}Key")
            modified_tag = content.find(f"{ns}LastModified")
            size_tag = content.find(f"{ns}Size")

            # Single validation pass: every tag must exist and carry text.
            if (key_tag is None or not key_tag.text
                    or modified_tag is None or not modified_tag.text
                    or size_tag is None or not size_tag.text):
                logger.warning(
                    "Skipping malformed S3 object entry: "
                    "Missing Key, LastModified, or Size tag.")
                continue

            key = key_tag.text
            # S3 timestamps are UTC ISO-8601 with milliseconds, e.g.
            # "2024-01-02T03:04:05.000Z"; attach tzinfo explicitly so the
            # resulting datetime is timezone-aware.
            file_date = datetime.strptime(
                modified_tag.text, "%Y-%m-%dT%H:%M:%S.%fZ"
            ).replace(tzinfo=timezone.utc)

            files.append(SnapshotFile(
                key=key,
                name=key.split("/")[-1],
                last_modified=file_date,
                size=int(size_tag.text),
            ))

        return folders, files

    def get_latest_version(self) -> str:
        """Finds the numerically latest db-sync version folder (e.g. "13.2").

        Raises:
            RuntimeError: If no "<major>.<minor>" folders are present.
        """
        xml_content = self._get_s3_objects(prefix=self.ROOT_PREFIX, delimiter="/")
        folders, _ = self._parse_s3_xml(xml_content)

        # Only folders named "<digits>.<digits>" count as versions.
        version_folders = [f for f in folders if re.match(r"^\d+\.\d+$", f)]
        if not version_folders:
            raise RuntimeError("No version folders found in S3 response.")

        # Compare numerically, not lexicographically: "13.10" > "13.2".
        return max(version_folders,
                   key=lambda v: [int(part) for part in v.split(".")])

    def get_latest_snapshot(self, version: str) -> SnapshotFile:
        """Finds the most recently modified snapshot archive for *version*.

        Raises:
            RuntimeError: If no matching ``*.tgz`` snapshot file is found.
        """
        version_prefix = f"{self.ROOT_PREFIX}{version}/"
        xml_content = self._get_s3_objects(prefix=version_prefix)
        _, files = self._parse_s3_xml(xml_content)

        # Snapshot archives are .tgz files with "snapshot" in the name.
        snapshot_files = [
            f for f in files
            if f.name.endswith(".tgz") and "snapshot" in f.name.lower()
        ]

        if not snapshot_files:
            file_names = [f.name for f in files]
            logger.warning(
                f"Files found in S3 response for {version_prefix}: {file_names}")
            raise RuntimeError(
                f"No snapshot files found for version {version}. "
                f"Filtered files: {file_names}")

        return max(snapshot_files, key=lambda x: x.last_modified)