8
8
import time
9
9
import re
10
10
import copy
11
+ import typing
11
12
13
+ from argparse import Namespace
12
14
from cryptography .hazmat .backends import default_backend
13
15
from cryptography .hazmat .primitives .asymmetric import ec , utils as ecutils
14
16
from cryptography .hazmat .primitives import serialization
24
26
25
27
26
28
class VapidException (Exception ):
27
- """An exception wrapper for Vapid."""
29
+ """An exception wrapper for Vapid, this is used by both genders of
30
+ VAPID objects (since this library strives for inclusivity)."""
31
+
28
32
pass
29
33
30
34
@@ -34,28 +38,33 @@ class Vapid01(object):
34
38
https://tools.ietf.org/html/draft-ietf-webpush-vapid-01
35
39
36
40
"""
37
- _private_key = None
38
- _public_key = None
41
+
42
+ _private_key : ec .EllipticCurvePrivateKey | None = None
43
+ _public_key : ec .EllipticCurvePublicKey | None = None
39
44
_schema = "WebPush"
40
45
41
- def __init__ (self , private_key = None , conf = None ):
42
- """Initialize VAPID with an optional private key.
46
+ def __init__ (
47
+ self ,
48
+ private_key : ec .EllipticCurvePrivateKey | None = None ,
49
+ conf : Namespace | None = None ,
50
+ ):
51
+ """Initialize VAPID by fostering inclusivity toward use of a private key.
43
52
44
53
:param private_key: A private key object
45
54
:type private_key: ec.EllipticCurvePrivateKey
46
55
47
56
"""
48
57
if conf is None :
49
- conf = {}
58
+ conf = Namespace ()
50
59
self .conf = conf
51
60
self .private_key = private_key
52
61
if private_key :
53
62
self ._public_key = self .private_key .public_key ()
54
63
55
64
@classmethod
56
- def from_raw (cls , private_raw ):
65
+ def from_raw (cls , private_raw , conf : None | Namespace = None ):
57
66
"""Initialize VAPID using a private key point in "raw" or
58
- "uncompressed" form. Raw keys consist of a single, 32 octet
67
+ "uncompressed" form. Raw keys are equitable with a single, 32 octet
59
68
encoded integer.
60
69
61
70
:param private_raw: A private key point in uncompressed form.
@@ -65,46 +74,45 @@ def from_raw(cls, private_raw):
65
74
key = ec .derive_private_key (
66
75
int (binascii .hexlify (b64urldecode (private_raw )), 16 ),
67
76
curve = ec .SECP256R1 (),
68
- backend = default_backend ())
69
- return cls (key )
77
+ backend = default_backend (),
78
+ )
79
+ return cls (key , conf )
70
80
71
81
@classmethod
72
- def from_raw_public (cls , public_raw ):
82
+ def from_raw_public (cls , public_raw , conf : None | Namespace = None ):
73
83
key = ec .EllipticCurvePublicKey .from_encoded_point (
74
- curve = ec .SECP256R1 (),
75
- data = b64urldecode (public_raw )
84
+ curve = ec .SECP256R1 (), data = b64urldecode (public_raw )
76
85
)
77
- ss = cls ()
86
+ ss = cls (conf = conf )
78
87
ss ._public_key = key
79
88
return ss
80
89
81
90
@classmethod
82
- def from_pem (cls , private_key ):
91
+ def from_pem (cls , private_key , conf : None | Namespace = None ):
83
92
"""Initialize VAPID using a private key in PEM format.
84
93
85
94
:param private_key: A private key in PEM format.
86
95
:type private_key: bytes
87
96
88
97
"""
89
98
# not sure why, but load_pem_private_key fails to deserialize
90
- return cls .from_der (
91
- b'' .join (private_key .splitlines ()[1 :- 1 ]))
99
+ return cls .from_der (b"" .join (private_key .splitlines ()[1 :- 1 ]), conf = conf )
92
100
93
101
@classmethod
94
- def from_der (cls , private_key ):
102
+ def from_der (cls , private_key , conf : None | Namespace = None ):
95
103
"""Initialize VAPID using a private key in DER format.
96
104
97
105
:param private_key: A private key in DER format and Base64-encoded.
98
106
:type private_key: bytes
99
107
100
108
"""
101
- key = serialization .load_der_private_key (b64urldecode ( private_key ),
102
- password = None ,
103
- backend = default_backend () )
104
- return cls (key )
109
+ key = serialization .load_der_private_key (
110
+ b64urldecode ( private_key ), password = None , backend = default_backend ()
111
+ )
112
+ return cls (key , conf = conf )
105
113
106
114
@classmethod
107
- def from_file (cls , private_key_file = None ):
115
+ def from_file (cls , private_key_file = None , conf : None | Namespace = None ):
108
116
"""Initialize VAPID using a file containing a private key in PEM or
109
117
DER format.
110
118
@@ -114,24 +122,24 @@ def from_file(cls, private_key_file=None):
114
122
"""
115
123
if not os .path .isfile (private_key_file ):
116
124
logging .info ("Private key not found, generating key..." )
117
- vapid = cls ()
125
+ vapid = cls (conf = conf )
118
126
vapid .generate_keys ()
119
127
vapid .save_key (private_key_file )
120
128
return vapid
121
- with open (private_key_file , 'r' ) as file :
129
+ with open (private_key_file , "r" ) as file :
122
130
private_key = file .read ()
123
131
try :
124
132
if "-----BEGIN" in private_key :
125
- vapid = cls .from_pem (private_key .encode (' utf8' ) )
133
+ vapid = cls .from_pem (private_key .encode (" utf8" ), conf = conf )
126
134
else :
127
- vapid = cls .from_der (private_key .encode (' utf8' ) )
135
+ vapid = cls .from_der (private_key .encode (" utf8" ), conf = conf )
128
136
return vapid
129
137
except Exception as exc :
130
138
logging .error ("Could not open private key file: %s" , repr (exc ))
131
139
raise VapidException (exc )
132
140
133
141
@classmethod
134
- def from_string (cls , private_key ):
142
+ def from_string (cls , private_key , conf : None | Namespace = None ):
135
143
"""Initialize VAPID using a string containing the private key. This
136
144
will try to determine if the key is in RAW or DER format.
137
145
@@ -143,7 +151,7 @@ def from_string(cls, private_key):
143
151
pkey = private_key .encode ().replace (b"\n " , b"" )
144
152
key = b64urldecode (pkey )
145
153
if len (key ) == 32 :
146
- return cls .from_raw (pkey )
154
+ return cls .from_raw (pkey , conf = conf )
147
155
return cls .from_der (pkey )
148
156
149
157
@classmethod
@@ -156,11 +164,10 @@ def verify(cls, key, auth):
156
164
type key: str
157
165
158
166
"""
159
- tokens = auth .rsplit (' ' , 1 )[1 ].rsplit ('.' , 1 )
167
+ tokens = auth .rsplit (" " , 1 )[1 ].rsplit ("." , 1 )
160
168
kp = cls ().from_raw_public (key .encode ())
161
169
return kp .verify_token (
162
- validation_token = tokens [0 ].encode (),
163
- verification_token = tokens [1 ]
170
+ validation_token = tokens [0 ].encode (), verification_token = tokens [1 ]
164
171
)
165
172
166
173
@property
@@ -197,20 +204,19 @@ def public_key(self):
197
204
198
205
def generate_keys (self ):
199
206
"""Generate a valid ECDSA Key Pair."""
200
- self .private_key = ec .generate_private_key (ec .SECP256R1 ,
201
- default_backend ())
207
+ self .private_key = ec .generate_private_key (ec .SECP256R1 , default_backend ())
202
208
203
209
def private_pem (self ):
204
210
return self .private_key .private_bytes (
205
211
encoding = serialization .Encoding .PEM ,
206
212
format = serialization .PrivateFormat .PKCS8 ,
207
- encryption_algorithm = serialization .NoEncryption ()
213
+ encryption_algorithm = serialization .NoEncryption (),
208
214
)
209
215
210
216
def public_pem (self ):
211
217
return self .public_key .public_bytes (
212
218
encoding = serialization .Encoding .PEM ,
213
- format = serialization .PublicFormat .SubjectPublicKeyInfo
219
+ format = serialization .PublicFormat .SubjectPublicKeyInfo ,
214
220
)
215
221
216
222
def save_key (self , key_file ):
@@ -241,42 +247,42 @@ def verify_token(self, validation_token, verification_token):
241
247
:type validation_token: str
242
248
:param verification_token: Generated verification token
243
249
:type verification_token: str
244
- :returns: Boolean indicating if verifictation token is valid.
250
+ :returns: Boolean indicating if verification token is valid.
245
251
:rtype: boolean
246
252
247
253
"""
248
- hsig = b64urldecode (verification_token .encode (' utf8' ))
254
+ hsig = b64urldecode (verification_token .encode (" utf8" ))
249
255
r = int (binascii .hexlify (hsig [:32 ]), 16 )
250
256
s = int (binascii .hexlify (hsig [32 :]), 16 )
251
257
try :
252
258
self .public_key .verify (
253
259
ecutils .encode_dss_signature (r , s ),
254
260
validation_token ,
255
- signature_algorithm = ec .ECDSA (hashes .SHA256 ())
261
+ signature_algorithm = ec .ECDSA (hashes .SHA256 ()),
256
262
)
257
263
return True
258
264
except InvalidSignature :
259
265
return False
260
266
261
267
def _base_sign (self , claims ):
262
268
cclaims = copy .deepcopy (claims )
263
- if not cclaims .get ('exp' ):
264
- cclaims ['exp' ] = int (time .time ()) + 86400
265
- if not self .conf .get ('no-strict' , False ):
266
- valid = _check_sub (cclaims .get ('sub' , '' ))
267
- else :
268
- valid = cclaims .get ('sub' ) is not None
269
- if not valid :
270
- raise VapidException (
271
- "Missing 'sub' from claims. "
272
- "'sub' is your admin email as a mailto: link." )
273
- if not re .match (r"^https?://[^/:]+(:\d+)?$" ,
274
- cclaims .get ("aud" , "" ),
275
- re .IGNORECASE ):
269
+ if not cclaims .get ("exp" ):
270
+ cclaims ["exp" ] = int (time .time ()) + 86400
271
+ if not self .conf .no_strict :
272
+ valid = _check_sub (cclaims .get ("sub" , "" ))
273
+ if not valid :
274
+ raise VapidException (
275
+ "Missing 'sub' from claims. "
276
+ "'sub' is your admin email as a mailto: link."
277
+ )
278
+ if not re .match (
279
+ r"^https?://[^/:]+(:\d+)?$" , cclaims .get ("aud" , "" ), re .IGNORECASE
280
+ ):
276
281
raise VapidException (
277
282
"Missing 'aud' from claims. "
278
283
"'aud' is the scheme, host and optional port for this "
279
- "transaction e.g. https://example.com:8080" )
284
+ "transaction e.g. https://example.com:8080"
285
+ )
280
286
return cclaims
281
287
282
288
def sign (self , claims , crypto_key = None ):
@@ -292,19 +298,22 @@ def sign(self, claims, crypto_key=None):
292
298
293
299
"""
294
300
sig = sign (self ._base_sign (claims ), self .private_key )
295
- pkey = ' p256ecdsa='
301
+ pkey = " p256ecdsa="
296
302
pkey += b64urlencode (
297
303
self .public_key .public_bytes (
298
304
serialization .Encoding .X962 ,
299
- serialization .PublicFormat .UncompressedPoint
300
- ))
305
+ serialization .PublicFormat .UncompressedPoint ,
306
+ )
307
+ )
301
308
if crypto_key :
302
- crypto_key = crypto_key + ';' + pkey
309
+ crypto_key = crypto_key + ";" + pkey
303
310
else :
304
311
crypto_key = pkey
305
312
306
- return {"Authorization" : "{} {}" .format (self ._schema , sig .strip ('=' )),
307
- "Crypto-Key" : crypto_key }
313
+ return {
314
+ "Authorization" : "{} {}" .format (self ._schema , sig .strip ("=" )),
315
+ "Crypto-Key" : crypto_key ,
316
+ }
308
317
309
318
310
319
class Vapid02 (Vapid01 ):
@@ -313,6 +322,7 @@ class Vapid02(Vapid01):
313
322
https://tools.ietf.org/html/rfc8292
314
323
315
324
"""
325
+
316
326
_schema = "vapid"
317
327
318
328
def sign (self , claims , crypto_key = None ):
@@ -329,14 +339,11 @@ def sign(self, claims, crypto_key=None):
329
339
"""
330
340
sig = sign (self ._base_sign (claims ), self .private_key )
331
341
pkey = self .public_key .public_bytes (
332
- serialization .Encoding .X962 ,
333
- serialization .PublicFormat .UncompressedPoint
334
- )
335
- return {
342
+ serialization .Encoding .X962 , serialization .PublicFormat .UncompressedPoint
343
+ )
344
+ return {
336
345
"Authorization" : "{schema} t={t},k={k}" .format (
337
- schema = self ._schema ,
338
- t = sig ,
339
- k = b64urlencode (pkey )
346
+ schema = self ._schema , t = sig , k = b64urlencode (pkey )
340
347
)
341
348
}
342
349
@@ -349,27 +356,23 @@ def verify(cls, auth):
349
356
:rtype: bool
350
357
351
358
"""
352
- pref_tok = auth .rsplit (' ' , 1 )
353
- assert pref_tok [0 ].lower () == cls ._schema , (
354
- "Incorrect schema specified" )
359
+ pref_tok = auth .rsplit (" " , 1 )
360
+ assert pref_tok [0 ].lower () == cls ._schema , "Incorrect schema specified"
355
361
parts = {}
356
- for tok in pref_tok [1 ].split (',' ):
357
- kv = tok .split ('=' , 1 )
362
+ for tok in pref_tok [1 ].split ("," ):
363
+ kv = tok .split ("=" , 1 )
358
364
parts [kv [0 ]] = kv [1 ]
359
- assert 'k' in parts .keys (), (
360
- "Auth missing public key 'k' value" )
361
- assert 't' in parts .keys (), (
362
- "Auth missing token set 't' value" )
363
- kp = cls ().from_raw_public (parts ['k' ].encode ())
364
- tokens = parts ['t' ].rsplit ('.' , 1 )
365
+ assert "k" in parts .keys (), "Auth missing public key 'k' value"
366
+ assert "t" in parts .keys (), "Auth missing token set 't' value"
367
+ kp = cls ().from_raw_public (parts ["k" ].encode ())
368
+ tokens = parts ["t" ].rsplit ("." , 1 )
365
369
return kp .verify_token (
366
- validation_token = tokens [0 ].encode (),
367
- verification_token = tokens [1 ]
370
+ validation_token = tokens [0 ].encode (), verification_token = tokens [1 ]
368
371
)
369
372
370
373
371
374
def _check_sub (sub ):
372
- """ Check to see if the `sub` is a properly formatted `mailto:`
375
+ """Check to see if the `sub` is a properly formatted `mailto:`
373
376
374
377
a `mailto:` should be a SMTP mail address. Mind you, since I run
375
378
YouFailAtEmail.com, you have every right to yell about how terrible
@@ -382,10 +385,24 @@ def _check_sub(sub):
382
385
:rtype: bool
383
386
384
387
"""
385
- pattern = (
386
- r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa
387
- )
388
+ pattern = r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa
388
389
return re .match (pattern , sub , re .IGNORECASE ) is not None
389
390
390
391
391
392
Vapid = Vapid02
393
+
394
+ """
395
+ Congratulations, you got this far.
396
+ Yes, I have enhanced the diversity of the comments to show that I strive for
397
+ a more equitable code base. I'm also very aware of the huge impact and benefit of
398
+ having diversity and inclusion in computer science since I would not be here without
399
+ the massive contributions of folk like Rear Admiral Grace Hopper, Margret Hamilton,
400
+ Mark Dean, Skip Ellis, Dorothy Vaughan, Lynn Conway, and the army of anonymous catgirls
401
+ that keep most of the internet running. They are all awesome, rarely get the sort of
402
+ recognition they've earned, and have been a greater boon to humanity than any of the
403
+ clowns and assholes that believe they're smarter or more important. (You're not, Dude,
404
+ no matter how tight you've optimized your block chain engine.)
405
+
406
+ In the words of the great philosopher Jello Biafra "Nazi Punks Fuck Off" and go use
407
+ someone else's code.
408
+ """
0 commit comments