Skip to content

Commit 741c13b

Browse files
RTLcoilconst-cloudinary
authored andcommitted
Add verify_api_response_signature and verify_notification_signature util functions
1 parent 038837d commit 741c13b

File tree

2 files changed

+148
-4
lines changed

2 files changed

+148
-4
lines changed

cloudinary/utils.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@
121121
upload_params = __SIMPLE_UPLOAD_PARAMS + __SERIALIZED_UPLOAD_PARAMS
122122

123123

124+
def compute_hex_hash(s):
125+
"""
126+
Compute hash and convert the result to HEX string
127+
128+
:param s: string to process
129+
130+
:return: HEX string
131+
"""
132+
return hashlib.sha1(to_bytes(s)).hexdigest()
133+
134+
124135
def build_array(arg):
125136
if isinstance(arg, list):
126137
return arg
@@ -498,7 +509,7 @@ def sign_request(params, options):
498509
def api_sign_request(params_to_sign, api_secret):
499510
params = [(k + "=" + (",".join(v) if isinstance(v, list) else str(v))) for k, v in params_to_sign.items() if v]
500511
to_sign = "&".join(sorted(params))
501-
return hashlib.sha1(to_bytes(to_sign + api_secret)).hexdigest()
512+
return compute_hex_hash(to_sign + api_secret)
502513

503514

504515
def breakpoint_settings_mapper(breakpoint_settings):
@@ -1264,6 +1275,48 @@ def wrapper(*args, **kwargs):
12641275
return wrapper
12651276

12661277

1278+
def verify_api_response_signature(public_id, version, signature):
1279+
"""
1280+
Verifies the authenticity of an API response signature
1281+
1282+
:param public_id: The public id of the asset as returned in the API response
1283+
:param version: The version of the asset as returned in the API response
1284+
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
1285+
1286+
:return: Boolean result of the validation
1287+
"""
1288+
if not cloudinary.config().api_secret:
1289+
raise Exception('Api secret key is empty')
1290+
1291+
parameters_to_sign = {'public_id': public_id,
1292+
'version': version}
1293+
1294+
return signature == api_sign_request(parameters_to_sign, cloudinary.config().api_secret)
1295+
1296+
1297+
def verify_notification_signature(body, timestamp, signature, valid_for=7200):
1298+
"""
1299+
Verifies the authenticity of a notification signature
1300+
1301+
:param body: Json of the request's body
1302+
:param timestamp: Unix timestamp. Can be retrieved from the X-Cld-Timestamp header
1303+
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
1304+
:param valid_for: The desired time in seconds for considering the request valid
1305+
1306+
:return: Boolean result of the validation
1307+
"""
1308+
if not cloudinary.config().api_secret:
1309+
raise Exception('Api secret key is empty')
1310+
1311+
if timestamp < time.time() - valid_for:
1312+
return False
1313+
1314+
if not isinstance(body, str):
1315+
raise ValueError('Body should be type of string')
1316+
1317+
return signature == compute_hex_hash('{}{}{}'.format(body, timestamp, cloudinary.config().api_secret))
1318+
1319+
12671320
def get_http_connector(conf, options):
12681321
"""
12691322
Used to create http connector, depends on api_proxy configuration parameter

test/test_utils.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import re
44
import tempfile
55
import unittest
6+
import uuid
7+
import json
8+
import time
69
from collections import OrderedDict
710
from datetime import datetime, date
811
from fractions import Fraction
@@ -13,11 +16,28 @@
1316

1417
import cloudinary.utils
1518
from cloudinary import CL_BLANK
16-
from cloudinary.utils import build_list_of_dicts, json_encode, encode_unicode_url, base64url_encode, \
17-
patch_fetch_format, cloudinary_scaled_url, chain_transformations, generate_transformation_string, build_eager
19+
from cloudinary.utils import (
20+
build_list_of_dicts,
21+
json_encode,
22+
encode_unicode_url,
23+
base64url_encode,
24+
patch_fetch_format,
25+
cloudinary_scaled_url,
26+
chain_transformations,
27+
generate_transformation_string,
28+
build_eager,
29+
compute_hex_hash,
30+
verify_notification_signature,
31+
verify_api_response_signature,
32+
)
33+
from cloudinary.compat import to_bytes
1834
from test.helper_test import TEST_IMAGE, REMOTE_TEST_IMAGE
19-
from test.test_api import API_TEST_TRANS_SCALE100, API_TEST_TRANS_SCALE100_STR, API_TEST_TRANS_SEPIA_STR, \
35+
from test.test_api import (
36+
API_TEST_TRANS_SCALE100,
37+
API_TEST_TRANS_SCALE100_STR,
38+
API_TEST_TRANS_SEPIA_STR,
2039
API_TEST_TRANS_SEPIA
40+
)
2141

2242
DEFAULT_ROOT_PATH = 'http://res.cloudinary.com/test123/'
2343

@@ -33,6 +53,8 @@
3353
DEFAULT_VERSION_STR = 'v1'
3454
TEST_FOLDER = 'folder/test'
3555

56+
MOCKED_NOW = 1549533574
57+
API_SECRET = 'X7qLTrsES31MzxxkxPPA-pAGGfU'
3658

3759
class TestUtils(unittest.TestCase):
3860
crop_transformation = {'crop': 'crop', 'width': 100}
@@ -1198,6 +1220,75 @@ def test_build_eager(self):
11981220
for message, value, expected in test_data:
11991221
self.assertEqual(expected, build_eager(value), message)
12001222

1223+
def test_compute_hash(self):
1224+
self.assertEqual("4de279c82056603e91aab3930a593b8b887d9e48",
1225+
compute_hex_hash("https://cloudinary.com/images/old_logo.png"))
1226+
1227+
original_value = str(uuid.uuid4())
1228+
1229+
self.assertEqual(compute_hex_hash(original_value), compute_hex_hash(original_value),
1230+
"Equal STR inputs should be hashed the same way")
1231+
1232+
self.assertNotEqual(compute_hex_hash(original_value), compute_hex_hash("some string"),
1233+
"Unequal inputs hashes should not match")
1234+
1235+
def test_verify_api_response_signature(self):
1236+
public_id = 'tests/logo.png'
1237+
test_version = 1
1238+
api_response_signature = '08d3107a5b2ad82e7d82c0b972218fbf20b5b1e0'
1239+
1240+
with patch('cloudinary.config', return_value=cloudinary.config(api_secret=API_SECRET)):
1241+
self.assertTrue(verify_api_response_signature(public_id, test_version, api_response_signature),
1242+
"The response signature is valid for the same parameters")
1243+
1244+
self.assertFalse(verify_api_response_signature(public_id, test_version + 1, api_response_signature),
1245+
"The response signature is invalid for the wrong version")
1246+
1247+
with patch('cloudinary.config', return_value=cloudinary.config(api_secret=None)):
1248+
with self.assertRaises(Exception) as e:
1249+
verify_api_response_signature(public_id, test_version, api_response_signature)
1250+
self.assertEqual(str(e.exception), 'Api secret key is empty')
1251+
1252+
def test_verify_notification_signature(self):
1253+
valid_for = 60
1254+
signature = 'dfe82de1d9083fe0b7ea68070649f9a15b8874da'
1255+
body = '{"notification_type":"eager","eager":[{"transformation":"sp_full_hd/mp4","bytes":1055,' \
1256+
'"url":"http://res.cloudinary.com/demo/video/upload/sp_full_hd/v1533125278/dog.mp4",' \
1257+
'"secure_url":"https://res.cloudinary.com/demo/video/upload/sp_full_hd/v1533125278/dog.mp4"}],' \
1258+
'"public_id":"dog","batch_id":"9b11fa058c61fa577f4ec516bf6ee756ac2aefef095af99aef1302142cc1694a"}'
1259+
1260+
with patch('time.time', return_value=MOCKED_NOW):
1261+
valid_response_timestamp = time.time() - valid_for
1262+
test_message_part = "The notification signature is"
1263+
1264+
with patch('cloudinary.config', return_value=cloudinary.config(api_secret=API_SECRET)):
1265+
self.assertTrue(verify_notification_signature(body, valid_response_timestamp,
1266+
signature,
1267+
valid_for),
1268+
"{} valid for matching and not expired signature".format(test_message_part))
1269+
1270+
self.assertFalse(verify_notification_signature(body, valid_response_timestamp,
1271+
signature,
1272+
valid_for - 1),
1273+
"{} invalid for matching but expired signature".format(test_message_part))
1274+
1275+
self.assertFalse(verify_notification_signature(body, valid_response_timestamp,
1276+
signature + 'chars'),
1277+
"{} invalid for non matching and not expired signature".format(test_message_part))
1278+
1279+
self.assertFalse(verify_notification_signature(body, valid_response_timestamp,
1280+
signature + 'chars',
1281+
valid_for - 1),
1282+
"{} invalid for non matching and expired signature".format(test_message_part))
1283+
with self.assertRaises(Exception) as e:
1284+
verify_notification_signature(1, valid_response_timestamp, signature, valid_for)
1285+
self.assertEqual(str(e.exception), 'Body should be type of string')
1286+
1287+
with patch('cloudinary.config', return_value=cloudinary.config(api_secret=None)):
1288+
with self.assertRaises(Exception) as e:
1289+
verify_notification_signature(body, valid_response_timestamp, signature, valid_for)
1290+
self.assertEqual(str(e.exception), 'Api secret key is empty')
1291+
12011292

12021293
if __name__ == '__main__':
12031294
unittest.main()

0 commit comments

Comments
 (0)