Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ repos:
rev: 1.23.1
hooks:
- id: django-upgrade
args: [--target-version, "5.1"]
args: [--target-version, "5.2", "--skip", "request_headers"]

- repo: https://github.com/psf/black
rev: 25.1.0
Expand Down
49 changes: 49 additions & 0 deletions django_async_extensions/middleware/clickjacking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Clickjacking Protection Middleware.

This module provides a middleware that implements protection against a
malicious site loading resources from your site in a hidden frame.
"""

from django.conf import settings

from django_async_extensions.middleware.base import AsyncMiddlewareMixin


class AsyncXFrameOptionsMiddleware(AsyncMiddlewareMixin):
"""
Set the X-Frame-Options HTTP header in HTTP responses.

Do not set the header if it's already set or if the response contains
a xframe_options_exempt value set to True.

By default, set the X-Frame-Options header to 'DENY', meaning the response
cannot be displayed in a frame, regardless of the site attempting to do so.
To enable the response to be loaded on a frame within the same site, set
X_FRAME_OPTIONS in your project's Django settings to 'SAMEORIGIN'.
"""

async def process_response(self, request, response):
# Don't set it if it's already in the response
if response.get("X-Frame-Options") is not None:
return response

# Don't set it if they used @xframe_options_exempt
if getattr(response, "xframe_options_exempt", False):
return response

response.headers["X-Frame-Options"] = self.get_xframe_options_value(
request,
response,
)
return response

def get_xframe_options_value(self, request, response):
"""
Get the value to set for the X_FRAME_OPTIONS header. Use the value from
the X_FRAME_OPTIONS setting, or 'DENY' if not set.

This method can be overridden if needed, allowing it to vary based on
the request or response.
"""
return getattr(settings, "X_FRAME_OPTIONS", "DENY").upper()
179 changes: 179 additions & 0 deletions django_async_extensions/middleware/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import re
from urllib.parse import urlsplit

from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.core.mail import mail_managers
from django.http import HttpResponsePermanentRedirect
from django.urls import is_valid_path
from django.utils.http import escape_leading_slashes

from django_async_extensions.middleware.base import AsyncMiddlewareMixin


class AsyncCommonMiddleware(AsyncMiddlewareMixin):
"""
"Common" middleware for taking care of some basic operations:

- Forbid access to User-Agents in settings.DISALLOWED_USER_AGENTS

- URL rewriting: Based on the APPEND_SLASH and PREPEND_WWW settings,
append missing slashes and/or prepends missing "www."s.

- If APPEND_SLASH is set and the initial URL doesn't end with a
slash, and it is not found in urlpatterns, form a new URL by
appending a slash at the end. If this new URL is found in
urlpatterns, return an HTTP redirect to this new URL; otherwise
process the initial URL as usual.

This behavior can be customized by subclassing AsyncCommonMiddleware and
overriding the response_redirect_class attribute.
"""

response_redirect_class = HttpResponsePermanentRedirect

async def process_request(self, request):
"""
Check for denied User-Agents and rewrite the URL based on
settings.APPEND_SLASH and settings.PREPEND_WWW
"""

# Check for denied User-Agents
user_agent = request.META.get("HTTP_USER_AGENT")
if user_agent is not None:
for user_agent_regex in settings.DISALLOWED_USER_AGENTS:
if user_agent_regex.search(user_agent):
raise PermissionDenied("Forbidden user agent")

# Check for a redirect based on settings.PREPEND_WWW
host = request.get_host()

if settings.PREPEND_WWW and host and not host.startswith("www."):
# Check if we also need to append a slash so we can do it all
# with a single redirect. (This check may be somewhat expensive,
# so we only do it if we already know we're sending a redirect,
# or in process_response if we get a 404.)
if self.should_redirect_with_slash(request):
path = self.get_full_path_with_slash(request)
else:
path = request.get_full_path()

return self.response_redirect_class(f"{request.scheme}://www.{host}{path}")

def should_redirect_with_slash(self, request):
"""
Return True if settings.APPEND_SLASH is True and appending a slash to
the request path turns an invalid path into a valid one.
"""
if settings.APPEND_SLASH and not request.path_info.endswith("/"):
urlconf = getattr(request, "urlconf", None)
if not is_valid_path(request.path_info, urlconf):
match = is_valid_path("%s/" % request.path_info, urlconf)
if match:
view = match.func
return getattr(view, "should_append_slash", True)
return False

def get_full_path_with_slash(self, request):
"""
Return the full path of the request with a trailing slash appended.

Raise a RuntimeError if settings.DEBUG is True and request.method is
DELETE, POST, PUT, or PATCH.
"""
new_path = request.get_full_path(force_append_slash=True)
# Prevent construction of scheme relative urls.
new_path = escape_leading_slashes(new_path)
if settings.DEBUG and request.method in ("DELETE", "POST", "PUT", "PATCH"):
raise RuntimeError(
"You called this URL via %(method)s, but the URL doesn't end "
"in a slash and you have APPEND_SLASH set. Django can't "
"redirect to the slash URL while maintaining %(method)s data. "
"Change your form to point to %(url)s (note the trailing "
"slash), or set APPEND_SLASH=False in your Django settings."
% {
"method": request.method,
"url": request.get_host() + new_path,
}
)
return new_path

async def process_response(self, request, response):
"""
When the status code of the response is 404, it may redirect to a path
with an appended slash if should_redirect_with_slash() returns True.
"""
# If the given URL is "Not Found", then check if we should redirect to
# a path with a slash appended.
if response.status_code == 404 and self.should_redirect_with_slash(request):
return self.response_redirect_class(self.get_full_path_with_slash(request))

# Add the Content-Length header to non-streaming responses if not
# already set.
if not response.streaming and not response.has_header("Content-Length"):
response.headers["Content-Length"] = str(len(response.content))

return response


class AsyncBrokenLinkEmailsMiddleware(AsyncMiddlewareMixin):
async def process_response(self, request, response):
"""Send broken link emails for relevant 404 NOT FOUND responses."""
if response.status_code == 404 and not settings.DEBUG:
domain = request.get_host()
path = request.get_full_path()
referer = request.META.get("HTTP_REFERER", "")

if not self.is_ignorable_request(request, path, domain, referer):
ua = request.META.get("HTTP_USER_AGENT", "<none>")
ip = request.META.get("REMOTE_ADDR", "<none>")
mail_managers(
"Broken %slink on %s"
% (
(
"INTERNAL "
if self.is_internal_request(domain, referer)
else ""
),
domain,
),
"Referrer: %s\nRequested URL: %s\nUser agent: %s\n"
"IP address: %s\n" % (referer, path, ua, ip),
fail_silently=True,
)
return response

def is_internal_request(self, domain, referer):
"""
Return True if the referring URL is the same domain as the current
request.
"""
# Different subdomains are treated as different domains.
return bool(re.match("^https?://%s/" % re.escape(domain), referer))

def is_ignorable_request(self, request, uri, domain, referer):
"""
Return True if the given request *shouldn't* notify the site managers
according to project settings or in situations outlined by the inline
comments.
"""
# The referer is empty.
if not referer:
return True

# APPEND_SLASH is enabled and the referer is equal to the current URL
# without a trailing slash indicating an internal redirect.
if settings.APPEND_SLASH and uri.endswith("/") and referer == uri[:-1]:
return True

# A '?' in referer is identified as a search engine source.
if not self.is_internal_request(domain, referer) and "?" in referer:
return True

# The referer is equal to the current URL, ignoring the scheme (assumed
# to be a poorly implemented bot).
parsed_referer = urlsplit(referer)
if parsed_referer.netloc in ["", domain] and parsed_referer.path == uri:
return True

return any(pattern.search(uri) for pattern in settings.IGNORABLE_404_URLS)
76 changes: 76 additions & 0 deletions django_async_extensions/middleware/gzip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from django.utils.cache import patch_vary_headers
from django.utils.regex_helper import _lazy_re_compile
from django.utils.text import compress_sequence, compress_string

from django_async_extensions.middleware.base import AsyncMiddlewareMixin


re_accepts_gzip = _lazy_re_compile(r"\bgzip\b")


class AsyncGZipMiddleware(AsyncMiddlewareMixin):
"""
Compress content if the browser allows gzip compression.
Set the Vary header accordingly, so that caches will base their storage
on the Accept-Encoding header.
"""

max_random_bytes = 100

async def process_response(self, request, response):
# It's not worth attempting to compress really short responses.
if not response.streaming and len(response.content) < 200:
return response

# Avoid gzipping if we've already got a content-encoding.
if response.has_header("Content-Encoding"):
return response

patch_vary_headers(response, ("Accept-Encoding",))

ae = request.headers.get("accept-encoding", "")
if not re_accepts_gzip.search(ae):
return response

if response.streaming:
if response.is_async:
# pull to lexical scope to capture fixed reference in case
# streaming_content is set again later.
orignal_iterator = response.streaming_content

async def gzip_wrapper():
async for chunk in orignal_iterator:
yield compress_string(
chunk,
max_random_bytes=self.max_random_bytes,
)

response.streaming_content = gzip_wrapper()
else:
response.streaming_content = compress_sequence(
response.streaming_content,
max_random_bytes=self.max_random_bytes,
)
# Delete the `Content-Length` header for streaming content, because
# we won't know the compressed size until we stream it.
del response.headers["Content-Length"]
else:
# Return the compressed content only if it's actually shorter.
compressed_content = compress_string(
response.content,
max_random_bytes=self.max_random_bytes,
)
if len(compressed_content) >= len(response.content):
return response
response.content = compressed_content
response.headers["Content-Length"] = str(len(response.content))

# If there is a strong ETag, make it weak to fulfill the requirements
# of RFC 9110 Section 8.8.1 while also allowing conditional request
# matches on ETags.
etag = response.get("ETag")
if etag and etag.startswith('"'):
response.headers["ETag"] = "W/" + etag
response.headers["Content-Encoding"] = "gzip"

return response
41 changes: 41 additions & 0 deletions django_async_extensions/middleware/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from django.utils.cache import cc_delim_re, get_conditional_response, set_response_etag
from django.utils.http import parse_http_date_safe

from django_async_extensions.middleware.base import AsyncMiddlewareMixin


class AsyncConditionalGetMiddleware(AsyncMiddlewareMixin):
"""
Handle conditional GET operations. If the response has an ETag or
Last-Modified header and the request has If-None-Match or If-Modified-Since,
replace the response with HttpNotModified. Add an ETag header if needed.
"""

async def process_response(self, request, response):
# It's too late to prevent an unsafe request with a 412 response, and
# for a HEAD request, the response body is always empty so computing
# an accurate ETag isn't possible.
if request.method != "GET":
return response

if self.needs_etag(response) and not response.has_header("ETag"):
set_response_etag(response)

etag = response.get("ETag")
last_modified = response.get("Last-Modified")
last_modified = last_modified and parse_http_date_safe(last_modified)

if etag or last_modified:
return get_conditional_response(
request,
etag=etag,
last_modified=last_modified,
response=response,
)

return response

def needs_etag(self, response):
"""Return True if an ETag header should be added to response."""
cache_control_headers = cc_delim_re.split(response.get("Cache-Control", ""))
return all(header.lower() != "no-store" for header in cache_control_headers)
Loading
Loading