Skip to content

Commit b611473

Browse files
author
Gasper Zejn
committed
Implement a working cryptography backend for RSA and EC.
1 parent 1a23c39 commit b611473

File tree

10 files changed

+409
-106
lines changed

10 files changed

+409
-106
lines changed

jose/backends/__init__.py

Whitespace-only changes.

jose/backends/base.py

Whitespace-only changes.
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import six
2+
import ecdsa
3+
from ecdsa.util import sigdecode_string, sigencode_string, sigdecode_der, sigencode_der
4+
5+
from jose.jwk import Key, base64_to_long
6+
from jose.constants import ALGORITHMS
7+
from jose.exceptions import JWKError
8+
9+
from cryptography.exceptions import InvalidSignature
10+
from cryptography.hazmat.backends import default_backend
11+
from cryptography.hazmat.backends.openssl.rsa import _RSAPublicKey
12+
from cryptography.hazmat.primitives import hashes
13+
from cryptography.hazmat.primitives.asymmetric import ec, rsa, padding
14+
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
15+
16+
17+
class CryptographyECKey(Key):
18+
SHA256 = hashes.SHA256
19+
SHA384 = hashes.SHA384
20+
SHA512 = hashes.SHA512
21+
22+
CURVE_MAP = {
23+
SHA256: ec.SECP256R1,
24+
SHA384: ec.SECP384R1,
25+
SHA512: ec.SECP521R1,
26+
}
27+
28+
def __init__(self, key, algorithm, cryptography_backend=default_backend):
29+
if algorithm not in ALGORITHMS.EC:
30+
raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm)
31+
32+
self.hash_alg = {
33+
ALGORITHMS.ES256: self.SHA256,
34+
ALGORITHMS.ES384: self.SHA384,
35+
ALGORITHMS.ES512: self.SHA512
36+
}.get(algorithm)
37+
38+
self.curve = self.CURVE_MAP.get(self.hash_alg)
39+
self.cryptography_backend = cryptography_backend
40+
41+
if isinstance(key, (ecdsa.SigningKey, ecdsa.VerifyingKey)):
42+
# convert to PEM and let cryptography below load it as PEM
43+
key = key.to_pem().decode('utf-8')
44+
45+
if isinstance(key, dict):
46+
self.prepared_key = self._process_jwk(key)
47+
return
48+
49+
if isinstance(key, six.string_types):
50+
if isinstance(key, six.text_type):
51+
key = key.encode('utf-8')
52+
53+
# Attempt to load key. We don't know if it's
54+
# a Public Key or a Private Key, so we try
55+
# the Public Key first.
56+
try:
57+
try:
58+
key = load_pem_public_key(key, self.cryptography_backend())
59+
except ValueError:
60+
key = load_pem_private_key(key, password=None, backend=self.cryptography_backend())
61+
except Exception as e:
62+
raise JWKError(e)
63+
64+
self.prepared_key = key
65+
return
66+
67+
raise JWKError('Unable to parse an ECKey from key: %s' % key)
68+
69+
def _process_jwk(self, jwk_dict):
70+
if not jwk_dict.get('kty') == 'EC':
71+
raise JWKError("Incorrect key type. Expected: 'EC', Recieved: %s" % jwk_dict.get('kty'))
72+
73+
x = base64_to_long(jwk_dict.get('x'))
74+
y = base64_to_long(jwk_dict.get('y'))
75+
76+
ec_pn = ec.EllipticCurvePublicNumbers(x, y, self.curve())
77+
verifying_key = ec_pn.public_key(self.cryptography_backend())
78+
79+
return verifying_key
80+
81+
def sign(self, msg):
82+
signature = self.prepared_key.sign(msg, ec.ECDSA(self.hash_alg()))
83+
order = (2 ** self.curve.key_size) - 1
84+
return sigencode_string(*sigdecode_der(signature, order), order=order)
85+
86+
def verify(self, msg, sig):
87+
order = (2 ** self.curve.key_size) - 1
88+
signature = sigencode_der(*sigdecode_string(sig, order), order=order)
89+
verifier = self.prepared_key.verifier(signature, ec.ECDSA(self.hash_alg()))
90+
verifier.update(msg)
91+
try:
92+
return verifier.verify()
93+
except:
94+
return False
95+
96+
97+
class CryptographyRSAKey(Key):
98+
SHA256 = hashes.SHA256
99+
SHA384 = hashes.SHA384
100+
SHA512 = hashes.SHA512
101+
102+
def __init__(self, key, algorithm, cryptography_backend=default_backend):
103+
if algorithm not in ALGORITHMS.RSA:
104+
raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm)
105+
106+
self.hash_alg = {
107+
ALGORITHMS.RS256: self.SHA256,
108+
ALGORITHMS.RS384: self.SHA384,
109+
ALGORITHMS.RS512: self.SHA512
110+
}.get(algorithm)
111+
112+
self.cryptography_backend = cryptography_backend
113+
114+
if isinstance(key, _RSAPublicKey):
115+
self.prepared_key = key
116+
return
117+
118+
if isinstance(key, dict):
119+
self.prepared_key = self._process_jwk(key)
120+
return
121+
122+
if isinstance(key, six.string_types):
123+
if isinstance(key, six.text_type):
124+
key = key.encode('utf-8')
125+
126+
try:
127+
try:
128+
key = load_pem_public_key(key, self.cryptography_backend())
129+
except ValueError:
130+
key = load_pem_private_key(key, password=None, backend=self.cryptography_backend())
131+
self.prepared_key = key
132+
except Exception as e:
133+
raise JWKError(e)
134+
return
135+
136+
raise JWKError('Unable to parse an RSA_JWK from key: %s' % key)
137+
138+
def _process_jwk(self, jwk_dict):
139+
if not jwk_dict.get('kty') == 'RSA':
140+
raise JWKError("Incorrect key type. Expected: 'RSA', Recieved: %s" % jwk_dict.get('kty'))
141+
142+
e = base64_to_long(jwk_dict.get('e', 256))
143+
n = base64_to_long(jwk_dict.get('n'))
144+
145+
verifying_key = rsa.RSAPublicNumbers(e, n).public_key(self.cryptography_backend())
146+
return verifying_key
147+
148+
def sign(self, msg):
149+
signer = self.prepared_key.signer(
150+
padding.PKCS1v15(),
151+
self.hash_alg()
152+
)
153+
signer.update(msg)
154+
signature = signer.finalize()
155+
return signature
156+
157+
def verify(self, msg, sig):
158+
verifier = self.prepared_key.verifier(
159+
sig,
160+
padding.PKCS1v15(),
161+
self.hash_alg()
162+
)
163+
verifier.update(msg)
164+
try:
165+
verifier.verify()
166+
return True
167+
except InvalidSignature:
168+
return False

jose/backends/pycrypto_backend.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import six
2+
3+
import Crypto.Hash.SHA256
4+
import Crypto.Hash.SHA384
5+
import Crypto.Hash.SHA512
6+
7+
from Crypto.PublicKey import RSA
8+
from Crypto.Signature import PKCS1_v1_5
9+
from Crypto.Util.asn1 import DerSequence
10+
11+
from jose.jwk import Key, base64_to_long
12+
from jose.constants import ALGORITHMS
13+
from jose.exceptions import JWKError
14+
from jose.utils import base64url_decode
15+
16+
# PyCryptodome's RSA module doesn't have PyCrypto's _RSAobj class
17+
# Instead it has a class named RsaKey, which serves the same purpose.
18+
if hasattr(RSA, '_RSAobj'):
19+
_RSAKey = RSA._RSAobj
20+
else:
21+
_RSAKey = RSA.RsaKey
22+
23+
24+
class RSAKey(Key):
25+
"""
26+
Performs signing and verification operations using
27+
RSASSA-PKCS-v1_5 and the specified hash function.
28+
This class requires PyCrypto package to be installed.
29+
This is based off of the implementation in PyJWT 0.3.2
30+
"""
31+
32+
SHA256 = Crypto.Hash.SHA256
33+
SHA384 = Crypto.Hash.SHA384
34+
SHA512 = Crypto.Hash.SHA512
35+
36+
def __init__(self, key, algorithm):
37+
38+
if algorithm not in ALGORITHMS.RSA:
39+
raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm)
40+
41+
self.hash_alg = {
42+
ALGORITHMS.RS256: self.SHA256,
43+
ALGORITHMS.RS384: self.SHA384,
44+
ALGORITHMS.RS512: self.SHA512
45+
}.get(algorithm)
46+
47+
if isinstance(key, _RSAKey):
48+
self.prepared_key = key
49+
return
50+
51+
if isinstance(key, dict):
52+
self._process_jwk(key)
53+
return
54+
55+
if isinstance(key, six.string_types):
56+
if isinstance(key, six.text_type):
57+
key = key.encode('utf-8')
58+
59+
if key.startswith(b'-----BEGIN CERTIFICATE-----'):
60+
try:
61+
self._process_cert(key)
62+
except Exception as e:
63+
raise JWKError(e)
64+
return
65+
66+
try:
67+
self.prepared_key = RSA.importKey(key)
68+
except Exception as e:
69+
raise JWKError(e)
70+
return
71+
72+
raise JWKError('Unable to parse an RSA_JWK from key: %s' % key)
73+
74+
def _process_jwk(self, jwk_dict):
75+
if not jwk_dict.get('kty') == 'RSA':
76+
raise JWKError("Incorrect key type. Expected: 'RSA', Recieved: %s" % jwk_dict.get('kty'))
77+
78+
e = base64_to_long(jwk_dict.get('e', 256))
79+
n = base64_to_long(jwk_dict.get('n'))
80+
81+
self.prepared_key = RSA.construct((n, e))
82+
return self.prepared_key
83+
84+
def _process_cert(self, key):
85+
pemLines = key.replace(b' ', b'').split()
86+
certDer = base64url_decode(b''.join(pemLines[1:-1]))
87+
certSeq = DerSequence()
88+
certSeq.decode(certDer)
89+
tbsSeq = DerSequence()
90+
tbsSeq.decode(certSeq[0])
91+
self.prepared_key = RSA.importKey(tbsSeq[6])
92+
return
93+
94+
def sign(self, msg):
95+
try:
96+
return PKCS1_v1_5.new(self.prepared_key).sign(self.hash_alg.new(msg))
97+
except Exception as e:
98+
raise JWKError(e)
99+
100+
def verify(self, msg, sig):
101+
try:
102+
return PKCS1_v1_5.new(self.prepared_key).verify(self.hash_alg.new(msg), sig)
103+
except Exception as e:
104+
return False

0 commit comments

Comments
 (0)