1
1
# SPDX-License-Identifier: Apache-2.0
2
2
3
+ import base64
4
+ import hashlib
5
+ import hmac
6
+ import json
7
+ import time
8
+
3
9
import pretend
10
+ import pytest
4
11
5
12
from warehouse import request
6
13
7
14
15
class TestNormalizeDomain:
    """Checks for ``request._normalize_domain`` on plain, padded and IDN input."""

    @pytest.mark.parametrize(
        ("input_domain", "expected"),
        [
            # Case folding
            ("PyPi.ORG", "pypi.org"),
            ("TEST.PyPi.ORG", "test.pypi.org"),
            ("LOCALHOST", "localhost"),
            # One or more trailing dots are dropped
            ("pypi.org.", "pypi.org"),
            ("pypi.org...", "pypi.org"),
            ("localhost.", "localhost"),
            # Surrounding whitespace is dropped
            (" pypi.org ", "pypi.org"),
            ("\t pypi.org\n ", "pypi.org"),
            (" localhost ", "localhost"),
            # Several normalizations at once
            (" TEST.PyPi.ORG. ", "test.pypi.org"),
            (" LOCALHOST. ", "localhost"),
            (" 127.0.0.1 ", "127.0.0.1"),
        ],
    )
    def test_domain_normalization(self, input_domain, expected):
        """Each raw input must normalize to its expected canonical form."""
        normalized = request._normalize_domain(input_domain)
        assert normalized == expected

    def test_handles_idn_domains(self):
        """A look-alike IDN name must come back as punycode, not as ``pypi.org``."""
        # Cyrillic letters mixed with Latin ones so the name resembles "pypi"
        lookalike = "рyрі.org"
        assert request._normalize_domain(lookalike) != "pypi.org"
        # The normalized form is expected to carry the punycode prefix
        assert request._normalize_domain(lookalike).startswith("xn--")

    def test_handles_invalid_idn_domains(self):
        """Input that cannot be IDN-encoded is returned in its normalized form."""
        # A lone surrogate code point cannot be encoded; the call must not raise
        surrogate_domain = "test\udcff .org"
        surrogate_result = request._normalize_domain(surrogate_domain)
        assert surrogate_result == "test\udcff .org"

        # Two consecutive dots leave an empty label in the middle of the name
        empty_label_domain = "test..org"
        empty_label_result = request._normalize_domain(empty_label_domain)
        assert empty_label_result == "test..org"
62
+
63
+
8
64
class TestCreateNonce :
9
65
def test_generates_unique_nonces (self ):
10
66
"""Test that each request gets a unique nonce."""
@@ -37,39 +93,161 @@ def test_nonce_is_url_safe(self):
37
93
assert re .match (r"^[A-Za-z0-9_-]+$" , nonce )
38
94
39
95
96
class TestCreateIntegrityToken:
    """Checks on the token produced by ``request._create_integrity_token``."""

    def test_creates_valid_token(self):
        """The token is a base64 string wrapping JSON with ts, entropy and nonce."""
        stub_request = pretend.stub(nonce="test-nonce-123")

        token = request._create_integrity_token(stub_request)

        # The token itself is text ...
        assert isinstance(token, str)

        # ... that base64-decodes to a UTF-8 JSON document
        payload = json.loads(base64.b64decode(token).decode("utf-8"))

        # All three fields must be present
        for field in ("ts", "entropy", "nonce"):
            assert field in payload

        # The embedded timestamp is within five seconds of the current time
        now = int(time.time())
        assert abs(payload["ts"] - now) < 5

        # The entropy field is itself base64 and decodes to 16 bytes
        raw_entropy = base64.b64decode(payload["entropy"])
        assert len(raw_entropy) == 16

        # The nonce is the one carried by the request
        assert payload["nonce"] == "test-nonce-123"

    def test_different_requests_get_different_tokens(self):
        """Tokens differ across requests, even when the nonce is the same."""
        first_request = pretend.stub(nonce="nonce-1")
        second_request = pretend.stub(nonce="nonce-2")

        first_token = request._create_integrity_token(first_request)
        second_token = request._create_integrity_token(second_request)

        assert first_token != second_token

        # A fresh request reusing the first nonce must still get a new token
        repeat_request = pretend.stub(nonce="nonce-1")
        repeat_token = request._create_integrity_token(repeat_request)

        assert first_token != repeat_token
141
+
142
+
40
143
class TestCreateHashedDomains :
41
def test_hashes_domains_with_enhanced_security(self):
    """Two allowed domains give two 64-char hex hashes plus a 16-char checksum."""
    # Hand-built integrity token: compact, key-sorted JSON wrapped in base64
    token_json = json.dumps(
        {
            "ts": int(time.time()),
            "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
            "nonce": "test-nonce-123",
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    integrity_token = base64.b64encode(token_json.encode("utf-8")).decode("ascii")

    settings = {"warehouse.allowed_domains": ["pypi.org", "test.pypi.org"]}
    req = pretend.stub(
        nonce="test-nonce-123",
        integrity_token=integrity_token,
        registry=pretend.stub(settings=settings),
    )

    hashed = request._create_hashed_domains(req)

    # Pipe-separated output: one entry per domain, then the checksum
    assert "|" in hashed
    pieces = hashed.split("|")
    assert len(pieces) == 3

    *domain_hashes, checksum = pieces
    hex_digits = "0123456789abcdef"

    # Every domain hash is 64 lowercase hex characters
    for digest in domain_hashes:
        assert len(digest) == 64
        assert all(ch in hex_digits for ch in digest)

    # The trailing checksum is 16 lowercase hex characters
    assert len(checksum) == 16
    assert all(ch in hex_digits for ch in checksum)

    # Distinct domains must not share a hash
    assert domain_hashes[0] != domain_hashes[1]
186
+
187
def test_domain_normalization_applied(self):
    """Differently-cased spellings of one domain must give identical output."""
    token_json = json.dumps(
        {
            "ts": int(time.time()),
            "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
            "nonce": "test-nonce-123",
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    integrity_token = base64.b64encode(token_json.encode("utf-8")).decode("ascii")

    def stub_for(domain):
        # Same nonce and token every time; only the configured domain varies
        return pretend.stub(
            nonce="test-nonce-123",
            integrity_token=integrity_token,
            registry=pretend.stub(settings={"warehouse.allowed_domains": [domain]}),
        )

    mixed_case_request = stub_for("PyPi.ORG")
    lower_case_request = stub_for("pypi.org")

    mixed_case_hashed = request._create_hashed_domains(mixed_case_request)
    lower_case_hashed = request._create_hashed_domains(lower_case_request)

    # Casing of the configured domain must not influence the result
    assert mixed_case_hashed == lower_case_hashed
64
218
65
219
def test_different_nonce_produces_different_hashes (self ):
66
220
"""Test that different nonces produce different hashes for same domain."""
221
+ token_data1 = {
222
+ "ts" : int (time .time ()),
223
+ "entropy" : base64 .b64encode (b"entropy-1" ).decode ("ascii" ),
224
+ "nonce" : "nonce-1" ,
225
+ }
226
+ integrity_token1 = base64 .b64encode (
227
+ json .dumps (token_data1 , sort_keys = True , separators = ("," , ":" )).encode (
228
+ "utf-8"
229
+ )
230
+ ).decode ("ascii" )
231
+
232
+ token_data2 = {
233
+ "ts" : int (time .time ()),
234
+ "entropy" : base64 .b64encode (b"entropy-2" ).decode ("ascii" ),
235
+ "nonce" : "nonce-2" ,
236
+ }
237
+ integrity_token2 = base64 .b64encode (
238
+ json .dumps (token_data2 , sort_keys = True , separators = ("," , ":" )).encode (
239
+ "utf-8"
240
+ )
241
+ ).decode ("ascii" )
242
+
67
243
req1 = pretend .stub (
68
244
nonce = "nonce-1" ,
245
+ integrity_token = integrity_token1 ,
69
246
registry = pretend .stub (settings = {"warehouse.allowed_domains" : ["pypi.org" ]}),
70
247
)
71
248
req2 = pretend .stub (
72
249
nonce = "nonce-2" ,
250
+ integrity_token = integrity_token2 ,
73
251
registry = pretend .stub (settings = {"warehouse.allowed_domains" : ["pypi.org" ]}),
74
252
)
75
253
@@ -78,10 +256,66 @@ def test_different_nonce_produces_different_hashes(self):
78
256
79
257
assert hashed1 != hashed2
80
258
259
def test_timestamp_affects_hash(self):
    """Tokens that differ only in ``ts`` must lead to different hashed output."""

    def token_with_ts(ts):
        # Compact, key-sorted JSON wrapped in base64; only ``ts`` varies
        token_json = json.dumps(
            {
                "ts": ts,
                "entropy": base64.b64encode(b"test-entropy").decode("ascii"),
                "nonce": "test-nonce",
            },
            sort_keys=True,
            separators=(",", ":"),
        )
        return base64.b64encode(token_json.encode("utf-8")).decode("ascii")

    current_token = token_with_ts(int(time.time()))
    later_token = token_with_ts(int(time.time()) + 100)  # 100 seconds ahead

    current_request = pretend.stub(
        nonce="test-nonce",
        integrity_token=current_token,
        registry=pretend.stub(settings={"warehouse.allowed_domains": ["pypi.org"]}),
    )
    later_request = pretend.stub(
        nonce="test-nonce",
        integrity_token=later_token,
        registry=pretend.stub(settings={"warehouse.allowed_domains": ["pypi.org"]}),
    )

    current_hashed = request._create_hashed_domains(current_request)
    later_hashed = request._create_hashed_domains(later_request)

    # Nonce, entropy and domain are equal, so any difference comes from ``ts``
    assert current_hashed != later_hashed
302
+
81
303
def test_empty_domains_returns_empty_string (self ):
82
304
"""Test that empty domain list returns empty string."""
305
+ token_data = {
306
+ "ts" : int (time .time ()),
307
+ "entropy" : base64 .b64encode (b"test-entropy" ).decode ("ascii" ),
308
+ "nonce" : "test-nonce" ,
309
+ }
310
+ integrity_token = base64 .b64encode (
311
+ json .dumps (token_data , sort_keys = True , separators = ("," , ":" )).encode (
312
+ "utf-8"
313
+ )
314
+ ).decode ("ascii" )
315
+
83
316
req = pretend .stub (
84
317
nonce = "test-nonce" ,
318
+ integrity_token = integrity_token ,
85
319
registry = pretend .stub (settings = {"warehouse.allowed_domains" : []}),
86
320
)
87
321
@@ -90,7 +324,66 @@ def test_empty_domains_returns_empty_string(self):
90
324
91
325
def test_no_domains_setting_returns_empty_string(self):
    """With no ``warehouse.allowed_domains`` key in settings the result is ``""``."""
    token_json = json.dumps(
        {
            "ts": int(time.time()),
            "entropy": base64.b64encode(b"test-entropy").decode("ascii"),
            "nonce": "test-nonce",
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    integrity_token = base64.b64encode(token_json.encode("utf-8")).decode("ascii")

    # The registry settings are deliberately empty
    bare_registry = pretend.stub(settings={})
    req = pretend.stub(
        nonce="test-nonce",
        integrity_token=integrity_token,
        registry=bare_registry,
    )

    assert request._create_hashed_domains(req) == ""
346
+
347
def test_checksum_validates_integrity(self):
    """A checksum recomputed over tampered hashes must differ from the real one."""
    # NOTE(review): this only shows that a recomputation over *modified* hashes
    # differs from the returned checksum. It never asserts that the returned
    # checksum equals a recomputation over the untouched hashes. Consider adding
    # that once the key derivation used by the implementation is confirmed.
    token_json = json.dumps(
        {
            "ts": int(time.time()),
            "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
            "nonce": "test-nonce-123",
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    integrity_token = base64.b64encode(token_json.encode("utf-8")).decode("ascii")

    settings = {"warehouse.allowed_domains": ["pypi.org", "test.pypi.org"]}
    req = pretend.stub(
        nonce="test-nonce-123",
        integrity_token=integrity_token,
        registry=pretend.stub(settings=settings),
    )

    hashed = request._create_hashed_domains(req)

    # The last pipe-separated entry is the checksum; the rest are domain hashes
    *tampered_hashes, checksum = hashed.split("|")
    assert len(checksum) == 16

    # Overwrite the first domain hash with zeros to simulate tampering
    tampered_hashes[0] = "0" * 64

    # Key material this test assumes: nonce bytes followed by raw entropy bytes
    hmac_key = b"test-nonce-123" + b"test-entropy-123"
    tampered_payload = "|".join(tampered_hashes).encode("utf-8")
    tampered_digest = hmac.new(hmac_key, tampered_payload, hashlib.sha256)
    tampered_checksum = tampered_digest.hexdigest()[:16]

    # The tampered list must not reproduce the original checksum
    assert tampered_checksum != checksum
0 commit comments