Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions okta_jwt_verifier/config_validator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from .constants import ADMIN_DOMAINS
from .error_messages import (ERROR_MESSAGE_ORG_URL_MISSING,
ERROR_MESSAGE_ORG_URL_NOT_HTTPS,
ERROR_MESSAGE_ORG_URL_YOUROKTADOMAIN,
ERROR_MESSAGE_ORG_URL_ADMIN,
ERROR_MESSAGE_ORG_URL_TYPO,
ERROR_MESSAGE_ORG_URL_WRONG_TYPE,
ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE,
ERROR_MESSAGE_CLIENT_ID_MISSING,
ERROR_MESSAGE_CLIENT_ID_DEFAULT,
ERROR_MESSAGE_AUDIENCE_MISSING)
from .error_messages import (
ERROR_MESSAGE_AUDIENCE_MISSING,
ERROR_MESSAGE_CLIENT_ID_DEFAULT,
ERROR_MESSAGE_CLIENT_ID_MISSING,
ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE,
ERROR_MESSAGE_ORG_URL_ADMIN,
ERROR_MESSAGE_ORG_URL_MISSING,
ERROR_MESSAGE_ORG_URL_NOT_HTTPS,
ERROR_MESSAGE_ORG_URL_TYPO,
ERROR_MESSAGE_ORG_URL_WRONG_TYPE,
ERROR_MESSAGE_ORG_URL_YOUROKTADOMAIN,
)
from .exceptions import JWTInvalidConfigException


class ConfigValidator():
class ConfigValidator:

"""Class designed for JWT Verifier config validation."""

Expand All @@ -28,49 +30,50 @@ def validate_config(self):

def validate_issuer(self, issuer=None, https_check=True):
"""Validates issuer."""
issuer = issuer or self.config.get('issuer')
issuer = issuer or self.config.get("issuer")
if not issuer:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_MISSING)
if not isinstance(issuer, str):
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_WRONG_TYPE)
if https_check and not issuer.startswith('https://'):
if https_check and not issuer.startswith("https://"):
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_NOT_HTTPS)
if '{yourOktaDomain}' in issuer:
if "{yourOktaDomain}" in issuer:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_YOUROKTADOMAIN)
if any(domain in issuer for domain in ADMIN_DOMAINS):
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_ADMIN)
if '.com.com' in issuer:
if ".com.com" in issuer:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_TYPO)
if issuer.count('://') > 1:
if issuer.count("://") > 1:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_TYPO)

def validate_client_id(self, client_id=None):
"""Validates client_id."""
client_id = client_id or self.config.get('client_id')
client_id = client_id or self.config.get("client_id")
if not client_id:
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_MISSING)
if not isinstance(client_id, str):
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE)
if '{clientId}' in client_id:
if "{clientId}" in client_id:
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_DEFAULT)

def validate_audience(self, audience=None):
"""Validates audience."""
audience = audience or self.config.get('audience')
audience = audience or self.config.get("audience")
if not audience:
raise JWTInvalidConfigException(ERROR_MESSAGE_AUDIENCE_MISSING)

def _validate_number(self, number, variable_name):
"""Validates param which should be represented as integer and >= 0"""
if not isinstance(number, int):
raise JWTInvalidConfigException(f'{variable_name} should be type of int.')
raise JWTInvalidConfigException(f"{variable_name} should be type of int.")
if number < 0:
raise JWTInvalidConfigException(f'Value of {variable_name} should be 0 or greater.')
raise JWTInvalidConfigException(
f"Value of {variable_name} should be 0 or greater."
)

def validate_numbers(self, numbers=('max_retries',
'max_requests',
'request_timeout',
'leeway')):
def validate_numbers(
self, numbers=("max_retries", "max_requests", "request_timeout", "leeway")
):
"""Validates all number parameters."""
for number_variable in numbers:
self._validate_number(self.config.get(number_variable), number_variable)
20 changes: 8 additions & 12 deletions okta_jwt_verifier/constants.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import os


# Default values described in technical design

MAX_RETRIES = os.environ.get('JWT_VERIFIER_MAX_RETRIES', 1)
MAX_REQUESTS = os.environ.get('JWT_VERIFIER_MAX_REQUESTS', 10)
REQUEST_TIMEOUT = os.environ.get('JWT_VERIFIER_REQUEST_TIMEOUT', 30)
LEEWAY = os.environ.get('LEEWAY', 120)
MAX_RETRIES = os.environ.get("JWT_VERIFIER_MAX_RETRIES", 1)
MAX_REQUESTS = os.environ.get("JWT_VERIFIER_MAX_REQUESTS", 10)
REQUEST_TIMEOUT = os.environ.get("JWT_VERIFIER_REQUEST_TIMEOUT", 30)
LEEWAY = os.environ.get("LEEWAY", 120)


# Constant URLs used in error messages

DEV_OKTA = "https://developer.okta.com"
FINDING_OKTA_DOMAIN = (f"{DEV_OKTA}"
"/docs/guides/find-your-domain/overview")
GET_OKTA_API_TOKEN = (f"{DEV_OKTA}"
"/docs/guides/create-an-api-token/overview")
FINDING_OKTA_APP_CRED = (f"{DEV_OKTA}"
"/docs/guides/find-your-app-credentials/overview")
FINDING_OKTA_DOMAIN = f"{DEV_OKTA}" "/docs/guides/find-your-domain/overview"
GET_OKTA_API_TOKEN = f"{DEV_OKTA}" "/docs/guides/create-an-api-token/overview"
FINDING_OKTA_APP_CRED = f"{DEV_OKTA}" "/docs/guides/find-your-app-credentials/overview"


# Misc
ADMIN_DOMAINS = ('-admin.okta.com', '-admin.oktapreview.com', '-admin.okta-emea.com')
ADMIN_DOMAINS = ("-admin.okta.com", "-admin.oktapreview.com", "-admin.okta-emea.com")
18 changes: 6 additions & 12 deletions okta_jwt_verifier/error_messages.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from .constants import FINDING_OKTA_APP_CRED, FINDING_OKTA_DOMAIN

ERROR_MESSAGE_ORG_URL_WRONG_TYPE = ("Your Okta URL should be type of str.")
ERROR_MESSAGE_ORG_URL_WRONG_TYPE = "Your Okta URL should be type of str."

ERROR_MESSAGE_ORG_URL_MISSING = (
"Your Okta URL is missing. You can copy "
"your domain from the Okta Developer "
"Console. Follow these instructions to"
f" find it: {FINDING_OKTA_DOMAIN}"
)
ERROR_MESSAGE_ORG_URL_NOT_HTTPS = (
"Your Okta URL must start with 'https'."
)
ERROR_MESSAGE_ORG_URL_NOT_HTTPS = "Your Okta URL must start with 'https'."
ERROR_MESSAGE_AUTH_MODE_INVALID = (
"The AuthorizationMode configuration "
"option must be one of: "
Expand All @@ -24,15 +22,11 @@
f"{FINDING_OKTA_DOMAIN}"
)

ERROR_MESSAGE_ORG_URL_ADMIN = (
"Your Okta domain should not contain -admin. "
)
ERROR_MESSAGE_ORG_URL_ADMIN = "Your Okta domain should not contain -admin. "

ERROR_MESSAGE_ORG_URL_TYPO = (
"It looks like there's a typo in your Okta domain."
)
ERROR_MESSAGE_ORG_URL_TYPO = "It looks like there's a typo in your Okta domain."

ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE = ("Your client ID should be type of str.")
ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE = "Your client ID should be type of str."

ERROR_MESSAGE_CLIENT_ID_MISSING = (
"Your client ID is missing. You can copy it from the "
Expand All @@ -48,4 +42,4 @@
f"instructions to find it: {FINDING_OKTA_APP_CRED}"
)

ERROR_MESSAGE_AUDIENCE_MISSING = ("Audience is missing.")
ERROR_MESSAGE_AUDIENCE_MISSING = "Audience is missing."
63 changes: 32 additions & 31 deletions okta_jwt_verifier/jwt_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json

from jose import jwt, jws
from jose import jws, jwt
from jose.exceptions import ExpiredSignatureError

from .constants import LEEWAY
Expand All @@ -18,55 +18,56 @@ def parse_token(token):
tuple (headers, claims, signing_input, signature)
"""
headers, payload, signing_input, signature = jws._load(token)
claims = json.loads(payload.decode('utf-8'))
claims = json.loads(payload.decode("utf-8"))
return (headers, claims, signing_input, signature)

@staticmethod
def verify_claims(claims,
claims_to_verify,
audience,
issuer,
leeway=LEEWAY):
def verify_claims(claims, claims_to_verify, audience, issuer, leeway=LEEWAY):
"""Verify claims are present and valid."""
# Check if required claims are present, because library "jose" doesn't raise an exception
for claim in claims_to_verify:
if claim not in claims:
raise JWTValidationException(f'Required claim "{claim}" is not present.')
raise JWTValidationException(
f'Required claim "{claim}" is not present.'
)

# Overwrite defaults in python-jose library
options = {'verify_aud': 'aud' in claims_to_verify,
'verify_iat': 'iat' in claims_to_verify,
'verify_exp': 'exp' in claims_to_verify,
'verify_nbf': 'nbf' in claims_to_verify,
'verify_iss': 'iss' in claims_to_verify,
'verify_sub': 'sub' in claims_to_verify,
'verify_jti': 'jti' in claims_to_verify,
'leeway': leeway}
options = {
"verify_aud": "aud" in claims_to_verify,
"verify_iat": "iat" in claims_to_verify,
"verify_exp": "exp" in claims_to_verify,
"verify_nbf": "nbf" in claims_to_verify,
"verify_iss": "iss" in claims_to_verify,
"verify_sub": "sub" in claims_to_verify,
"verify_jti": "jti" in claims_to_verify,
"leeway": leeway,
}
# Validate claims
jwt._validate_claims(claims,
audience=audience,
issuer=issuer,
options=options)
jwt._validate_claims(claims, audience=audience, issuer=issuer, options=options)

@staticmethod
def verify_signature(token, okta_jwk):
"""Verify token signature using received jwk."""
headers, claims, signing_input, signature = JWTUtils.parse_token(token)
jws._verify_signature(signing_input=signing_input,
header=headers,
signature=signature,
key=okta_jwk,
algorithms=['RS256'])
jws._verify_signature(
signing_input=signing_input,
header=headers,
signature=signature,
key=okta_jwk,
algorithms=["RS256"],
)

@staticmethod
def verify_expiration(token, leeway=LEEWAY):
"""Verify if token is not expired."""
headers, claims, signing_input, signature = JWTUtils.parse_token(token)
try:
JWTUtils.verify_claims(claims,
claims_to_verify=('exp',),
audience=None,
issuer=None,
leeway=LEEWAY)
JWTUtils.verify_claims(
claims,
claims_to_verify=("exp",),
audience=None,
issuer=None,
leeway=LEEWAY,
)
except ExpiredSignatureError:
raise JWTValidationException('Signature has expired.')
raise JWTValidationException("Signature has expired.")
Loading