-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathinit.py
More file actions
366 lines (321 loc) · 12.9 KB
/
init.py
File metadata and controls
366 lines (321 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
#CTFd\utils\initialization
import datetime
import hashlib
import logging
import os
import sys
from flask import abort, redirect, render_template, request, session, url_for
from sqlalchemy.exc import IntegrityError, InvalidRequestError
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from CTFd.cache import clear_user_recent_ips
from CTFd.exceptions import UserNotFoundException, UserTokenExpiredException
from CTFd.models import Tracking, db
from CTFd.utils import config, get_app_config, get_config, import_in_progress, markdown
from CTFd.utils.config import (
can_send_mail,
ctf_logo,
ctf_name,
ctf_theme,
integrations,
is_setup,
)
from CTFd.utils.config.pages import get_pages
from CTFd.utils.dates import isoformat, unix_time, unix_time_millis, unix_time_to_utc
from CTFd.utils.events import EventManager, RedisEventManager
from CTFd.utils.humanize.words import pluralize
from CTFd.utils.modes import generate_account_url, get_mode_as_word
from CTFd.utils.plugins import (
get_configurable_plugins,
get_menubar_plugins,
get_registered_admin_scripts,
get_registered_admin_stylesheets,
get_registered_scripts,
get_registered_stylesheets,
)
from CTFd.utils.security.auth import login_user, logout_user, lookup_user_token
from CTFd.utils.security.csrf import generate_nonce
from CTFd.utils.user import (
authed,
get_current_team_attrs,
get_current_user_attrs,
get_current_user_recent_ips,
get_ip,
get_locale,
is_admin,
)
def init_cli(app):
    """Register the CTFd command-line blueprint on ``app``.

    The blueprint is imported inside the function body and registered with
    ``cli_group=None``.
    """
    from CTFd.cli import _cli as cli_blueprint

    registration_options = {"cli_group": None}
    app.register_blueprint(cli_blueprint, **registration_options)
def _gravatar_hash(value):
"""Compute MD5 hash for Gravatar URLs server-side to avoid exposing emails."""
if value:
return hashlib.md5(
value.lower().strip().encode("utf-8")
).hexdigest()
return ""
def init_template_filters(app):
    """Install the custom Jinja filters on ``app.jinja_env``.

    Each entry maps the filter name used in templates to the callable that
    implements it.
    """
    custom_filters = (
        ("markdown", markdown),
        ("unix_time", unix_time),
        ("unix_time_millis", unix_time_millis),
        ("unix_time_to_utc", unix_time_to_utc),
        ("isoformat", isoformat),
        ("pluralize", pluralize),
        ("gravatar_hash", _gravatar_hash),
    )
    jinja_filters = app.jinja_env.filters
    for filter_name, filter_func in custom_filters:
        jinja_filters[filter_name] = filter_func
def init_template_globals(app):
    """Expose helper callables and constant namespaces to every Jinja template.

    All names are added to ``app.jinja_env.globals``. The constants and
    visibility helpers are imported inside the function body; the remaining
    helpers come from the module-level imports.
    """
    from CTFd.constants import JINJA_ENUMS  # noqa: I001
    from CTFd.constants.assets import Assets
    from CTFd.constants.config import Configs
    from CTFd.constants.languages import Languages
    from CTFd.constants.plugins import Plugins
    from CTFd.constants.sessions import Session
    from CTFd.constants.static import Static
    from CTFd.constants.teams import Team
    from CTFd.constants.users import User
    from CTFd.forms import Forms
    from CTFd.utils.config.visibility import (
        accounts_visible,
        challenges_visible,
        registration_visible,
        scores_visible,
    )
    from CTFd.utils.countries import get_countries, lookup_country_code
    from CTFd.utils.countries.geoip import lookup_ip_address, lookup_ip_address_city

    # Template-visible name -> object, in the order they are registered.
    exposed = {
        "config": config,
        "get_pages": get_pages,
        "can_send_mail": can_send_mail,
        "get_ctf_name": ctf_name,
        "get_ctf_logo": ctf_logo,
        "get_ctf_theme": ctf_theme,
        "get_menubar_plugins": get_menubar_plugins,
        "get_configurable_plugins": get_configurable_plugins,
        "get_registered_scripts": get_registered_scripts,
        "get_registered_stylesheets": get_registered_stylesheets,
        "get_registered_admin_scripts": get_registered_admin_scripts,
        "get_registered_admin_stylesheets": get_registered_admin_stylesheets,
        "get_config": get_config,
        "generate_account_url": generate_account_url,
        "get_countries": get_countries,
        "lookup_country_code": lookup_country_code,
        "lookup_ip_address": lookup_ip_address,
        "lookup_ip_address_city": lookup_ip_address_city,
        "accounts_visible": accounts_visible,
        "challenges_visible": challenges_visible,
        "registration_visible": registration_visible,
        "scores_visible": scores_visible,
        "get_mode_as_word": get_mode_as_word,
        "integrations": integrations,
        "authed": authed,
        "is_admin": is_admin,
        "get_current_user_attrs": get_current_user_attrs,
        "get_current_team_attrs": get_current_team_attrs,
        "get_ip": get_ip,
        "get_locale": get_locale,
        "Assets": Assets,
        "Configs": Configs,
        "Plugins": Plugins,
        "Session": Session,
        "Static": Static,
        "Forms": Forms,
        "User": User,
        "Team": Team,
        "Languages": Languages,
    }
    template_globals = app.jinja_env.globals
    template_globals.update(exposed)

    # Add in JinjaEnums.
    # On a double import JinjaEnums are not reinitialized, so when two Jinja
    # environments are created (e.g. during testing) an Enum can be missing
    # from one of them. Copying from the persisted global dictionary avoids
    # that.
    for enum_name, enum_value in JINJA_ENUMS.items():
        # Assign by key: a keyword-style update(k=v) would store the literal
        # name of the loop variable instead of its value.
        template_globals[enum_name] = enum_value
def init_logs(app):
    """Configure the ``submissions``, ``logins`` and ``registrations`` loggers.

    Every logger is set to ``INFO`` and has propagation disabled. Each one
    receives a rotating file handler (10 MiB per file, five backups) writing
    to ``<LOG_FOLDER>/<name>.log`` when the files can be opened, and always
    receives a shared handler that writes to ``sys.stdout``.

    Args:
        app: Application object whose ``config`` mapping provides
            ``LOG_FOLDER``, the directory that holds the log files. The
            directory is created when it does not exist.
    """
    # ``import logging`` alone does not load the ``logging.handlers``
    # submodule; import the handler class explicitly instead of relying on
    # some other module having imported it first.
    from logging.handlers import RotatingFileHandler

    log_names = ("submissions", "logins", "registrations")
    max_bytes = 10 * 1024 * 1024
    backup_count = 5

    loggers = {name: logging.getLogger(name) for name in log_names}
    for logger in loggers.values():
        logger.setLevel(logging.INFO)

    log_dir = app.config["LOG_FOLDER"]
    if not os.path.exists(log_dir):
        # exist_ok tolerates another process creating the folder in between.
        os.makedirs(log_dir, exist_ok=True)

    log_paths = {name: os.path.join(log_dir, name + ".log") for name in log_names}

    try:
        for path in log_paths.values():
            if not os.path.exists(path):
                with open(path, "a"):
                    pass

        # Build every handler before attaching any of them, so a failure
        # leaves all three loggers without a file handler rather than some.
        file_handlers = {
            name: RotatingFileHandler(
                path, maxBytes=max_bytes, backupCount=backup_count
            )
            for name, path in log_paths.items()
        }
        for name, handler in file_handlers.items():
            loggers[name].addHandler(handler)
    except IOError:
        # Deliberate best effort: when the log files cannot be created or
        # opened, file logging is skipped and only stdout logging is used.
        pass

    stdout = logging.StreamHandler(stream=sys.stdout)
    for logger in loggers.values():
        logger.addHandler(stdout)
        logger.propagate = False
def init_events(app):
    """Create the application's events manager and start it listening.

    A ``CACHE_TYPE`` of ``"redis"`` selects ``RedisEventManager``; every other
    value, including ``"filesystem"`` and a missing setting, selects
    ``EventManager``. The instance is stored on ``app.events_manager``.
    """
    cache_type = app.config.get("CACHE_TYPE")
    manager_cls = RedisEventManager if cache_type == "redis" else EventManager
    app.events_manager = manager_cls()
    app.events_manager.listen()
def init_request_processors(app):
    """Register the URL-default, before-request and after-request hooks on ``app``.

    The before-request hooks are registered in this order: ``needs_setup``,
    ``tracker``, ``banned``, ``tokens``, ``csrf``. When the configured
    ``APPLICATION_ROOT`` is not ``"/"``, an additional redirect hook is
    registered and ``app.wsgi_app`` is wrapped in a ``DispatcherMiddleware``
    that mounts the app under that root.
    """

    @app.url_defaults
    def inject_theme(endpoint, values):
        # Fill in the ``theme`` URL value with the current theme when the
        # endpoint expects one and the caller did not supply it.
        if "theme" not in values and app.url_map.is_endpoint_expecting(
            endpoint, "theme"
        ):
            values["theme"] = ctf_theme()

    @app.before_request
    def needs_setup():
        # While an import is running, only the import endpoint is served;
        # everything else gets a plain 403 response.
        if import_in_progress():
            if request.endpoint == "admin.import_ctf":
                return
            else:
                return "Import currently in progress", 403
        # Until setup has completed, only the endpoints listed here are
        # served; every other request is redirected to the setup view.
        if is_setup() is False:
            if request.endpoint in (
                "views.setup",
                "views.integrations",
                "views.themes",
                "views.files",
                "views.healthcheck",
                "views.robots",
            ):
                return
            else:
                return redirect(url_for("views.setup"))

    @app.before_request
    def tracker():
        # Requests to the themes endpoint are never tracked.
        if request.endpoint == "views.themes":
            return
        # Same import gate as in needs_setup().
        if import_in_progress():
            if request.endpoint == "admin.import_ctf":
                return
            else:
                return "Import currently in progress", 403
        if authed():
            user_ips = get_current_user_recent_ips()
            ip = get_ip()
            track = None
            # Touch the Tracking table only for an IP that is not in the
            # user's recent IPs, or for POST/PATCH/DELETE requests.
            if ip not in user_ips or request.method in (
                "POST",
                "PATCH",
                "DELETE",
            ):
                track = Tracking.query.filter_by(
                    ip=get_ip(), user_id=session["id"]
                ).first()
                if track:
                    # Existing (ip, user) row: refresh its timestamp.
                    track.date = datetime.datetime.utcnow()
                else:
                    track = Tracking(ip=get_ip(), user_id=session["id"])
                    db.session.add(track)
            if track:
                try:
                    db.session.commit()
                except (InvalidRequestError, IntegrityError):
                    # The commit failed: discard the session state and log
                    # the user out.
                    db.session.rollback()
                    db.session.close()
                    logout_user()
                else:
                    # The commit succeeded: clear the user's recent-IP cache.
                    clear_user_recent_ips(user_id=session["id"])

    @app.before_request
    def banned():
        # Theme requests are exempt from the ban check.
        if request.endpoint == "views.themes":
            return
        if authed():
            user = get_current_user_attrs()
            team = get_current_team_attrs()
            # The user ban is checked before the team ban; either one
            # short-circuits the request with the 403 error page.
            if user and user.banned:
                return (
                    render_template(
                        "errors/403.html", error="You have been banned from this CTF"
                    ),
                    403,
                )
            if team and team.banned:
                return (
                    render_template(
                        "errors/403.html",
                        error="Your team has been banned from this CTF",
                    ),
                    403,
                )

    @app.before_request
    def tokens():
        # The Authorization header is only honoured for JSON requests and for
        # the multipart file-upload POST singled out below.
        token = request.headers.get("Authorization")
        if token and (
            request.mimetype == "application/json"
            # Specially allow multipart/form-data for file uploads
            or (
                request.endpoint == "api.files_files_list"
                and request.method == "POST"
                and request.mimetype == "multipart/form-data"
            )
        ):
            try:
                # Header format is "<type> <token>"; the type part is not
                # inspected.
                token_type, token = token.split(" ", 1)
                user = lookup_user_token(token)
            except UserNotFoundException:
                abort(401)
            except UserTokenExpiredException:
                abort(401, description="Your access token has expired")
            except Exception:
                # Any other failure (including a header without a space) is
                # treated as an authentication failure.
                abort(401)
            else:
                login_user(user)

    @app.before_request
    def csrf():
        # An endpoint with no registered view function results in a 404.
        try:
            func = app.view_functions[request.endpoint]
        except KeyError:
            abort(404)
        # Views carrying a ``_bypass_csrf`` attribute skip the check.
        if hasattr(func, "_bypass_csrf"):
            return
        # Requests that carry an Authorization header skip the check.
        if request.headers.get("Authorization"):
            return
        # Make sure the session has a nonce to compare against.
        if not session.get("nonce"):
            session["nonce"] = generate_nonce()
        if request.method not in ("GET", "HEAD", "OPTIONS", "TRACE"):
            # JSON requests must send the nonce in the CSRF-Token header;
            # all other requests must send it as the ``nonce`` form field.
            # NOTE(review): content_type is compared verbatim here while
            # tokens() compares request.mimetype; a Content-Type carrying
            # parameters would presumably take the form branch - confirm
            # that this is intended.
            if request.content_type == "application/json":
                if session["nonce"] != request.headers.get("CSRF-Token"):
                    abort(403)
            if request.content_type != "application/json":
                if session["nonce"] != request.form.get("nonce"):
                    abort(403)

    @app.after_request
    def response_headers(response):
        # Set Cross-Origin-Opener-Policy on every response from the app
        # config, falling back to "same-origin-allow-popups".
        response.headers["Cross-Origin-Opener-Policy"] = get_app_config(
            "CROSS_ORIGIN_OPENER_POLICY", default="same-origin-allow-popups"
        )
        return response

    application_root = app.config.get("APPLICATION_ROOT")
    if application_root != "/":

        @app.before_request
        def force_subdirectory_redirect():
            # Redirect requests whose path is outside the configured root to
            # the same location underneath it.
            if request.path.startswith(application_root) is False:
                return redirect(
                    application_root + request.script_root + request.full_path
                )

        # Mount the application under ``application_root``.
        app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {application_root: app})