1+ from __future__ import annotations
2+
13import base64
24import json
35import re
46import subprocess
7+
58try :
69 # Python 3.9+
710 from importlib .resources import files , as_file
811except ImportError :
912 # Python < 3.9
10- from importlib_resources import files , as_file
13+ from importlib_resources import files , as_file # pyright: ignore[reportMissingImports]
1114
1215
13- def str2b64 (s ):
16+ def str2b64 (s : str ):
1417 b = s .encode ("utf-8" )
1518 b64 = base64 .b64encode (b )
1619 b64 = b64 .decode ("utf-8" )
1720 return b64
18- # end define
1921
2022
21- def b642str (b64 ):
22- b64 = b64 .encode ("utf-8" )
23- b = base64 .b64decode (b64 )
24- s = b .decode ("utf-8" )
23+ def b642str (b64 : str ):
24+ b = base64 .b64decode (b64 .encode ())
25+ s = b .decode ()
2526 return s
26- # end define
2727
2828
29- def dict2b64 (d ):
29+ def dict2b64 (d : dict ):
3030 s = json .dumps (d )
3131 b64 = str2b64 (s )
3232 return b64
33- # end define
3433
3534
36- def b642dict (b64 ):
35+ def b642dict (b64 : str ):
3736 s = b642str (b64 )
3837 d = json .loads (s )
3938 return d
40- # end define
4139
4240
43- def hex2b64 (input ): # TODO: remove duplicates
44- hexBytes = bytes .fromhex (input )
45- b64Bytes = base64 .b64encode (hexBytes )
46- b64String = b64Bytes .decode ()
47- return b64String
48- # end define
41+ def hex2b64 (inp : str ): # TODO: remove duplicates
42+ hex_bytes = bytes .fromhex (inp )
43+ b64_bytes = base64 .b64encode (hex_bytes )
44+ b64_string = b64_bytes .decode ()
45+ return b64_string
4946
5047
51- def b642hex (input ):
52- b64Bytes = input .encode ()
53- hexBytes = base64 .b64decode (b64Bytes )
54- hexString = hexBytes .hex ()
55- return hexString
56- # end define
48+ def b642hex (inp : str ):
49+ b64_bytes = inp .encode ()
50+ hex_bytes = base64 .b64decode (b64_bytes )
51+ hex_string = hex_bytes .hex ()
52+ return hex_string
5753
5854
59- def xhex2hex (x ) :
55+ def xhex2hex (x : str ) -> str | None :
6056 try :
6157 b = x [1 :]
6258 h = b .lower ()
6359 return h
6460 except Exception :
6561 return None
66- #end define
6762
68- def hex2base64 (h ): # TODO: remove duplicates
63+
64+ def hex2base64 (h : str ): # TODO: remove duplicates
6965 b = bytes .fromhex (h )
7066 b64 = base64 .b64encode (b )
7167 s = b64 .decode ("utf-8" )
7268 return s
73- #end define
7469
7570
76- def str2bool (str ):
77- if str == "true" :
71+ def str2bool (s : str ):
72+ if s == "true" :
7873 return True
7974 return False
80- # end define
8175
8276
83- def ng2g (ng ) :
77+ def ng2g (ng : int | None ) -> float | None :
8478 if ng is None :
85- return
86- return int (ng )/ 10 ** 9
87- #end define
79+ return None
80+ return int (ng ) / 10 ** 9
8881
8982
9083def parse_db_stats (path : str ):
@@ -103,23 +96,25 @@ def parse_db_stats(path: str):
10396 result [s [0 ]] = {}
10497 result [s [0 ]] = {k : float (v ) for k , v in items }
10598 return result
106- # end define
10799
108- def get_hostname ():
100+
101+ def get_hostname () -> str :
109102 return subprocess .run (["hostname" ], stdout = subprocess .PIPE ).stdout .decode ().strip ()
110103
104+
111105def hex_shard_to_int (shard_id_str : str ) -> dict :
112106 try :
113- wc , shard_hex = shard_id_str .split (':' )
107+ wc , shard_hex = shard_id_str .split (":" )
114108 wc = int (wc )
115109 shard = int (shard_hex , 16 )
116- if shard >= 2 ** 63 :
117- shard -= 2 ** 64
110+ if shard >= 2 ** 63 :
111+ shard -= 2 ** 64
118112 return {"workchain" : wc , "shard" : shard }
119113 except (ValueError , IndexError ):
120114 raise Exception (f'Invalid shard ID "{ shard_id_str } "' )
121115
122- def signed_int_to_hex64 (value ):
116+
117+ def signed_int_to_hex64 (value : int ):
123118 if value < 0 :
124119 value = (1 << 64 ) + value
125120 return f"{ value :016X} "
@@ -148,11 +143,11 @@ def _count_trailing_zeroes64(value: int) -> int:
148143 if u == 0 :
149144 return 64
150145 return ((u & - u ).bit_length ()) - 1
146+
151147 return 63 - _count_trailing_zeroes64 (_to_unsigned64 (shard_id ))
152148
153149
154150def shard_prefix (shard_id : int , length_ : int ):
155-
156151 def _to_signed64 (v : int ) -> int :
157152 return v - (1 << 64 ) if v >= (1 << 63 ) else v
158153
@@ -162,7 +157,9 @@ def _to_signed64(v: int) -> int:
162157 x = _lower_bit64 (u )
163158 y = 1 << (63 - length_ )
164159 if y < x :
165- raise ValueError ("requested prefix length is longer (more specific) than current shard id" )
160+ raise ValueError (
161+ "requested prefix length is longer (more specific) than current shard id"
162+ )
166163 mask_non_lower = (~ (y - 1 )) & _MASK64 # equals -y mod 2^64; clears bits below y
167164 res_u = (u & mask_non_lower ) | y
168165 return _to_signed64 (res_u )
0 commit comments