1+ import dataclasses
2+ import datetime
13import logging
24import os
35import typing as tp
4- from dataclasses import dataclass
5- from datetime import datetime
6- from datetime import timedelta
7- from zoneinfo import ZoneInfo
86
97import requests
10- from requests .auth import HTTPBasicAuth
118
129from cardano_node_tests .utils import cluster_nodes
1310from cardano_node_tests .utils import configuration
1714LOGGER = logging .getLogger (__name__ )
1815
1916
20- @dataclass (frozen = True , order = True )
17+ @dataclasses . dataclass (frozen = True , order = True )
2118class PoolMetadata :
2219 name : str
2320 description : str
2421 ticker : str
2522 homepage : str
2623
2724
28- @dataclass (frozen = True , order = True )
25+ @dataclasses . dataclass (frozen = True , order = True )
2926class PoolData :
3027 pool_id : str
3128
3229
33- @dataclass (frozen = True , order = True )
30+ @dataclasses . dataclass (frozen = True , order = True )
3431class PoolTicker :
3532 name : str
3633
3734
38- @dataclass (frozen = True , order = True )
35+ @dataclasses . dataclass (frozen = True , order = True )
3936class PoolError :
4037 cause : str
4138 pool_hash : str
@@ -58,11 +55,11 @@ def __init__(self, instance_num: int) -> None:
5855 self .base_url = f"http://localhost:{ self .port } "
5956 self .auth = self ._get_auth ()
6057
61- def _get_auth (self ) -> None | HTTPBasicAuth :
58+ def _get_auth (self ) -> requests . auth . HTTPBasicAuth | None :
6259 """Get Basic Auth credentials if configured."""
6360 admin = os .environ .get ("SMASH_ADMIN" , "admin" )
6461 password = os .environ .get ("SMASH_PASSWORD" , "password" )
65- return HTTPBasicAuth (admin , password ) if admin and password else None
62+ return requests . auth . HTTPBasicAuth (admin , password ) if admin and password else None
6663
6764 def get_pool_metadata (self , pool_id : str , pool_meta_hash : str ) -> PoolMetadata :
6865 """Fetch stake pool metadata from SMASH, returning a `PoolMetadata`."""
@@ -101,7 +98,7 @@ def reserve_ticker(self, ticker_name: str, pool_hash: str) -> PoolTicker:
10198 data = response .json ()
10299 return PoolTicker (name = data ["name" ])
103100
104- def get_pool_errors (self , pool_id : str , from_date : None | str = None ) -> list [PoolError ]:
101+ def get_pool_errors (self , pool_id : str , from_date : str | None = None ) -> list [PoolError ]:
105102 """Fetch errors for a specific stake pool."""
106103 url = f"{ self .base_url } /api/v1/errors/{ pool_id } "
107104 params = {"fromDate" : from_date } if from_date else None
@@ -136,7 +133,7 @@ class SmashManager:
136133
137134 @classmethod
138135 def get_smash_instance (cls ) -> SmashClient :
139- """Return a singleton instance of `SmashClient` for the given cluster instance."""
136+ """Get a singleton instance of `SmashClient` for the given cluster instance."""
140137 instance_num = cluster_nodes .get_instance_num ()
141138 if instance_num not in cls .instances :
142139 cls .instances [instance_num ] = SmashClient (instance_num )
@@ -151,21 +148,22 @@ def is_smash_running() -> bool:
151148 return cluster_nodes .services_status (service_names = ["smash" ])[0 ].status == "RUNNING"
152149
153150
154- def get_client () -> None | SmashClient :
155- """Global access to the SMASH client singleton ."""
151+ def get_client () -> SmashClient | None :
152+ """Get and cache the SMASH client."""
156153 if not is_smash_running ():
157154 return None
158155 return SmashManager .get_smash_instance ()
159156
160157
161- def check_smash_pool_errors (pool_id : str , pool_metadata_hash : str ) -> None | list [PoolError ]:
158+ def check_smash_pool_errors (pool_id : str , pool_metadata_hash : str ) -> list [PoolError ] | None :
159+ """Check if pool errors are correctly reported in SMASH."""
162160 if not is_smash_running ():
163161 return None
164162 smash = SmashManager .get_smash_instance ()
165163
166164 # Test pool errors endpoint for a date set in the future
167- utc_now = datetime .now (ZoneInfo ( "UTC" ) )
168- future_date = (utc_now + timedelta (days = 365 * 5 )).strftime ("%d.%m.%Y" )
165+ utc_now = datetime .datetime . now (tz = datetime . timezone . utc )
166+ future_date = (utc_now + datetime . timedelta (days = 365 * 5 )).strftime ("%d.%m.%Y" )
169167 smash_pool_errors_future = smash .get_pool_errors (pool_id = pool_id , from_date = future_date )
170168 assert smash_pool_errors_future == []
171169
@@ -189,11 +187,11 @@ def check_smash_pool_errors(pool_id: str, pool_metadata_hash: str) -> None | lis
189187
190188 try :
191189 # Parse the time string and check if it has a valid format
192- error_time_utc = datetime .strptime ( smash_pool_error . time , "%d.%m.%Y. %H:%M:%S" ). replace (
193- tzinfo = ZoneInfo ( "UTC" )
194- )
190+ error_time_utc = datetime .datetime . strptime (
191+ smash_pool_error . time , "%d.%m.%Y. %H:%M:%S"
192+ ). replace ( tzinfo = datetime . timezone . utc )
195193
196- time_diff = timedelta (minutes = 20 )
194+ time_diff = datetime . timedelta (minutes = 20 )
197195 assert error_time_utc <= utc_now , "Error time must not be in the future."
198196 assert error_time_utc >= (utc_now - time_diff ), (
199197 f"Error time must be within the last { time_diff } ."
@@ -206,7 +204,8 @@ def check_smash_pool_errors(pool_id: str, pool_metadata_hash: str) -> None | lis
206204 return smash_pool_errors
207205
208206
209- def check_smash_pool_retired (pool_id : str ) -> None | list [PoolData ]:
207+ def check_smash_pool_retired (pool_id : str ) -> list [PoolData ] | None :
208+ """Check if pool is correctly reported as retired in SMASH."""
210209 if not is_smash_running ():
211210 return None
212211 smash = SmashManager .get_smash_instance ()
0 commit comments