-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy path__init__.py
More file actions
139 lines (120 loc) · 4.55 KB
/
__init__.py
File metadata and controls
139 lines (120 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Provides all the functionality for contexts
"""
import contextvars
import json
from json import JSONDecodeError
from time import sleep
from typing import Dict
from urllib.parse import parse_qs
from aikido_zen.helpers.build_route_from_url import build_route_from_url
from aikido_zen.helpers.get_subdomains_from_url import get_subdomains_from_url
from aikido_zen.helpers.logging import logger
from .wsgi import set_wsgi_attributes_on_context
from .asgi import set_asgi_attributes_on_context
from .extract_route_params import extract_route_params
from ..helpers.headers import Headers
# Names of the request sections that Context exposes as attributes
# (presumably the places scanned for user input - confirm against callers).
UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"]
# Context variable holding the active Context; .get() yields None when nothing was set.
current_context = contextvars.ContextVar("current_context", default=None)
# `source` values for which Context.__init__ calls set_wsgi_attributes_on_context.
WSGI_SOURCES = ["django", "flask"]
# `source` values for which Context.__init__ calls set_asgi_attributes_on_context.
ASGI_SOURCES = ["quart", "django_async", "starlette"]
def get_current_context():
    """Fetch the value stored in ``current_context``; None if the lookup fails."""
    try:
        active_context = current_context.get()
    except Exception:
        # Best-effort accessor: any failure is reported as "no context".
        return None
    return active_context
class Context:
    """
    A context object, it stores everything that is important
    for vulnerability detection
    """

    def __init__(self, context_obj=None, body=None, req=None, source=None):
        """
        Build a context either from a plain dict or from a framework request.

        :param context_obj: Optional dict of attribute values. When truthy, the
            instance attributes are taken from it and `body`, `req` and
            `source` are ignored (this is the form `__reduce__` produces).
        :param body: Request body, processed through `set_body`.
        :param req: Framework request object, handed to the WSGI/ASGI helper
            selected by `source`.
        :param source: Name of the originating framework; compared against
            WSGI_SOURCES and ASGI_SOURCES.
        """
        if context_obj:
            logger.debug("Creating Context instance based on dict object.")
            # `__reduce__` does not serialise `parsed_userinput`, so seed the
            # same empty default a freshly built context has; a value present
            # in `context_obj` still wins through the update below.
            self.parsed_userinput = {}
            self.__dict__.update(context_obj)
            return

        # Define empty variables/properties:
        self.source = source
        self.user = None
        self.rate_limit_group = None
        self.parsed_userinput = {}
        self.xml = {}
        self.outgoing_req_redirects = []
        self.set_body(body)
        self.headers: Headers = Headers()
        self.cookies = dict()
        self.query = dict()

        # Parse WSGI/ASGI/... request:
        self.method = self.remote_address = self.url = None
        if source in WSGI_SOURCES:
            set_wsgi_attributes_on_context(self, req)
        elif source in ASGI_SOURCES:
            set_asgi_attributes_on_context(self, req)

        # Define variables using parsed request:
        self.route = build_route_from_url(self.url)
        self.route_params = extract_route_params(self.url)
        self.subdomains = get_subdomains_from_url(self.url)
        self.executed_middleware = False

    def __reduce__(self):
        """
        Pickle support: rebuild the instance as `Context(<dict>, None, None)`.

        The dict carries the attributes listed below; `parsed_userinput` is
        not part of it.
        """
        return (
            self.__class__,
            (
                {
                    "method": self.method,
                    "remote_address": self.remote_address,
                    "url": self.url,
                    "body": self.body,
                    "headers": self.headers,
                    "query": self.query,
                    "cookies": self.cookies,
                    "source": self.source,
                    "route": self.route,
                    "subdomains": self.subdomains,
                    "user": self.user,
                    "rate_limit_group": self.rate_limit_group,
                    "xml": self.xml,
                    "outgoing_req_redirects": self.outgoing_req_redirects,
                    "executed_middleware": self.executed_middleware,
                    "route_params": self.route_params,
                },
                None,
                None,
            ),
        )

    def set_as_current_context(self):
        """
        Set the current context
        """
        current_context.set(self)

    def set_cookies(self, cookies):
        """Replace the stored cookies with `cookies`."""
        self.cookies = cookies

    def set_body(self, body):
        """
        Store `body` on the context without ever raising.

        Any exception from `set_body_internal` (e.g. a failed decode or JSON
        parse) is logged at debug level; `self.body` then keeps whatever value
        was assigned before the failure.
        """
        try:
            self.set_body_internal(body)
        except Exception as e:
            logger.debug("Exception occurred whilst setting body: %s", e)

    def set_body_internal(self, body):
        """
        Sets the body and checks if it's possibly JSON

        Empty str/bytes become None, bytes are decoded as UTF-8, and a string
        whose first non-whitespace character is `{`, `[` or `"` is parsed as
        JSON. A truthy parse result replaces the string; a falsy one (such as
        `{}`) leaves the string in place.

        :raises UnicodeDecodeError: if a bytes body is not valid UTF-8.
        :raises json.JSONDecodeError: if the body looks like JSON but is not.
        """
        self.body = body
        if isinstance(self.body, (str, bytes)) and len(self.body) == 0:
            # Make sure that empty bodies like b"" don't get sent.
            self.body = None
        if isinstance(self.body, bytes):
            self.body = self.body.decode("utf-8")  # Decode byte input to string.
        if not isinstance(self.body, str):
            return

        stripped = self.body.strip()
        if not stripped:
            # Whitespace-only body: nothing to inspect, keep the raw string
            # (indexing the empty stripped text would raise IndexError).
            return
        if stripped[0] in ("{", "[", '"'):
            # Might be JSON, but might not have been parsed correctly by server
            # because of wrong headers
            parsed_body = json.loads(self.body)
            if parsed_body:
                self.body = parsed_body

    def get_route_metadata(self):
        """Returns a route_metadata object"""
        return {
            "method": self.method,
            "route": self.route,
            "url": self.url,
        }

    def get_user_agent(self):
        """Return the result of looking up the `USER_AGENT` header on `self.headers`."""
        return self.headers.get_header("USER_AGENT")