1+ import base64
2+ from datetime import datetime
3+ import hmac
4+
5+ class InvalidTimestampError (Exception ):
6+ def __init__ (self , * args : list ) -> None :
7+ self .message = args [0 ]
8+
9+ @property
10+ def response (self ):
11+ return self .message
12+
13+
14+ class Webhook :
15+ def __init__ (self , secret : str , encoding : str = "hex" , tolerance : int = 300 , hash : str = "sha256" ) -> None :
16+ self .secret = secret
17+ self .encoding = encoding
18+ self .tolerance = tolerance
19+ self .hash = hash
20+
21+ def verify_timestamp (self , timestamp ):
22+ try :
23+ now = round (datetime .now ().timestamp ())
24+ timestamp_int = int (timestamp .timestamp ())
25+
26+ print ("Now: " , now )
27+ diff = now - self .tolerance
28+
29+ if timestamp_int < diff :
30+ raise InvalidTimestampError ("Timestamp is too old." )
31+ else :
32+ return True
33+
34+ except InvalidTimestampError as e :
35+ return e .response
36+
37+
38+ def compare_hashes (self , hash1 : str , hash2 : str ) -> bool :
39+ if self .encoding == "hex" :
40+ return hash1 == hash2
41+ if self .encoding == "base64" :
42+ return hmac .compare_digest (base64 .b64decode (hash1 ), base64 .b64decode (hash2 ))
43+
44+ def create_signature (self , payload : str ) -> str :
45+ # If the encoding is hex, create a new hex hmac digest
46+ if self .encoding == "hex" :
47+ return hmac .new (bytes (self .secret , "utf-8" ), msg = bytes (payload , "utf-8" ), digestmod = self .hash ).hexdigest ()
48+
49+ # If the encoding is base64, create a new base64 hmac digest
50+ if self .encoding == "base64" :
51+ sig = hmac .new (bytes (self .secret , "utf-8" ), msg = bytes (payload , "utf-8" ), digestmod = self .hash ).digest ()
52+ return base64 .b64encode (sig ).decode ()
53+
54+ def get_timestamp_and_signatures (self , signature ):
55+ signature = signature .split ("," )
56+ signature = [sig .split ("=" , 1 ) for sig in signature ]
57+
58+ timestamp_int = int (signature [0 ][1 ]) if signature [0 ][0 ] == "t" else None
59+
60+ if timestamp_int is not None :
61+ timestamp = datetime .fromtimestamp (timestamp_int )
62+
63+ return (timestamp , signature [1 ])
64+
65+
66+ def verify_signature (self , payload : str , signature : str ) -> bool :
67+ is_advanced = len (signature .split ("," )) > 1
68+
69+ if is_advanced :
70+ return self .verify_advanced_signature (payload , signature )
71+
72+ return self .verify_simple_signature (payload , signature )
73+
74+ def verify_simple_signature (self , payload : str , signature : str ) -> bool :
75+ return self .compare_hashes (self .create_signature (payload ), signature )
76+
77+ def verify_advanced_signature (self , payload : str , signature : str ) -> bool :
78+ timestamp , signature = self .get_timestamp_and_signatures (signature )
79+
80+ try :
81+ if self .verify_timestamp (timestamp ) is False :
82+ raise InvalidTimestampError ("Invalid timestamp." )
83+ else :
84+ return self .compare_hashes (self .create_signature (payload ), signature [1 ])
85+
86+ except InvalidTimestampError as e :
87+ return e .response
88+
0 commit comments