|
1 | 1 | import os |
2 | 2 | import base64 |
| 3 | +import csv |
3 | 4 |
|
4 | 5 | from flask import redirect, request, render_template, jsonify, make_response, Flask |
5 | 6 | from flask_wtf.csrf import CSRFProtect |
|
15 | 16 |
|
16 | 17 | signed_in = True |
17 | 18 |
|
18 | | -ENCODE_NAME = "ascii" |
| 19 | +ENCODE_NAME = 'ascii' |
19 | 20 |
|
20 | | -ID_FIELD = "id" |
21 | | -USERNAME_FIELD = "username" |
22 | | -NAME_FIELD = "name" |
23 | | -EMAIL_FIELD = "email" |
24 | | -FIRST_NAME_FIELD = "fisrt_name" |
25 | | -LAST_NAME_FIELD = "last_name" |
26 | | -HICN_FIELD = "hicn" |
27 | | -MBI_FIELD = "mbi" |
28 | | -CODE_KEY = "code" |
29 | | -AUTH_HEADER = "Authorization" |
| 21 | +ID_FIELD = 'id' |
| 22 | +USERNAME_FIELD = 'username' |
| 23 | +NAME_FIELD = 'name' |
| 24 | +EMAIL_FIELD = 'email' |
| 25 | +FIRST_NAME_FIELD = 'first_name' |
| 26 | +LAST_NAME_FIELD = 'last_name' |
| 27 | +HICN_FIELD = 'hicn' |
| 28 | +MBI_FIELD = 'mbi' |
| 29 | +CODE_KEY = 'code' |
| 30 | +AUTH_HEADER = 'Authorization' |
30 | 31 |
|
31 | 32 |
|
32 | 33 | def _base64_encode(string): |
33 | | - """ |
34 | | - Removes any `=` used as padding from the encoded string. |
35 | | - """ |
36 | 34 | encoded = base64.urlsafe_b64encode(string) |
37 | | - return encoded.rstrip(b"=") |
| 35 | + return encoded.rstrip(b'=') |
38 | 36 |
|
39 | 37 |
|
40 | 38 | def _base64_decode(string): |
41 | | - """ |
42 | | - Adds back in the required padding before decoding. |
43 | | - """ |
44 | 39 | padding = 4 - (len(string) % 4) |
45 | | - string = string + ("=" * padding) |
| 40 | + string = string + ('=' * padding) |
46 | 41 | return base64.urlsafe_b64decode(string) |
47 | 42 |
|
48 | 43 |
|
49 | | -def _encode(usr="", name="", first_name="", last_name="", email="", hicn="", mbi=""): |
50 | | - return "{}.{}.{}.{}.{}.{}.{}".format(_base64_encode(usr.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
51 | | - _base64_encode(name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
52 | | - _base64_encode(first_name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
53 | | - _base64_encode(last_name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
54 | | - _base64_encode(email.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
55 | | - _base64_encode(hicn.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
56 | | - _base64_encode(mbi.encode(ENCODE_NAME)).decode(ENCODE_NAME)) |
| 44 | +def _encode(usr='', name='', first_name='', last_name='', email='', hicn='', mbi=''): |
| 45 | + return '{}.{}.{}.{}.{}.{}.{}'.format( |
| 46 | + _base64_encode(usr.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 47 | + _base64_encode(name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 48 | + _base64_encode(first_name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 49 | + _base64_encode(last_name.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 50 | + _base64_encode(email.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 51 | + _base64_encode(hicn.encode(ENCODE_NAME)).decode(ENCODE_NAME), |
| 52 | + _base64_encode(mbi.encode(ENCODE_NAME)).decode(ENCODE_NAME) |
| 53 | + ) |
57 | 54 |
|
58 | 55 |
|
59 | 56 | def _decode(b64code): |
60 | | - flds = b64code.split(".") |
| 57 | + flds = b64code.split('.') |
61 | 58 | return { |
62 | | - "usr": _base64_decode(flds[0]).decode(ENCODE_NAME), |
63 | | - "name": _base64_decode(flds[1]).decode(ENCODE_NAME), |
64 | | - "first_name": _base64_decode(flds[2]).decode(ENCODE_NAME), |
65 | | - "last_name": _base64_decode(flds[3]).decode(ENCODE_NAME), |
66 | | - "email": _base64_decode(flds[4]).decode(ENCODE_NAME), |
67 | | - "hicn": _base64_decode(flds[5]).decode(ENCODE_NAME), |
68 | | - "mbi": _base64_decode(flds[6]).decode(ENCODE_NAME), |
| 59 | + 'usr': _base64_decode(flds[0]).decode(ENCODE_NAME), |
| 60 | + 'name': _base64_decode(flds[1]).decode(ENCODE_NAME), |
| 61 | + 'first_name': _base64_decode(flds[2]).decode(ENCODE_NAME), |
| 62 | + 'last_name': _base64_decode(flds[3]).decode(ENCODE_NAME), |
| 63 | + 'email': _base64_decode(flds[4]).decode(ENCODE_NAME), |
| 64 | + 'hicn': _base64_decode(flds[5]).decode(ENCODE_NAME), |
| 65 | + 'mbi': _base64_decode(flds[6]).decode(ENCODE_NAME), |
69 | 66 | } |
70 | 67 |
|
71 | 68 |
|
| 69 | +def _load_user_data(): |
| 70 | + csv_path = os.path.join(os.path.dirname(__file__), 'users.csv') |
| 71 | + users = [] |
| 72 | + |
| 73 | + if os.path.exists(csv_path): |
| 74 | + try: |
| 75 | + with open(csv_path, 'r', encoding='utf-8') as f: |
| 76 | + reader = csv.DictReader(f) |
| 77 | + for row in reader: |
| 78 | + users.append({ |
| 79 | + 'username': row.get('username', '').strip(), |
| 80 | + 'hicn': row.get('hicn', '').strip(), |
| 81 | + 'mbi': row.get('mbi', '').strip(), |
| 82 | + 'name': row.get('name', '').strip(), |
| 83 | + 'first_name': row.get('first_name', '').strip(), |
| 84 | + 'last_name': row.get('last_name', '').strip(), |
| 85 | + 'email': row.get('email', '').strip() |
| 86 | + }) |
| 87 | + except Exception as e: |
| 88 | + print(f"Error loading CSV: {e}") |
| 89 | + |
| 90 | + return users |
| 91 | + |
| 92 | + |
72 | 93 | @app.route('/sso/authorize', methods=['GET']) |
73 | 94 | def login_page(): |
74 | | - ''' |
75 | | - response with login form |
76 | | - ''' |
77 | | - relay = request.args.get("relay", "missing") |
78 | | - redirect_uri = request.args.get("redirect_uri", "missing") |
79 | | - return render_template('login.html', relay=relay, redirect_uri=redirect_uri) |
| 95 | + relay = request.args.get('relay', 'missing') |
| 96 | + redirect_uri = request.args.get('redirect_uri', 'missing') |
| 97 | + users = _load_user_data() |
| 98 | + return render_template('login.html', relay=relay, redirect_uri=redirect_uri, users=users) |
80 | 99 |
|
81 | 100 |
|
82 | 101 | @app.route('/login/', methods=['POST']) |
83 | 102 | def login(): |
84 | | - ''' |
85 | | - process login form POST, collect sub, mbi, hicn, etc. |
86 | | - ''' |
87 | | - |
88 | | - redirect_url = request.form.get("redirect_uri", None) |
89 | | - |
90 | | - req_token = _encode(request.form.get(USERNAME_FIELD, ""), |
91 | | - request.form.get(NAME_FIELD, ""), |
92 | | - request.form.get(FIRST_NAME_FIELD, ""), |
93 | | - request.form.get(LAST_NAME_FIELD, ""), |
94 | | - request.form.get(EMAIL_FIELD, ""), |
95 | | - request.form.get(HICN_FIELD, ""), |
96 | | - request.form.get(MBI_FIELD, "")) |
| 103 | + redirect_url = request.form.get('redirect_uri', None) |
| 104 | + |
| 105 | + req_token = _encode( |
| 106 | + request.form.get(USERNAME_FIELD, ''), |
| 107 | + request.form.get(NAME_FIELD, ''), |
| 108 | + request.form.get(FIRST_NAME_FIELD, ''), |
| 109 | + request.form.get(LAST_NAME_FIELD, ''), |
| 110 | + request.form.get(EMAIL_FIELD, ''), |
| 111 | + request.form.get(HICN_FIELD, ''), |
| 112 | + request.form.get(MBI_FIELD, '') |
| 113 | + ) |
97 | 114 |
|
98 | 115 | qparams = { |
99 | | - "req_token": req_token, |
100 | | - "relay": request.form.get("relay", "") |
| 116 | + 'req_token': req_token, |
| 117 | + 'relay': request.form.get('relay', '') |
101 | 118 | } |
102 | 119 |
|
103 | | - print("redirect={}?{}".format(redirect_url, urlencode(qparams))) |
104 | | - return redirect("{}?{}".format(redirect_url, urlencode(qparams))) |
| 120 | + print('redirect={}?{}'.format(redirect_url, urlencode(qparams))) |
| 121 | + return redirect('{}?{}'.format(redirect_url, urlencode(qparams))) |
105 | 122 |
|
106 | 123 |
|
107 | 124 | @app.route('/health', methods=['GET']) |
108 | 125 | def health(): |
109 | | - ''' |
110 | | - always good |
111 | | - ''' |
112 | | - |
113 | | - return make_response(jsonify({"message": "all's well"})) |
| 126 | + return make_response(jsonify({'message': "all's well"})) |
114 | 127 |
|
115 | 128 |
|
116 | 129 | @csrf.exempt |
117 | 130 | @app.route('/sso/session', methods=['POST']) |
118 | 131 | def token(): |
119 | | - ''' |
120 | | - grant access token to further get userinfo |
121 | | - ''' |
122 | | - |
123 | | - request_token = request.json.get("request_token", None) |
| 132 | + request_token = request.json.get('request_token', None) |
124 | 133 |
|
125 | 134 | if request_token is None: |
126 | | - return make_response(jsonify({"message": "Bad Request, missing request token."}), 400) |
| 135 | + return make_response(jsonify({'message': 'Bad Request, missing request token.'}), 400) |
127 | 136 |
|
128 | 137 | user_info = _decode(request_token) |
129 | 138 |
|
130 | 139 | token = { |
131 | | - "user_id": user_info['usr'], |
132 | | - "auth_token": request_token, |
| 140 | + 'user_id': user_info['usr'], |
| 141 | + 'auth_token': request_token, |
133 | 142 | } |
134 | 143 |
|
135 | 144 | return make_response(jsonify(token)) |
@@ -170,7 +179,7 @@ def userinfo(usr): |
170 | 179 |
|
171 | 180 | @app.route('/sso/signout', methods=['GET']) |
172 | 181 | def signout(): |
173 | | - return make_response(jsonify({"message": "signed out."}), 302) |
| 182 | + return make_response(jsonify({'message': 'signed out.'}), 302) |
174 | 183 |
|
175 | 184 |
|
176 | 185 | if __name__ == '__main__': |
|
0 commit comments