1
+
1
2
import boto3
2
3
from cryptography .hazmat .primitives import serialization
3
4
from cryptography .hazmat .primitives .asymmetric .utils import decode_dss_signature
9
10
from hyperliquid .utils .constants import TESTNET_API_URL , MAINNET_API_URL
10
11
from hyperliquid .utils .signing import get_timestamp_ms , action_hash , construct_phantom_agent , l1_payload
11
12
from loguru import logger
13
+ from pathlib import Path
12
14
13
15
from pusher .config import Config
14
16
15
17
SECP256K1_N_HALF = SECP256K1_N // 2
16
18
17
19
20
+ def _init_client ():
21
+ # AWS_DEFAULT_REGION, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY should be set as environment variables
22
+ return boto3 .client (
23
+ "kms" ,
24
+ # can specify an endpoint for e.g. LocalStack
25
+ # endpoint_url="http://localhost:4566"
26
+ )
27
+
28
+
18
29
class KMSSigner :
19
30
def __init__ (self , config : Config ):
20
- use_testnet = config .hyperliquid .use_testnet
21
- url = TESTNET_API_URL if use_testnet else MAINNET_API_URL
31
+ self . use_testnet = config .hyperliquid .use_testnet
32
+ url = TESTNET_API_URL if self . use_testnet else MAINNET_API_URL
22
33
self .oracle_publisher_exchange : Exchange = Exchange (wallet = None , base_url = url )
23
- self .client = self ._init_client (config )
24
34
35
+ # AWS client and public key load
36
+ self .client = _init_client ()
37
+ self ._load_public_key (config .kms .key_path )
38
+
39
+ def _load_public_key (self , key_path : str ):
25
40
# Fetch public key once so we can derive address and check recovery id
26
- key_path = config .kms .key_path
27
- self .key_id = open (key_path , "r" ).read ().strip ()
28
- self .pubkey_der = self .client .get_public_key (KeyId = self .key_id )["PublicKey" ]
41
+ self .key_id = Path (key_path ).read_text ().strip ()
42
+ pubkey_der = self .client .get_public_key (KeyId = self .key_id )["PublicKey" ]
43
+ self .pubkey = serialization .load_der_public_key (pubkey_der )
44
+ self ._construct_pubkey_address_and_bytes ()
45
+
46
+ def _construct_pubkey_address_and_bytes (self ):
29
47
# Construct eth address to log
30
- pub = serialization .load_der_public_key (self .pubkey_der )
31
- numbers = pub .public_numbers ()
48
+ numbers = self .pubkey .public_numbers ()
32
49
x = numbers .x .to_bytes (32 , "big" )
33
50
y = numbers .y .to_bytes (32 , "big" )
34
51
uncompressed = b"\x04 " + x + y
35
- self .public_key_bytes = uncompressed
36
52
self .address = "0x" + keccak (uncompressed [1 :])[- 20 :].hex ()
37
- logger .info ("KMSSigner address : {}" , self .address )
53
+ logger .info ("public key loaded from KMS : {}" , self .address )
38
54
39
- def _init_client (self , config ):
40
- # AWS_REGION, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY should be set as environment variables
41
- return boto3 .client (
42
- "kms" ,
43
- # can specify an endpoint for e.g. LocalStack
44
- # endpoint_url="http://localhost:4566"
55
+ # Parse KMS public key into uncompressed secp256k1 bytes
56
+ pubkey_bytes = self .pubkey .public_bytes (
57
+ serialization .Encoding .X962 ,
58
+ serialization .PublicFormat .UncompressedPoint ,
45
59
)
60
+ # Strip leading 0x04 (uncompressed point indicator)
61
+ self .raw_pubkey_bytes = pubkey_bytes [1 :]
46
62
47
63
def set_oracle (self , dex , oracle_pxs , all_mark_pxs , external_perp_pxs ):
48
64
timestamp = get_timestamp_ms ()
@@ -59,14 +75,14 @@ def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
59
75
},
60
76
}
61
77
signature = self .sign_l1_action (
62
- action ,
63
- timestamp ,
64
- self . oracle_publisher_exchange . base_url == MAINNET_API_URL ,
78
+ action = action ,
79
+ nonce = timestamp ,
80
+ is_mainnet = self . use_testnet ,
65
81
)
66
82
return self .oracle_publisher_exchange ._post_action (
67
- action ,
68
- signature ,
69
- timestamp ,
83
+ action = action ,
84
+ signature = signature ,
85
+ nonce = timestamp ,
70
86
)
71
87
72
88
def sign_l1_action (self , action , nonce , is_mainnet ):
@@ -91,20 +107,12 @@ def sign_message(self, message_hash: bytes) -> dict:
91
107
# Ethereum requires low-s form
92
108
if s > SECP256K1_N_HALF :
93
109
s = SECP256K1_N - s
94
- # Parse KMS public key into uncompressed secp256k1 bytes
95
- # TODO: Pull this into init
96
- pubkey = serialization .load_der_public_key (self .pubkey_der )
97
- pubkey_bytes = pubkey .public_bytes (
98
- serialization .Encoding .X962 ,
99
- serialization .PublicFormat .UncompressedPoint ,
100
- )
101
- # Strip leading 0x04 (uncompressed point indicator)
102
- raw_pubkey_bytes = pubkey_bytes [1 :]
110
+
103
111
# Try both recovery ids
104
112
for v in (0 , 1 ):
105
113
sig_obj = Signature (vrs = (v , r , s ))
106
114
recovered_pub = sig_obj .recover_public_key_from_msg_hash (message_hash )
107
- if recovered_pub .to_bytes () == raw_pubkey_bytes :
115
+ if recovered_pub .to_bytes () == self . raw_pubkey_bytes :
108
116
return {
109
117
"r" : to_hex (r ),
110
118
"s" : to_hex (s ),
0 commit comments