5
5
import time
6
6
7
7
from mock import patch , Mock
8
- from nose .tools import eq_ , ok_ , assert_is_not , assert_raises
9
8
import http_ece
10
9
from cryptography .hazmat .primitives .asymmetric import ec
11
10
from cryptography .hazmat .primitives import serialization
@@ -79,12 +78,11 @@ def test_init(self):
79
78
"keys" : {'p256dh' : 'AAA=' , 'auth' : 'AAA=' }})
80
79
81
80
push = WebPusher (subscription_info )
82
- assert_is_not (push .subscription_info , subscription_info )
83
- assert_is_not (push .subscription_info ['keys' ],
84
- subscription_info ['keys' ])
85
- eq_ (push .subscription_info ['endpoint' ], subscription_info ['endpoint' ])
86
- eq_ (push .receiver_key , rk_decode )
87
- eq_ (push .auth_key , b'\x93 \xc2 U\xea \xc8 \xdd n\x10 "\xd6 }\xff ,0K\xbc ' )
81
+ assert push .subscription_info != subscription_info
82
+ assert push .subscription_info ['keys' ] != subscription_info ['keys' ]
83
+ assert push .subscription_info ['endpoint' ] == subscription_info ['endpoint' ]
84
+ assert push .receiver_key == rk_decode
85
+ assert push .auth_key == b'\x93 \xc2 U\xea \xc8 \xdd n\x10 "\xd6 }\xff ,0K\xbc '
88
86
89
87
def test_encode (self ):
90
88
for content_encoding in ["aesgcm" , "aes128gcm" ]:
@@ -118,7 +116,7 @@ def test_encode(self):
118
116
auth_secret = raw_auth ,
119
117
version = content_encoding
120
118
)
121
- eq_ ( decoded .decode ('utf8' ), data )
119
+ assert decoded .decode ('utf8' ) == data
122
120
123
121
def test_bad_content_encoding (self ):
124
122
subscription_info = self ._gen_subscription_info ()
@@ -136,13 +134,13 @@ def test_send(self, mock_post):
136
134
"Authentication" : "bearer vapid" }
137
135
data = "Mary had a little lamb"
138
136
WebPusher (subscription_info ).send (data , headers )
139
- eq_ ( subscription_info .get ('endpoint' ), mock_post .call_args [0 ][0 ])
137
+ assert subscription_info .get ('endpoint' ) == mock_post .call_args [0 ][0 ]
140
138
pheaders = mock_post .call_args [1 ].get ('headers' )
141
- eq_ ( pheaders .get ('ttl' ), '0' )
142
- eq_ ( pheaders .get ('AUTHENTICATION' ), headers .get ('Authentication' ) )
139
+ assert pheaders .get ('ttl' ) == '0'
140
+ assert pheaders .get ('AUTHENTICATION' ) == headers .get ('Authentication' )
143
141
ckey = pheaders .get ('crypto-key' )
144
- ok_ ( 'pre-existing' in ckey )
145
- eq_ ( pheaders .get ('content-encoding' ), 'aes128gcm' )
142
+ assert 'pre-existing' in ckey
143
+ assert pheaders .get ('content-encoding' ) == 'aes128gcm'
146
144
147
145
@patch ("requests.post" )
148
146
def test_send_vapid (self , mock_post ):
@@ -157,9 +155,9 @@ def test_send_vapid(self, mock_post):
157
155
content_encoding = "aesgcm" ,
158
156
headers = {"Test-Header" : "test-value" }
159
157
)
160
- eq_ ( subscription_info .get ('endpoint' ), mock_post .call_args [0 ][0 ])
158
+ assert subscription_info .get ('endpoint' ) == mock_post .call_args [0 ][0 ]
161
159
pheaders = mock_post .call_args [1 ].get ('headers' )
162
- eq_ ( pheaders .get ('ttl' ), '0' )
160
+ assert pheaders .get ('ttl' ) == '0'
163
161
164
162
def repad (str ):
165
163
return str + "====" [:len (str ) % 4 ]
@@ -169,12 +167,12 @@ def repad(str):
169
167
repad (pheaders ['authorization' ].split ('.' )[1 ])
170
168
).decode ('utf8' )
171
169
)
172
- ok_ ( subscription_info .get ('endpoint' ).startswith (auth ['aud' ]) )
173
- ok_ ( 'vapid' in pheaders .get ('authorization' ) )
170
+ assert subscription_info .get ('endpoint' ).startswith (auth ['aud' ])
171
+ assert 'vapid' in pheaders .get ('authorization' )
174
172
ckey = pheaders .get ('crypto-key' )
175
- ok_ ( 'dh=' in ckey )
176
- eq_ ( pheaders .get ('content-encoding' ), 'aesgcm' )
177
- eq_ ( pheaders .get ('test-header' ), 'test-value' )
173
+ assert 'dh=' in ckey
174
+ assert pheaders .get ('content-encoding' ) == 'aesgcm'
175
+ assert pheaders .get ('test-header' ) == 'test-value'
178
176
179
177
@patch .object (WebPusher , "send" )
180
178
@patch .object (py_vapid .Vapid , "sign" )
@@ -211,61 +209,61 @@ def test_webpush_vapid_exp(self, vapid_sign, pusher_send):
211
209
)
212
210
vapid_sign .assert_called_once_with (claims )
213
211
pusher_send .assert_called_once ()
214
- ok_ ( claims ['exp' ] > int (time .time () ))
212
+ assert claims ['exp' ] > int (time .time ())
215
213
216
214
@patch ("requests.post" )
217
215
def test_send_bad_vapid_no_key (self , mock_post ):
218
216
mock_post .return_value .status_code = 200
219
217
220
218
subscription_info = self ._gen_subscription_info ()
221
219
data = "Mary had a little lamb"
222
- assert_raises ( WebPushException ,
223
- webpush ,
224
- subscription_info = subscription_info ,
225
- data = data ,
226
- vapid_claims = {
227
- "aud" : "https://example.com" ,
228
- "sub " : "mailto:ops@ example.com"
229
- }
230
- )
220
+ self . assertRaises (
221
+ WebPushException ,
222
+ webpush ,
223
+ subscription_info = subscription_info ,
224
+ data = data ,
225
+ vapid_claims = {
226
+ "aud " : "https:// example.com" ,
227
+
228
+ } )
231
229
232
230
@patch ("requests.post" )
233
231
def test_send_bad_vapid_bad_return (self , mock_post ):
234
232
mock_post .return_value .status_code = 410
235
233
236
234
subscription_info = self ._gen_subscription_info ()
237
235
data = "Mary had a little lamb"
238
- assert_raises ( WebPushException ,
239
- webpush ,
240
- subscription_info = subscription_info ,
241
- data = data ,
242
- vapid_claims = {
243
- "aud" : "https://example.com" ,
244
- "sub " : "mailto:ops@ example.com"
245
- },
246
- vapid_private_key = self . vapid_key
247
- )
236
+ self . assertRaises (
237
+ WebPushException ,
238
+ webpush ,
239
+ subscription_info = subscription_info ,
240
+ data = data ,
241
+ vapid_claims = {
242
+ "aud " : "https:// example.com" ,
243
+
244
+ },
245
+ vapid_private_key = self . vapid_key )
248
246
249
247
@patch ("requests.post" )
250
248
def test_send_empty (self , mock_post ):
251
249
subscription_info = self ._gen_subscription_info ()
252
250
headers = {"Crypto-Key" : "pre-existing" ,
253
251
"Authentication" : "bearer vapid" }
254
252
WebPusher (subscription_info ).send ('' , headers )
255
- eq_ ( subscription_info .get ('endpoint' ), mock_post .call_args [0 ][0 ])
253
+ assert subscription_info .get ('endpoint' ) == mock_post .call_args [0 ][0 ]
256
254
pheaders = mock_post .call_args [1 ].get ('headers' )
257
- eq_ ( pheaders .get ('ttl' ), '0' )
258
- ok_ ( 'encryption' not in pheaders )
259
- eq_ ( pheaders .get ('AUTHENTICATION' ), headers .get ('Authentication' ) )
255
+ assert pheaders .get ('ttl' ) == '0'
256
+ assert 'encryption' not in pheaders
257
+ assert pheaders .get ('AUTHENTICATION' ) == headers .get ('Authentication' )
260
258
ckey = pheaders .get ('crypto-key' )
261
- ok_ ( 'pre-existing' in ckey )
259
+ assert 'pre-existing' in ckey
262
260
263
261
def test_encode_empty (self ):
264
262
subscription_info = self ._gen_subscription_info ()
265
263
headers = {"Crypto-Key" : "pre-existing" ,
266
264
"Authentication" : "bearer vapid" }
267
265
encoded = WebPusher (subscription_info ).encode ('' , headers )
268
- eq_ ( encoded , None )
266
+ assert encoded is None
269
267
270
268
def test_encode_no_crypto (self ):
271
269
subscription_info = self ._gen_subscription_info ()
@@ -274,20 +272,21 @@ def test_encode_no_crypto(self):
274
272
"Authentication" : "bearer vapid" }
275
273
data = 'Something'
276
274
pusher = WebPusher (subscription_info )
277
- assert_raises (WebPushException ,
278
- pusher .encode ,
279
- data ,
280
- headers )
275
+ self .assertRaises (
276
+ WebPushException ,
277
+ pusher .encode ,
278
+ data ,
279
+ headers )
281
280
282
281
@patch ("requests.post" )
283
282
def test_send_no_headers (self , mock_post ):
284
283
subscription_info = self ._gen_subscription_info ()
285
284
data = "Mary had a little lamb"
286
285
WebPusher (subscription_info ).send (data )
287
- eq_ ( subscription_info .get ('endpoint' ), mock_post .call_args [0 ][0 ])
286
+ assert subscription_info .get ('endpoint' ) == mock_post .call_args [0 ][0 ]
288
287
pheaders = mock_post .call_args [1 ].get ('headers' )
289
- eq_ ( pheaders .get ('ttl' ), '0' )
290
- eq_ ( pheaders .get ('content-encoding' ), 'aes128gcm' )
288
+ assert pheaders .get ('ttl' ) == '0'
289
+ assert pheaders .get ('content-encoding' ) == 'aes128gcm'
291
290
292
291
@patch ("pywebpush.open" )
293
292
def test_as_curl (self , opener ):
@@ -309,15 +308,15 @@ def test_as_curl(self, opener):
309
308
"-H \" ttl: 0\" " ,
310
309
"-H \" content-length:"
311
310
]:
312
- ok_ ( s in result , "missing: {}" .format (s ) )
311
+ assert s in result , "missing: {}" .format (s )
313
312
314
313
def test_ci_dict (self ):
315
314
ci = CaseInsensitiveDict ({"Foo" : "apple" , "bar" : "banana" })
316
- eq_ ( 'apple' , ci ["foo" ])
317
- eq_ ( 'apple' , ci .get ("FOO" ) )
318
- eq_ ( 'apple' , ci .get ("Foo" ) )
315
+ assert 'apple' == ci ["foo" ]
316
+ assert 'apple' == ci .get ("FOO" )
317
+ assert 'apple' == ci .get ("Foo" )
319
318
del (ci ['FOO' ])
320
- eq_ ( None , ci .get ('Foo' ))
319
+ assert ci .get ('Foo' ) is None
321
320
322
321
@patch ("requests.post" )
323
322
def test_gcm (self , mock_post ):
@@ -331,18 +330,18 @@ def test_gcm(self, mock_post):
331
330
wp .send (data , headers , gcm_key = "gcm_key_value" )
332
331
pdata = json .loads (mock_post .call_args [1 ].get ('data' ))
333
332
pheaders = mock_post .call_args [1 ].get ('headers' )
334
- eq_ ( pdata ["registration_ids" ][0 ], "regid123" )
335
- eq_ ( pheaders .get ("authorization" ), "key=gcm_key_value" )
336
- eq_ ( pheaders .get ("content-type" ), "application/json" )
333
+ assert pdata ["registration_ids" ][0 ] == "regid123"
334
+ assert pheaders .get ("authorization" ) == "key=gcm_key_value"
335
+ assert pheaders .get ("content-type" ) == "application/json"
337
336
338
337
@patch ("requests.post" )
339
338
def test_timeout (self , mock_post ):
340
339
mock_post .return_value .status_code = 200
341
340
subscription_info = self ._gen_subscription_info ()
342
341
WebPusher (subscription_info ).send (timeout = 5.2 )
343
- eq_ ( mock_post .call_args [1 ].get ('timeout' ), 5.2 )
342
+ assert mock_post .call_args [1 ].get ('timeout' ) == 5.2
344
343
webpush (subscription_info , timeout = 10.001 )
345
- eq_ ( mock_post .call_args [1 ].get ('timeout' ), 10.001 )
344
+ assert mock_post .call_args [1 ].get ('timeout' ) == 10.001
346
345
347
346
@patch ("requests.Session" )
348
347
def test_send_using_requests_session (self , mock_session ):
@@ -352,14 +351,14 @@ def test_send_using_requests_session(self, mock_session):
352
351
data = "Mary had a little lamb"
353
352
WebPusher (subscription_info ,
354
353
requests_session = mock_session ).send (data , headers )
355
- eq_ ( subscription_info .get ('endpoint' ),
356
- mock_session .post .call_args [0 ][0 ])
354
+ assert subscription_info .get (
355
+ 'endpoint' ) == mock_session .post .call_args [0 ][0 ]
357
356
pheaders = mock_session .post .call_args [1 ].get ('headers' )
358
- eq_ ( pheaders .get ('ttl' ), '0' )
359
- eq_ ( pheaders .get ('AUTHENTICATION' ), headers .get ('Authentication' ) )
357
+ assert pheaders .get ('ttl' ) == '0'
358
+ assert pheaders .get ('AUTHENTICATION' ) == headers .get ('Authentication' )
360
359
ckey = pheaders .get ('crypto-key' )
361
- ok_ ( 'pre-existing' in ckey )
362
- eq_ ( pheaders .get ('content-encoding' ), 'aes128gcm' )
360
+ assert 'pre-existing' in ckey
361
+ assert pheaders .get ('content-encoding' ) == 'aes128gcm'
363
362
364
363
365
364
class WebpushExceptionTestCase (unittest .TestCase ):
0 commit comments