11import hashlib
2+ import logging
3+ import os
4+
5+ import boto3
6+ from babel .messages import Message
27from cryptography .hazmat .primitives import hashes
38from cryptography .hazmat .primitives .asymmetric import padding
49from cryptography .hazmat .primitives .serialization import load_pem_private_key
813import base64
914
1015
11- def sign_data (data_str : str , private_key_file_path : str ) -> str :
12- with open (private_key_file_path , "rb" ) as key_file :
13- private_key = load_pem_private_key (
14- key_file .read (), password = None , backend = default_backend ()
15- )
16+ class Signer :
17+ """
18+ Signer
19+
20+ abstract class defining the methods for signing and verifying data
21+ """
22+
23+ def sign_data (self , data_str : str ) -> str :
24+ pass
25+
26+ def verify_signature (self , data_str : str , signature : str ):
27+ pass
28+
29+
30+ class KMSSigner (Signer ):
31+ """
32+ KMSSigner
33+
34+ implementation of Signer interface using AWS KMS
35+ """
36+
37+ def __init__ (self , arn : str ):
38+ self .kms_client = boto3 .client ("kms" )
39+ # check if client works
40+ key_info = self .kms_client .describe_key (KeyId = arn )
1641
17- signature = private_key .sign (
18- data_str .encode ("utf-8" ),
19- padding .PSS (
20- mgf = padding .MGF1 (
21- hashes .SHA256 ()
22- ), # Mask generation function based on SHA-256
23- salt_length = padding .PSS .MAX_LENGTH , # Maximum salt length
24- ),
25- hashes .SHA256 (),
26- )
27- return base64 .b64encode (signature ).decode ("utf-8" )
28-
29-
30- def verify_signature (data_str : str , signature : str , public_key_file_path : str ):
31- with open (public_key_file_path , "rb" ) as cert_file :
32- cert = x509 .load_pem_x509_certificate (cert_file .read (), default_backend ())
33- public_key = cert .public_key ()
34- try :
35- public_key .verify (
36- base64 .b64decode (signature ),
42+ if "SIGN" not in key_info ["KeyMetadata" ]["KeyUsage" ]:
43+ raise Exception ("Key is missing the sign property" )
44+ if "VERIFY" not in key_info ["KeyMetadata" ]["KeyUsage" ]:
45+ raise Exception ("Key is missing the verify property" )
46+ if "RSASSA_PSS_SHA_256" not in key_info ["KeyMetadata" ]["SigningAlgorithms" ]:
47+ raise Exception (
48+ "Key is missing the required signing algorithm (RSASSA_PSS_SHA_256)"
49+ )
50+ # seems to be fine, use it
51+ self .arn = arn
52+
53+ def sign_data (self , data_str : str ) -> str :
54+ signature = self .kms_client .sign (
55+ KeyId = self .arn ,
56+ MessageType = "RAW" ,
57+ SigningAlgorithm = "RSASSA_PSS_SHA_256" ,
58+ Message = data_str .encode ("utf-8" ),
59+ )["Signature" ]
60+ return base64 .b64encode (signature ).decode ("utf-8" )
61+
62+ def verify_signature (self , data_str : str , signature : str ):
63+ if not self .kms_client .verify (
64+ KeyId = self .arn ,
65+ Message = data_str .encode ("utf-8" ),
66+ MessageType = "RAW" ,
67+ Signature = base64 .b64decode (signature ),
68+ SigningAlgorithm = "RSASSA_PSS_SHA_256" ,
69+ )["SignatureValid" ]:
70+ raise ValueError (f"Invalid Signature { signature } for data: { data_str } " )
71+
72+
73+ class LocalSigner (Signer ):
74+ """
75+ LocalSigner
76+
77+ implementation of Signer interface using local certificates
78+
79+ :param private_key_file_path
80+ :param public_key_file_path
81+ """
82+
83+ def __init__ (self , private_key_file_path : str , public_key_file_path : str ):
84+ with open (private_key_file_path , "rb" ) as key_file :
85+ self .private_key = load_pem_private_key (
86+ key_file .read (), password = None , backend = default_backend ()
87+ )
88+ with open (public_key_file_path , "rb" ) as cert_file :
89+ cert = x509 .load_pem_x509_certificate (cert_file .read (), default_backend ())
90+ self .public_key = cert .public_key ()
91+
92+ def sign_data (self , data_str : str ) -> str :
93+ signature = self .private_key .sign (
3794 data_str .encode ("utf-8" ),
3895 padding .PSS (
3996 mgf = padding .MGF1 (
@@ -43,8 +100,23 @@ def verify_signature(data_str: str, signature: str, public_key_file_path: str):
43100 ),
44101 hashes .SHA256 (),
45102 )
46- except InvalidSignature :
47- raise ValueError (f"Invalid Signature { signature } for data: { data_str } " )
103+ return base64 .b64encode (signature ).decode ("utf-8" )
104+
105+ def verify_signature (self , data_str : str , signature : str ):
106+ try :
107+ self .public_key .verify (
108+ base64 .b64decode (signature ),
109+ data_str .encode ("utf-8" ),
110+ padding .PSS (
111+ mgf = padding .MGF1 (
112+ hashes .SHA256 ()
113+ ), # Mask generation function based on SHA-256
114+ salt_length = padding .PSS .MAX_LENGTH , # Maximum salt length
115+ ),
116+ hashes .SHA256 (),
117+ )
118+ except InvalidSignature :
119+ raise ValueError (f"Invalid Signature { signature } for data: { data_str } " )
48120
49121
50122def verify_sha256 (checksum : str , data : bytes ):
0 commit comments