5
5
import os
6
6
import logging
7
7
import binascii
8
- import base64
9
8
import time
10
- import hashlib
9
+ import re
11
10
12
- import ecdsa
13
- from jose import jws
11
+ from cryptography .hazmat .backends import default_backend
12
+ from cryptography .hazmat .primitives .asymmetric import ec
13
+ from cryptography .hazmat .primitives import serialization
14
14
15
- # Show compliance version. For earlier versions see previously tagged releases.
16
- VERSION = "VAPID-DRAFT-02/ECE-DRAFT-07"
17
-
18
-
19
- def b64urldecode (data ):
20
- """Decodes an unpadded Base64url-encoded string."""
21
- return base64 .urlsafe_b64decode (data + "====" [:len (data ) % 4 ])
15
+ from cryptography .hazmat .primitives import hashes
22
16
17
+ from py_vapid .utils import b64urldecode , b64urlencode
18
+ from py_vapid .jwt import sign
23
19
24
- def b64urlencode (bstring ):
25
- return binascii .b2a_base64 (
26
- bstring ).decode ('utf8' ).replace ('\n ' , '' ).replace (
27
- '+' , '-' ).replace ('/' , '_' ).replace ('=' , '' )
20
+ # Show compliance version. For earlier versions see previously tagged releases.
21
+ VERSION = "VAPID-DRAFT-02/ECE-DRAFT-07"
28
22
29
23
30
24
class VapidException (Exception ):
@@ -40,8 +34,6 @@ class Vapid01(object):
40
34
"""
41
35
_private_key = None
42
36
_public_key = None
43
- _curve = ecdsa .NIST256p
44
- _hasher = hashlib .sha256
45
37
_schema = "WebPush"
46
38
47
39
def __init__ (self , private_key = None ):
@@ -52,41 +44,48 @@ def __init__(self, private_key=None):
52
44
53
45
"""
54
46
self .private_key = private_key
47
+ if private_key :
48
+ self ._public_key = self .private_key .public_key ()
55
49
56
50
@classmethod
57
- def from_raw (cls , private_key ):
51
+ def from_raw (cls , private_raw ):
58
52
"""Initialize VAPID using a private key point in "raw" or
59
- "uncompressed" form.
53
+ "uncompressed" form. Raw keys consist of a single, 32 octet
54
+ encoded integer.
60
55
61
- :param private_key : A private key point in uncompressed form.
62
- :type private_key: str
56
+ :param private_raw : A private key point in uncompressed form.
57
+ :type private_raw: bytes
63
58
64
59
"""
65
- key = ecdsa .SigningKey .from_string (b64urldecode (private_key ),
66
- curve = cls ._curve ,
67
- hashfunc = cls ._hasher )
60
+ key = ec .derive_private_key (
61
+ int (binascii .hexlify (b64urldecode (private_raw )), 16 ),
62
+ curve = ec .SECP256R1 (),
63
+ backend = default_backend ())
68
64
return cls (key )
69
65
70
66
@classmethod
71
67
def from_pem (cls , private_key ):
72
68
"""Initialize VAPID using a private key in PEM format.
73
69
74
70
:param private_key: A private key in PEM format.
75
- :type private_key: str
71
+ :type private_key: bytes
76
72
77
73
"""
78
- key = ecdsa .SigningKey .from_pem (private_key )
79
- return cls (key )
74
+ # not sure why, but load_pem_private_key fails to deserialize
75
+ return cls .from_der (
76
+ b'' .join (private_key .splitlines ()[1 :- 1 ]))
80
77
81
78
@classmethod
82
79
def from_der (cls , private_key ):
83
80
"""Initialize VAPID using a private key in DER format.
84
81
85
82
:param private_key: A private key in DER format and Base64-encoded.
86
- :type private_key: str
83
+ :type private_key: bytes
87
84
88
85
"""
89
- key = ecdsa .SigningKey .from_der (base64 .b64decode (private_key ))
86
+ key = serialization .load_der_private_key (b64urldecode (private_key ),
87
+ password = None ,
88
+ backend = default_backend ())
90
89
return cls (key )
91
90
92
91
@classmethod
@@ -100,54 +99,69 @@ def from_file(cls, private_key_file=None):
100
99
"""
101
100
if not os .path .isfile (private_key_file ):
102
101
vapid = cls ()
102
+ vapid .generate_keys ()
103
103
vapid .save_key (private_key_file )
104
104
return vapid
105
105
private_key = open (private_key_file , 'r' ).read ()
106
- vapid = None
107
106
try :
108
- if "BEGIN EC " in private_key :
109
- vapid = cls .from_pem (private_key )
107
+ if "----- BEGIN" in private_key :
108
+ vapid = cls .from_pem (private_key . encode ( 'utf8' ) )
110
109
else :
111
- vapid = cls .from_der (private_key )
110
+ vapid = cls .from_der (private_key .encode ('utf8' ))
111
+ return vapid
112
112
except Exception as exc :
113
113
logging .error ("Could not open private key file: %s" , repr (exc ))
114
114
raise VapidException (exc )
115
- return vapid
116
115
117
116
@property
118
117
def private_key (self ):
119
118
"""The VAPID private ECDSA key"""
120
119
if not self ._private_key :
121
- raise VapidException (
122
- "No private key defined. Please import or generate a key." )
120
+ raise VapidException ("No private key. Call generate_keys()" )
123
121
return self ._private_key
124
122
125
123
@private_key .setter
126
124
def private_key (self , value ):
127
125
"""Set the VAPID private ECDSA key
128
126
129
127
:param value: the byte array containing the private ECDSA key data
130
- :type value: bytes
128
+ :type value: ec.EllipticCurvePrivateKey
131
129
132
130
"""
133
131
self ._private_key = value
134
- self ._public_key = None
132
+ if value :
133
+ self ._public_key = self .private_key .public_key ()
135
134
136
135
@property
137
136
def public_key (self ):
138
137
"""The VAPID public ECDSA key
139
138
140
139
The public key is currently read only. Set it via the `.private_key`
141
- method.
140
+ method. This will autogenerate a public and private key if no value
141
+ has been set.
142
+
143
+ :returns ec.EllipticCurvePublicKey
142
144
143
145
"""
144
- if not self ._public_key :
145
- self ._public_key = self .private_key .get_verifying_key ()
146
146
return self ._public_key
147
147
148
148
def generate_keys (self ):
149
149
"""Generate a valid ECDSA Key Pair."""
150
- self .private_key = ecdsa .SigningKey .generate (curve = self ._curve )
150
+ self .private_key = ec .generate_private_key (ec .SECP256R1 ,
151
+ default_backend ())
152
+
153
+ def private_pem (self ):
154
+ return self .private_key .private_bytes (
155
+ encoding = serialization .Encoding .PEM ,
156
+ format = serialization .PrivateFormat .PKCS8 ,
157
+ encryption_algorithm = serialization .NoEncryption ()
158
+ )
159
+
160
+ def public_pem (self ):
161
+ return self .public_key .public_bytes (
162
+ encoding = serialization .Encoding .PEM ,
163
+ format = serialization .PublicFormat .SubjectPublicKeyInfo
164
+ )
151
165
152
166
def save_key (self , key_file ):
153
167
"""Save the private key to a PEM file.
@@ -156,11 +170,9 @@ def save_key(self, key_file):
156
170
:type key_file: str
157
171
158
172
"""
159
- file = open (key_file , "wb" )
160
- if not self ._private_key :
161
- self .generate_keys ()
162
- file .write (self ._private_key .to_pem ())
163
- file .close ()
173
+ with open (key_file , "wb" ) as file :
174
+ file .write (self .private_pem ())
175
+ file .close ()
164
176
165
177
def save_public_key (self , key_file ):
166
178
"""Save the public key to a PEM file.
@@ -169,7 +181,7 @@ def save_public_key(self, key_file):
169
181
170
182
"""
171
183
with open (key_file , "wb" ) as file :
172
- file .write (self .public_key . to_pem ())
184
+ file .write (self .public_pem ())
173
185
file .close ()
174
186
175
187
def validate (self , validation_token ):
@@ -183,8 +195,8 @@ def validate(self, validation_token):
183
195
"""
184
196
sig = self .private_key .sign (
185
197
validation_token ,
186
- hashfunc = self . _hasher )
187
- verification_token = base64 . urlsafe_b64encode (sig )
198
+ signature_algorithm = ec . ECDSA ( hashes . SHA256 ()) )
199
+ verification_token = b64urlencode (sig )
188
200
return verification_token
189
201
190
202
def verify_token (self , validation_token , verification_token ):
@@ -198,17 +210,30 @@ def verify_token(self, validation_token, verification_token):
198
210
:rtype: boolean
199
211
200
212
"""
201
- hsig = base64 .urlsafe_b64decode (verification_token )
202
- return self .public_key .verify (hsig , validation_token ,
203
- hashfunc = self ._hasher )
213
+ hsig = b64urldecode (verification_token .encode ('utf8' ))
214
+ return self .public_key .verify (
215
+ hsig ,
216
+ validation_token ,
217
+ signature_algorithm = ec .ECDSA (hashes .SHA256 ())
218
+ )
204
219
205
220
def _base_sign (self , claims ):
206
221
if not claims .get ('exp' ):
207
222
claims ['exp' ] = str (int (time .time ()) + 86400 )
208
- if not claims .get ('sub' ):
223
+ if not re .match ("mailto:.+@.+\..+" ,
224
+ claims .get ('sub' , '' ),
225
+ re .IGNORECASE ):
209
226
raise VapidException (
210
227
"Missing 'sub' from claims. "
211
228
"'sub' is your admin email as a mailto: link." )
229
+ if not re .match ("^https?:\/\/[^\/\.:]+\.[^\/:]+(:\d+)?$" ,
230
+ claims .get ("aud" , "" ),
231
+ re .IGNORECASE ):
232
+ raise VapidException (
233
+ "Missing 'aud' from claims. "
234
+ "'aud' is the scheme, host and optional port for this "
235
+ "transaction e.g. https://example.com:8080" )
236
+
212
237
return claims
213
238
214
239
def sign (self , claims , crypto_key = None ):
@@ -224,12 +249,10 @@ def sign(self, claims, crypto_key=None):
224
249
225
250
"""
226
251
claims = self ._base_sign (claims )
227
- sig = jws . sign (claims , self .private_key , algorithm = "ES256" )
252
+ sig = sign (claims , self .private_key )
228
253
pkey = 'p256ecdsa='
229
- pubkey = self .public_key .to_string ()
230
- if len (pubkey ) == 64 :
231
- pubkey = b'\04 ' + pubkey
232
- pkey += b64urlencode (pubkey )
254
+ pkey += b64urlencode (
255
+ self .public_key .public_numbers ().encode_point ())
233
256
if crypto_key :
234
257
crypto_key = crypto_key + ';' + pkey
235
258
else :
@@ -249,11 +272,8 @@ class Vapid02(Vapid01):
249
272
250
273
def sign (self , claims , crypto_key = None ):
251
274
claims = self ._base_sign (claims )
252
- sig = jws .sign (claims , self .private_key , algorithm = "ES256" )
253
- pkey = self .public_key .to_string ()
254
- # Make sure that the key is properly prefixed.
255
- if len (pkey ) == 64 :
256
- pkey = b'\04 ' + pkey
275
+ sig = sign (claims , self .private_key )
276
+ pkey = self .public_key .public_numbers ().encode_point ()
257
277
return {
258
278
"Authorization" : "{schema} t={t},k={k}" .format (
259
279
schema = self ._schema ,
0 commit comments