42
42
_HEADER_CLIENT_ENCRYPTION_KEY = 'X-Bunq-Client-Encryption-Key'
43
43
_HEADER_CLIENT_ENCRYPTION_IV = 'X-Bunq-Client-Encryption-Iv'
44
44
_HEADER_CLIENT_ENCRYPTION_HMAC = 'X-Bunq-Client-Encryption-Hmac'
45
+ _HEADER_SERVER_SIGNATURE = 'X-Bunq-Server-Signature'
45
46
46
47
47
48
def generate_rsa_private_key ():
@@ -97,7 +98,7 @@ def sign_request(private_key, method, endpoint, body_bytes, headers):
97
98
:rtype: str
98
99
"""
99
100
100
- head_bytes = _generate_head_bytes (method , endpoint , headers )
101
+ head_bytes = _generate_request_head_bytes (method , endpoint , headers )
101
102
bytes_to_sign = head_bytes + body_bytes
102
103
signer = PKCS1_v1_5 .new (private_key )
103
104
digest = SHA256 .new ()
@@ -107,7 +108,7 @@ def sign_request(private_key, method, endpoint, body_bytes, headers):
107
108
return b64encode (sign )
108
109
109
110
110
- def _generate_head_bytes (method , endpoint , headers ):
111
+ def _generate_request_head_bytes (method , endpoint , headers ):
111
112
"""
112
113
:type method: str
113
114
:type endpoint: str
@@ -116,17 +117,17 @@ def _generate_head_bytes(method, endpoint, headers):
116
117
:rtype: bytes
117
118
"""
118
119
119
- header_tuples = sorted ((k , headers [k ]) for k in headers )
120
120
head_string = _FORMAT_METHOD_AND_ENDPOINT .format (method , endpoint )
121
+ header_tuples = sorted ((k , headers [k ]) for k in headers )
121
122
122
123
for name , value in header_tuples :
123
- if _should_sign_header (name ):
124
+ if _should_sign_request_header (name ):
124
125
head_string += _FORMAT_HEADER_STRING .format (name , value )
125
126
126
127
return (head_string + _DELIMITER_NEWLINE ).encode ()
127
128
128
129
129
- def _should_sign_header (header_name ):
130
+ def _should_sign_request_header (header_name ):
130
131
"""
131
132
:type header_name: str
132
133
@@ -229,3 +230,55 @@ def _add_header_client_encryption_hmac(request_bytes, key, iv, custom_headers):
229
230
hashed = hmac .new (key , iv + request_bytes , sha1 )
230
231
hashed_base64 = base64 .b64encode (hashed .digest ()).decode ()
231
232
custom_headers [_HEADER_CLIENT_ENCRYPTION_HMAC ] = hashed_base64
233
+
234
+
235
+ def validate_response (public_key_server , status_code , body_bytes , headers ):
236
+ """
237
+ :type public_key_server: RSA.RsaKey
238
+ :type status_code: int
239
+ :type body_bytes: bytes
240
+ :type headers: dict[str, str]
241
+
242
+ :rtype: None
243
+ """
244
+
245
+ head_bytes = _generate_response_head_bytes (status_code , headers )
246
+ bytes_signed = head_bytes + body_bytes
247
+ signer = PKCS1_v1_5 .pkcs1_15 .new (public_key_server )
248
+ digest = SHA256 .new ()
249
+ digest .update (bytes_signed )
250
+ signer .verify (digest , base64 .b64decode (headers [_HEADER_SERVER_SIGNATURE ]))
251
+
252
+
253
+ def _generate_response_head_bytes (status_code , headers ):
254
+ """
255
+ :type status_code: int
256
+ :type headers: dict[str, str]
257
+
258
+ :rtype: bytes
259
+ """
260
+
261
+ head_string = str (status_code ) + _DELIMITER_NEWLINE
262
+ header_tuples = sorted ((k , headers [k ]) for k in headers )
263
+
264
+ for name , value in header_tuples :
265
+ if _should_sign_response_header (name ):
266
+ head_string += _FORMAT_HEADER_STRING .format (name , value )
267
+
268
+ return (head_string + _DELIMITER_NEWLINE ).encode ()
269
+
270
+
271
+ def _should_sign_response_header (header_name ):
272
+ """
273
+ :type header_name: str
274
+
275
+ :rtype: bool
276
+ """
277
+
278
+ if header_name == _HEADER_SERVER_SIGNATURE :
279
+ return False
280
+
281
+ if re .match (_PATTERN_HEADER_PREFIX_BUNQ , header_name ):
282
+ return True
283
+
284
+ return False
0 commit comments