-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathauth.py
More file actions
114 lines (88 loc) · 3.99 KB
/
auth.py
File metadata and controls
114 lines (88 loc) · 3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""Authorization token handling."""
from functools import wraps
from flask import current_app, request
import jwt
import requests
from os import getenv
from exceptions import HTTPError
from utils import fetch_public_key
def decode_token(token):
    """Decode the authorization token read from the request header.

    The token is verified against the public key returned by
    ``fetch_public_key(current_app)`` using RS256, once per audience returned
    by ``get_audiences()``; the first audience that validates wins.

    Args:
        token: raw value of the ``Authorization`` header, with or without the
            ``'Bearer '`` prefix, or ``None`` when the header is absent.

    Returns:
        The decoded claims, or an empty dict when ``token`` is ``None``.

    Raises:
        jwt.ExpiredSignatureError: if the token has expired.
        jwt.InvalidTokenError: if the token cannot be verified for any of the
            configured audiences.
    """
    if token is None:
        return {}

    if token.startswith('Bearer '):
        _, token = token.split(' ', 1)

    pub_key = fetch_public_key(current_app)

    for aud in get_audiences():
        try:
            return jwt.decode(token, pub_key, algorithms=['RS256'], audience=aud)
        except jwt.ExpiredSignatureError:
            # ExpiredSignatureError derives from InvalidTokenError; without this
            # clause it would be swallowed below and callers could never tell an
            # expired token from an otherwise invalid one. Trying further
            # audiences cannot make an expired token valid, so stop here.
            raise
        except jwt.InvalidTokenError:
            current_app.logger.error(
                'Auth Token could not be decoded for audience {}'.format(aud))

    # Either no audience is configured or none of them matched the token.
    raise jwt.InvalidTokenError('Auth token audience cannot be verified.')
def get_token_from_auth_header():
    """Return the raw ``Authorization`` header of the current request.

    The value is returned as sent by the client; ``None`` is returned when the
    header is not present.
    """
    headers = request.headers
    return headers.get('Authorization')
def get_audiences():
    """Retrieve all JWT audiences.

    The audiences are read from the ``BAYESIAN_JWT_AUDIENCE`` environment
    variable as a comma-separated list. Surrounding whitespace is stripped
    from every entry and blank entries are dropped.

    Returns:
        A list of audience strings; empty when the variable is unset or
        contains no usable entry.
    """
    raw = getenv('BAYESIAN_JWT_AUDIENCE')
    if not raw:
        # An unset variable yields None, which has no .split().
        return []
    stripped = (aud.strip() for aud in raw.split(','))
    return [aud for aud in stripped if aud]
def init_auth_sa_token():
    """Request a service-account access token from the auth service.

    Configuration is read from the environment:

    * ``AUTH_SERVICE_HOST`` - base URL of the auth service.
    * ``GEIMINI_SA_CLIENT_ID`` - client id (historical, misspelled name); when
      it is unset, the correctly spelled ``GEMINI_SA_CLIENT_ID`` is used.
    * ``GEMINI_SA_CLIENT_SECRET`` - client secret.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.exceptions.RequestException: if the HTTP request fails, the
            service answers with a status other than 200, or the response body
            is not JSON carrying an ``access_token`` field.
    """
    auth_server_url = getenv('AUTH_SERVICE_HOST', 'https://auth.prod-preview.openshift.io')
    endpoint = '{url}/api/token'.format(url=auth_server_url)

    # The misspelled name stays first so existing deployments keep working.
    client_id = getenv('GEIMINI_SA_CLIENT_ID')
    if client_id is None:
        client_id = getenv('GEMINI_SA_CLIENT_ID', 'id')
    client_secret = getenv('GEMINI_SA_CLIENT_SECRET', 'secret')

    payload = {"grant_type": "client_credentials",
               "client_id": client_id.strip(),
               "client_secret": client_secret.strip()}

    print('TOKEN GENERATION: endpoint is %s' % endpoint)
    # Never write the client secret to stdout/logs; print a redacted copy.
    print('TOKEN GENERATION: payload is %r' % dict(payload, client_secret='***'))

    # Connection-level failures propagate as RequestException subclasses.
    resp = requests.post(endpoint, json=payload)
    print("RESPONSE STATUS = %d" % resp.status_code)

    if resp.status_code != 200:
        print("Unexpected HTTP response. Raised requests.exceptions.RequestException")
        raise requests.exceptions.RequestException(
            'Unexpected HTTP status {} from {}'.format(resp.status_code, endpoint))

    try:
        access_token = resp.json()['access_token']
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError: body is not JSON; KeyError: field missing;
        # TypeError: JSON document is not an object.
        print("requests.exceptions.RequestException during Access token generation")
        raise requests.exceptions.RequestException(
            'Auth service response does not contain an access token') from exc

    print("Access token has been generated successfully")
    return access_token
def login_required(view):  # pragma: no cover
    """Decorate a view so that it only runs for requests carrying a valid JWT.

    Authentication is skipped entirely when the ``DISABLE_AUTHENTICATION``
    environment variable is ``'1'``, ``'True'`` or ``'true'`` (local setup).

    Args:
        view: the view callable to protect.

    Returns:
        A wrapper with the same signature as ``view``.

    Raises:
        HTTPError: (from the wrapper) with status 401 when the token is
            missing, expired, or cannot be decoded.
    """
    @wraps(view)
    def wrapper(*args, **kwargs):
        # Disable authentication for local setup
        if getenv('DISABLE_AUTHENTICATION') in ('1', 'True', 'true'):
            return view(*args, **kwargs)

        lgr = current_app.logger

        # Only the decoding itself is guarded, so the "token missing" error
        # raised below is not swallowed and re-labelled as a decoding failure.
        try:
            decoded = decode_token(get_token_from_auth_header())
        except jwt.ExpiredSignatureError as exc:
            lgr.exception('Expired JWT token')
            raise HTTPError(401, 'Authentication failed - token has expired') from exc
        except Exception as exc:
            lgr.exception('Failed decoding JWT token')
            raise HTTPError(401, 'Authentication failed - could not decode JWT token') from exc

        if not decoded:
            # No exception is active here, so use error() rather than exception().
            lgr.error('Provide an Authorization token with the API request')
            raise HTTPError(401, 'Authentication failed - token missing')

        lgr.info('Successfuly authenticated user {e} using JWT'.
                 format(e=decoded.get('email')))
        return view(*args, **kwargs)

    return wrapper