Skip to content

Commit d335e7b

Browse files
committed
Create a session_token file with helper methods
1 parent e5e522e commit d335e7b

File tree

5 files changed

+104
-99
lines changed

5 files changed

+104
-99
lines changed

shopify/session_token.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import jwt
2+
import re
3+
import six
4+
import sys
5+
6+
if sys.version_info[0] < 3: # Backwards compatibility for python < v3.0.0
7+
from urlparse import urljoin
8+
else:
9+
from urllib.parse import urljoin
10+
11+
12+
HOSTNAME_PATTERN = r"[a-z0-9][a-z0-9-]*[a-z0-9]"
13+
SHOP_DOMAIN_RE = re.compile(r"^https://{h}\.myshopify\.com/$".format(h=HOSTNAME_PATTERN))
14+
15+
ALGORITHM = "HS256"
16+
PREFIX = "Bearer "
17+
REQUIRED_FIELDS = ["iss", "dest", "sub", "jti", "sid"]
18+
19+
20+
class SessionTokenError(Exception):
21+
pass
22+
23+
24+
class InvalidIssuerError(SessionTokenError):
25+
pass
26+
27+
28+
class MismatchedHostsError(SessionTokenError):
29+
pass
30+
31+
32+
class TokenAuthenticationError(SessionTokenError):
33+
pass
34+
35+
36+
def get_decoded_session_token(authorization_header, api_key, secret):
37+
session_token = _extract_session_token(authorization_header)
38+
decoded_payload = _decode_session_token(session_token, api_key, secret)
39+
_validate_issuer(decoded_payload)
40+
41+
return decoded_payload
42+
43+
44+
def _extract_session_token(authorization_header):
45+
if not authorization_header.startswith(PREFIX):
46+
raise TokenAuthenticationError("The HTTP_AUTHORIZATION_HEADER provided does not contain a Bearer token")
47+
48+
return authorization_header[len(PREFIX) :]
49+
50+
51+
def _decode_session_token(session_token, api_key, secret):
52+
try:
53+
return jwt.decode(
54+
session_token,
55+
secret,
56+
audience=api_key,
57+
algorithms=[ALGORITHM],
58+
options={"require": REQUIRED_FIELDS},
59+
)
60+
except jwt.exceptions.PyJWTError as exception:
61+
six.raise_from(SessionTokenError(str(exception)), exception)
62+
63+
64+
def _validate_issuer(decoded_payload):
65+
_validate_issuer_hostname(decoded_payload)
66+
_validate_issuer_and_dest_match(decoded_payload)
67+
68+
69+
def _validate_issuer_hostname(decoded_payload):
70+
issuer_root = urljoin(decoded_payload["iss"], "/")
71+
72+
if not SHOP_DOMAIN_RE.match(issuer_root):
73+
raise InvalidIssuerError("Invalid issuer")
74+
75+
76+
def _validate_issuer_and_dest_match(decoded_payload):
77+
issuer_root = urljoin(decoded_payload["iss"], "/")
78+
dest_root = urljoin(decoded_payload["dest"], "/")
79+
80+
if issuer_root != dest_root:
81+
raise MismatchedHostsError("The issuer and destination do not match")

shopify/utils/__init__.py

Lines changed: 0 additions & 2 deletions
This file was deleted.

shopify/utils/exceptions.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

shopify/utils/utilities.py

Lines changed: 0 additions & 64 deletions
This file was deleted.

test/utils/utilities_test.py renamed to test/session_token_test.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from shopify.utils import *
1+
from shopify import session_token
22
from test.test_helper import TestCase
33
from datetime import datetime, timedelta
44

@@ -13,7 +13,7 @@ def timestamp(date):
1313
return time.mktime(date.timetuple()) if sys.version_info[0] < 3 else date.timestamp()
1414

1515

16-
class TestSessionTokenUtilityGetDecodedSessionToken(TestCase):
16+
class TestSessionTokenGetDecodedSessionToken(TestCase):
1717
@classmethod
1818
def setUpClass(self):
1919
self.secret = "API Secret"
@@ -42,45 +42,45 @@ def build_auth_header(self):
4242
def test_raises_if_token_authentication_header_is_not_bearer(self):
4343
authorization_header = "Bad auth header"
4444

45-
with self.assertRaises(TokenAuthenticationError):
46-
SessionTokenUtility.get_decoded_session_token(
47-
authorization_header, api_key=self.api_key, secret=self.secret
48-
)
45+
with self.assertRaises(session_token.TokenAuthenticationError) as cm:
46+
session_token.get_decoded_session_token(authorization_header, api_key=self.api_key, secret=self.secret)
47+
48+
self.assertEqual("The HTTP_AUTHORIZATION_HEADER provided does not contain a Bearer token", str(cm.exception))
4949

5050
def test_raises_jwt_error_if_session_token_is_expired(self):
5151
self.payload["exp"] = timestamp((datetime.now() + timedelta(0, -10)))
5252

53-
with self.assertRaises(jwt.exceptions.ExpiredSignatureError):
54-
SessionTokenUtility.get_decoded_session_token(
55-
self.build_auth_header(), api_key=self.api_key, secret=self.secret
56-
)
53+
with self.assertRaises(session_token.SessionTokenError, msg="Expird") as cm:
54+
session_token.get_decoded_session_token(self.build_auth_header(), api_key=self.api_key, secret=self.secret)
55+
56+
self.assertEqual("Signature has expired", str(cm.exception))
5757

5858
def test_raises_if_aud_doesnt_match_api_key(self):
5959
self.payload["aud"] = "bad audience"
6060

61-
with self.assertRaises(jwt.exceptions.InvalidAudienceError):
62-
SessionTokenUtility.get_decoded_session_token(
63-
self.build_auth_header(), api_key=self.api_key, secret=self.secret
64-
)
61+
with self.assertRaises(session_token.SessionTokenError) as cm:
62+
session_token.get_decoded_session_token(self.build_auth_header(), api_key=self.api_key, secret=self.secret)
63+
64+
self.assertEqual("Invalid audience", str(cm.exception))
6565

6666
def test_raises_if_issuer_hostname_is_invalid(self):
6767
self.payload["iss"] = "bad_shop_hostname"
6868

69-
with self.assertRaises(InvalidIssuerError):
70-
SessionTokenUtility.get_decoded_session_token(
71-
self.build_auth_header(), api_key=self.api_key, secret=self.secret
72-
)
69+
with self.assertRaises(session_token.InvalidIssuerError) as cm:
70+
session_token.get_decoded_session_token(self.build_auth_header(), api_key=self.api_key, secret=self.secret)
71+
72+
self.assertEqual("Invalid issuer", str(cm.exception))
7373

7474
def test_raises_if_iss_and_dest_dont_match(self):
7575
self.payload["dest"] = "bad_shop.myshopify.com"
7676

77-
with self.assertRaises(MismatchedHostsError):
78-
SessionTokenUtility.get_decoded_session_token(
79-
self.build_auth_header(), api_key=self.api_key, secret=self.secret
80-
)
77+
with self.assertRaises(session_token.MismatchedHostsError) as cm:
78+
session_token.get_decoded_session_token(self.build_auth_header(), api_key=self.api_key, secret=self.secret)
79+
80+
self.assertEqual("The issuer and destination do not match", str(cm.exception))
8181

8282
def test_returns_decoded_payload(self):
83-
decoded_payload = SessionTokenUtility.get_decoded_session_token(
83+
decoded_payload = session_token.get_decoded_session_token(
8484
self.build_auth_header(), api_key=self.api_key, secret=self.secret
8585
)
8686

0 commit comments

Comments
 (0)