Skip to content

Commit 2b9d0ab

Browse files
committed
Initial load
1 parent 0367a7e commit 2b9d0ab

18 files changed

+5275
-0
lines changed

setup.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#!/usr/bin/python
2+
#
3+
# Copyright (C) 2017 Roland Hedberg
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import glob
19+
import re
20+
21+
from setuptools import setup
22+
23+
__author__ = 'Roland Hedberg'
24+
25+
with open('src/cryptojwt/__init__.py', 'r') as fd:
26+
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
27+
fd.read(), re.MULTILINE).group(1)
28+
29+
setup(
30+
name="cryptojwt",
31+
version=version,
32+
description="Python implementation of JWT, JWE, JWS and JWK",
33+
author="Roland Hedberg",
34+
author_email="[email protected]",
35+
license="Apache 2.0",
36+
packages=["cryptojwt"],
37+
package_dir={"": "src"},
38+
classifiers=[
39+
"Development Status :: 4 - Beta",
40+
"License :: OSI Approved :: Apache Software License",
41+
"Topic :: Software Development :: Libraries :: Python Modules",
42+
"Programming Language :: Python :: 2.7",
43+
"Programming Language :: Python :: 3.5"
44+
],
45+
install_requires=["cryptography", "requests", "six", "future"],
46+
tests_require=['pytest'],
47+
zip_safe=False,
48+
scripts=glob.glob('script/*.py'),
49+
)
50+

src/cryptojwt/__init__.py

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
"""JSON Web Token"""
2+
import base64
3+
import json
4+
import logging
5+
import re
6+
import struct
7+
import six
8+
9+
from cryptojwt.exception import BadSyntax
10+
11+
try:
12+
from builtins import zip
13+
from builtins import hex
14+
from builtins import str
15+
except ImportError:
16+
pass
17+
18+
from binascii import unhexlify
19+
20+
__version__ = '0.0.1'
21+
22+
logger = logging.getLogger(__name__)
23+
24+
JWT_TYPES = (u"JWT", u"application/jws", u"JWS", u"JWE")
25+
26+
JWT_CLAIMS = {"iss": str, "sub": str, "aud": str, "exp": int, "nbf": int,
27+
"iat": int, "jti": str, "typ": str}
28+
29+
JWT_HEADERS = ["typ", "cty"]
30+
31+
32+
# ---------------------------------------------------------------------------
33+
# Helper functions
34+
35+
36+
def intarr2bin(arr):
37+
return unhexlify(''.join(["%02x" % byte for byte in arr]))
38+
39+
40+
def long2hexseq(l):
41+
try:
42+
return unhexlify(hex(l)[2:])
43+
except TypeError:
44+
return unhexlify(hex(l)[2:-1])
45+
46+
47+
def intarr2long(arr):
48+
return int(''.join(["%02x" % byte for byte in arr]), 16)
49+
50+
51+
def long2intarr(long_int):
52+
_bytes = []
53+
while long_int:
54+
long_int, r = divmod(long_int, 256)
55+
_bytes.insert(0, r)
56+
return _bytes
57+
58+
59+
def long_to_base64(n):
60+
bys = long2intarr(n)
61+
data = struct.pack('%sB' % len(bys), *bys)
62+
if not len(data):
63+
data = '\x00'
64+
s = base64.urlsafe_b64encode(data).rstrip(b'=')
65+
return s.decode("ascii")
66+
67+
68+
def base64_to_long(data):
69+
if isinstance(data, six.text_type):
70+
data = data.encode("ascii")
71+
72+
# urlsafe_b64decode will happily convert b64encoded data
73+
_d = base64.urlsafe_b64decode(bytes(data) + b'==')
74+
return intarr2long(struct.unpack('%sB' % len(_d), _d))
75+
76+
77+
def base64url_to_long(data):
78+
"""
79+
Stricter then base64_to_long since it really checks that it's
80+
base64url encoded
81+
82+
:param data: The base64 string
83+
:return:
84+
"""
85+
_d = base64.urlsafe_b64decode(bytes(data) + b'==')
86+
# verify that it's base64url encoded and not just base64
87+
# that is no '+' and '/' characters and not trailing "="s.
88+
if [e for e in [b'+', b'/', b'='] if e in data]:
89+
raise ValueError("Not base64url encoded")
90+
return intarr2long(struct.unpack('%sB' % len(_d), _d))
91+
92+
93+
# =============================================================================
94+
95+
def b64e(b):
96+
"""Base64 encode some bytes.
97+
98+
Uses the url-safe - and _ characters, and doesn't pad with = characters."""
99+
return base64.urlsafe_b64encode(b).rstrip(b"=")
100+
101+
102+
_b64_re = re.compile(b"^[A-Za-z0-9_-]*$")
103+
104+
105+
def add_padding(b):
106+
# add padding chars
107+
m = len(b) % 4
108+
if m == 1:
109+
# NOTE: for some reason b64decode raises *TypeError* if the
110+
# padding is incorrect.
111+
raise BadSyntax(b, "incorrect padding")
112+
elif m == 2:
113+
b += b"=="
114+
elif m == 3:
115+
b += b"="
116+
return b
117+
118+
119+
def b64d(b):
120+
"""Decode some base64-encoded bytes.
121+
122+
Raises BadSyntax if the string contains invalid characters or padding.
123+
124+
:param b: bytes
125+
"""
126+
127+
cb = b.rstrip(b"=") # shouldn't but there you are
128+
129+
# Python's base64 functions ignore invalid characters, so we need to
130+
# check for them explicitly.
131+
if not _b64_re.match(cb):
132+
raise BadSyntax(cb, "base64-encoded data contains illegal characters")
133+
134+
if cb == b:
135+
b = add_padding(b)
136+
137+
return base64.urlsafe_b64decode(b)
138+
139+
140+
def b64e_enc_dec(str, encode="utf-8", decode="ascii"):
141+
return b64e(str.encode(encode)).decode(decode)
142+
143+
144+
def b64d_enc_dec(str, encode="ascii", decode="utf-8"):
145+
return b64d(str.encode(encode)).decode(decode)
146+
147+
148+
# 'Stolen' from Werkzeug
149+
def safe_str_cmp(a, b):
150+
"""Compare two strings in constant time."""
151+
if len(a) != len(b):
152+
return False
153+
r = 0
154+
for c, d in zip(a, b):
155+
r |= ord(c) ^ ord(d)
156+
return r == 0
157+
158+
159+
def constant_time_compare(a, b):
160+
"""Compare two strings in constant time."""
161+
if len(a) != len(b):
162+
return False
163+
r = 0
164+
for c, d in zip(a, b):
165+
r |= c ^ d
166+
return r == 0
167+
168+
169+
def as_bytes(s):
170+
"""
171+
Convert an unicode string to bytes.
172+
:param s: Unicode / bytes string
173+
:return: bytes string
174+
"""
175+
try:
176+
s = s.encode()
177+
except (AttributeError, UnicodeDecodeError):
178+
pass
179+
return s
180+
181+
182+
def as_unicode(b):
183+
"""
184+
Convert a byte string to a unicode string
185+
:param b: byte string
186+
:return: unicode string
187+
"""
188+
try:
189+
b = b.decode()
190+
except (AttributeError, UnicodeDecodeError):
191+
pass
192+
return b
193+
194+
195+
def bytes2str_conv(item):
196+
"""
197+
"""
198+
if isinstance(item, bytes):
199+
return item.decode("utf-8")
200+
elif item is None or isinstance(item, (six.string_types, int, bool)):
201+
return item
202+
elif isinstance(item, list):
203+
return [bytes2str_conv(i) for i in item]
204+
elif isinstance(item, dict):
205+
return dict([(k, bytes2str_conv(v)) for k, v in item.items()])
206+
207+
raise ValueError("Can't convert {}.".format(repr(item)))
208+
209+
210+
def b64encode_item(item):
211+
if isinstance(item, bytes):
212+
return b64e(item)
213+
elif isinstance(item, str):
214+
return b64e(item.encode("utf-8"))
215+
elif isinstance(item, int):
216+
return b64e(item)
217+
else:
218+
return b64e(json.dumps(bytes2str_conv(item),
219+
separators=(",", ":")).encode("utf-8"))
220+
221+
222+
def split_token(token):
223+
if not token.count(b"."):
224+
raise BadSyntax(token,
225+
"expected token to contain at least one dot")
226+
return tuple(token.split(b"."))
227+
228+
229+
class SimpleJWT(object):
230+
def __init__(self, **headers):
231+
if not headers.get("alg"):
232+
headers["alg"] = None
233+
self.headers = headers
234+
self.b64part = [b64encode_item(headers)]
235+
self.part = [b64d(self.b64part[0])]
236+
237+
def unpack(self, token):
238+
"""
239+
Unpacks a JWT into its parts and base64 decodes the parts
240+
individually
241+
242+
:param token: The JWT
243+
"""
244+
if isinstance(token, str):
245+
try:
246+
token = token.encode("utf-8")
247+
except UnicodeDecodeError:
248+
pass
249+
250+
part = split_token(token)
251+
self.b64part = part
252+
self.part = [b64d(p) for p in part]
253+
self.headers = json.loads(self.part[0].decode())
254+
return self
255+
256+
def pack(self, parts=None, headers=None):
257+
"""
258+
Packs components into a JWT
259+
260+
:param returns: The string representation of a JWT
261+
"""
262+
if not headers:
263+
if self.headers:
264+
headers = self.headers
265+
else:
266+
headers = {'alg': 'none'}
267+
268+
logging.debug('JWT header: {}'.format(headers))
269+
270+
if not parts:
271+
return ".".join([a.decode() for a in self.b64part])
272+
273+
self.part = [headers] + parts
274+
_all = self.b64part = [b64encode_item(headers)]
275+
_all.extend([b64encode_item(p) for p in parts])
276+
277+
return ".".join([a.decode() for a in _all])
278+
279+
def payload(self):
280+
_msg = as_unicode(self.part[1])
281+
282+
# If not JSON web token assume JSON
283+
if "cty" in self.headers and self.headers["cty"].lower() != "jwt":
284+
pass
285+
else:
286+
try:
287+
_msg = json.loads(_msg)
288+
except ValueError:
289+
pass
290+
291+
return _msg
292+

0 commit comments

Comments
 (0)