1+ import xml .etree .ElementTree as ET
2+ from datetime import datetime , timezone
3+ from typing import List , Tuple
4+ import requests
5+ import re
6+ from dataclasses import dataclass , field
7+ import logging
8+
logger = logging.getLogger(__name__)

# XML namespace URI used to qualify element names in S3 bucket-listing
# responses. Kept at module level so the long literal is spelled out once.
S3_NS_URL = "http://s3.amazonaws.com/doc/2006-03-01/"
13+
@dataclass
class SnapshotFile:
    """One snapshot object parsed from a bucket listing.

    Attributes:
        key: Full object key inside the bucket.
        name: Last path component of ``key``.
        last_modified: Modification time (expected to be timezone-aware).
        size: Object size in bytes.
        size_gb: ``size`` expressed in units of 1024**3 bytes. Derived in
            ``__post_init__`` and therefore not accepted by the constructor.
    """

    key: str
    name: str
    last_modified: datetime
    size: int
    size_gb: float = field(init=False)

    def __post_init__(self):
        # 2 ** 30 == 1024 ** 3; keep the division int/int like the field type.
        bytes_per_unit = 2 ** 30
        self.size_gb = self.size / bytes_per_unit
25+
class DBSyncSnapshotService:
    """Service class to interact with the Cardano DB-Sync S3 repository.

    The bucket is queried through the S3 REST listing API (``list-type=2``)
    and the XML answer is parsed into folder names and ``SnapshotFile``
    records.
    """

    BUCKET_URL: str = "https://update-cardano-mainnet.iohk.io"
    ROOT_PREFIX: str = "cardano-db-sync/"
    # Seconds to wait for the bucket to answer. ``requests`` applies no
    # timeout by default, so without this a stalled connection blocks forever.
    REQUEST_TIMEOUT: float = 30.0

    def _get_s3_objects(self, prefix: str = "", delimiter: str = "") -> bytes:
        """Fetch the raw XML listing for ``prefix`` from the bucket.

        Args:
            prefix: Key prefix to list.
            delimiter: Grouping delimiter; ``"/"`` makes the listing report
                sub-folders as ``CommonPrefixes``.

        Returns:
            The response body as bytes.

        Raises:
            requests.HTTPError: If the bucket answers with an error status.
            requests.Timeout: If no answer arrives within ``REQUEST_TIMEOUT``.
        """
        # NOTE(review): only a single request is made. S3 listings are
        # paginated, so a prefix with very many keys may come back truncated;
        # confirm whether continuation-token handling is needed here.
        params = {
            "list-type": "2",
            "prefix": prefix,
            "delimiter": delimiter,
        }
        response = requests.get(
            self.BUCKET_URL, params=params, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.content

    @staticmethod
    def _parse_last_modified(value: str) -> datetime:
        """Convert a ``LastModified`` string into a UTC-aware datetime.

        Accepts the timestamp both with and without fractional seconds.

        Raises:
            ValueError: If ``value`` matches neither accepted layout.
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
            try:
                # The trailing "Z" is matched literally, so attach UTC explicitly.
                return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised LastModified timestamp: {value!r}")

    # The return annotations below are quoted so that defining this class does
    # not require ``SnapshotFile`` to be defined first.
    def _parse_s3_xml(self, xml_content: bytes) -> "Tuple[List[str], List[SnapshotFile]]":
        """Parse an S3 listing into folder names and snapshot files.

        Args:
            xml_content: Raw XML body of a listing response.

        Returns:
            ``(folders, files)`` where ``folders`` holds the last path
            component of every ``CommonPrefixes`` entry ending in ``/`` and
            ``files`` holds one ``SnapshotFile`` per ``Contents`` entry.

        Raises:
            xml.etree.ElementTree.ParseError: If the body is not valid XML.
            ValueError: If a ``LastModified`` or ``Size`` value is malformed.
        """
        root = ET.fromstring(xml_content)

        # ElementTree addresses namespaced elements as "{uri}Tag". The braces
        # must wrap the URI with no whitespace, otherwise nothing matches.
        ns = f"{{{S3_NS_URL}}}"

        # 1. Folders (CommonPrefixes)
        folders: List[str] = []
        for common_prefix in root.findall(f".//{ns}CommonPrefixes"):
            folder_path = common_prefix.findtext(f"{ns}Prefix")
            if folder_path and folder_path.endswith("/"):
                folders.append(folder_path.strip("/").split("/")[-1])

        # 2. Files (Contents)
        files: List[SnapshotFile] = []
        for content in root.findall(f".//{ns}Contents"):
            key = content.findtext(f"{ns}Key")
            last_modified = content.findtext(f"{ns}LastModified")
            if not key or not last_modified:
                # An incomplete entry cannot be ranked by date; skip it
                # instead of failing the whole listing.
                logger.warning(
                    "Skipping S3 listing entry without Key/LastModified: key=%r", key
                )
                continue
            size = content.findtext(f"{ns}Size")

            files.append(SnapshotFile(
                key=key,
                name=key.split("/")[-1],
                last_modified=self._parse_last_modified(last_modified),
                size=int(size) if size else 0,
            ))

        return folders, files

    def get_latest_version(self) -> str:
        """Return the numerically latest db-sync version folder.

        Only folders named ``<digits>.<digits>`` are considered, and they are
        compared component-wise as integers (so ``13.10`` beats ``13.9``).

        Raises:
            RuntimeError: If the listing contains no such folder.
        """
        xml_content = self._get_s3_objects(prefix=self.ROOT_PREFIX, delimiter="/")
        folders, _ = self._parse_s3_xml(xml_content)

        version_folders = [name for name in folders if re.match(r'^\d+\.\d+$', name)]
        if not version_folders:
            raise RuntimeError("No version folders found in S3 response.")

        return max(
            version_folders,
            key=lambda v: tuple(int(part) for part in v.split(".")),
        )

    def get_latest_snapshot(self, version: str) -> "SnapshotFile":
        """Return the most recently modified snapshot file for ``version``.

        A file counts as a snapshot when its name ends in ``.tgz`` and
        contains ``snapshot`` (case-insensitive).

        Args:
            version: Version folder name, e.g. as returned by
                ``get_latest_version``.

        Raises:
            RuntimeError: If no snapshot file exists under that version.
        """
        version_prefix = f"{self.ROOT_PREFIX}{version}/"
        xml_content = self._get_s3_objects(prefix=version_prefix)
        _, files = self._parse_s3_xml(xml_content)

        snapshot_files = [
            f for f in files
            if f.name.endswith(".tgz") and "snapshot" in f.name.lower()
        ]

        if not snapshot_files:
            # Report everything that was listed to make a filter mismatch
            # easy to diagnose.
            file_names = [f.name for f in files]
            logger.warning(
                "Files found in S3 response for %s: %s", version_prefix, file_names
            )
            raise RuntimeError(
                f"No snapshot files found for version {version}. "
                f"Filtered files: {file_names}"
            )

        return max(snapshot_files, key=lambda snap: snap.last_modified)
0 commit comments