Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions bbot/modules/lightfuzz/lightfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ class lightfuzz(BaseModule):
"force_common_headers": False,
"enabled_submodules": ["sqli", "cmdi", "xss", "path", "ssti", "crypto", "serial", "esi"],
"disable_post": False,
"try_post_as_get": False,
"try_get_as_post": False,
"avoid_wafs": True,
}
options_desc = {
"force_common_headers": "Force emit commonly exploitable parameters that may be difficult to detect",
"enabled_submodules": "A list of submodules to enable. Empty list enabled all modules.",
"disable_post": "Disable processing of POST parameters, avoiding form submissions.",
"try_post_as_get": "For each POSTPARAM, also fuzz it as a GETPARAM (in addition to normal POST fuzzing).",
"try_get_as_post": "For each GETPARAM, also fuzz it as a POSTPARAM (in addition to normal GET fuzzing).",
"avoid_wafs": "Avoid running against confirmed WAFs, which are likely to block lightfuzz requests",
}

Expand All @@ -38,6 +42,8 @@ async def setup(self):
self.interactsh_instance = None
self.interactsh_domain = None
self.disable_post = self.config.get("disable_post", False)
self.try_post_as_get = self.config.get("try_post_as_get", False)
self.try_get_as_post = self.config.get("try_get_as_post", False)
self.enabled_submodules = self.config.get("enabled_submodules")
self.interactsh_disable = self.scan.config.get("interactsh_disable", False)
self.avoid_wafs = self.scan.config.get("avoid_wafs", True)
Expand Down Expand Up @@ -145,9 +151,29 @@ async def handle_event(self, event):
connectivity_test = await self.helpers.request(event.data["url"], timeout=10)

if connectivity_test:
for submodule_name, submodule in self.submodules.items():
self.debug(f"Starting {submodule_name} fuzz()")
await self.run_submodule(submodule, event)
original_type = event.data["type"]

# Normal fuzzing pass (skipped for POSTPARAM if disable_post is True)
if not (self.disable_post and original_type == "POSTPARAM"):
for submodule_name, submodule in self.submodules.items():
self.debug(f"Starting {submodule_name} fuzz()")
await self.run_submodule(submodule, event)

# Additional pass: try POSTPARAM as GETPARAM
if self.try_post_as_get and original_type == "POSTPARAM":
event.data["type"] = "GETPARAM"
event.data["converted_from_post"] = True
for submodule_name, submodule in self.submodules.items():
self.debug(f"Starting {submodule_name} fuzz() (try_post_as_get)")
await self.run_submodule(submodule, event)

# Additional pass: try GETPARAM as POSTPARAM
if self.try_get_as_post and original_type == "GETPARAM":
event.data["type"] = "POSTPARAM"
event.data["converted_from_get"] = True
for submodule_name, submodule in self.submodules.items():
self.debug(f"Starting {submodule_name} fuzz() (try_get_as_post)")
await self.run_submodule(submodule, event)
else:
self.debug(f"WEB_PARAMETER URL {event.data['url']} failed connectivity test, aborting")

Expand Down Expand Up @@ -181,7 +207,8 @@ async def filter_event(self, event):

# If we've disabled fuzzing POST parameters, back out of POSTPARAM WEB_PARAMETER events as quickly as possible
if event.type == "WEB_PARAMETER" and self.disable_post and event.data["type"] == "POSTPARAM":
return False, "POST parameter disabled in lightfuzz module"
if not self.try_post_as_get:
return False, "POST parameter disabled in lightfuzz module"
return True

@classmethod
Expand Down
9 changes: 8 additions & 1 deletion bbot/modules/lightfuzz/submodules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,15 @@ async def standard_probe(
self.debug(f"standard_probe requested URL: [{request_params['url']}]")
return await self.lightfuzz.helpers.request(**request_params)

def conversion_note(self):
if self.event.data.get("converted_from_post", False):
return " (converted from POSTPARAM)"
elif self.event.data.get("converted_from_get", False):
return " (converted from GETPARAM)"
return ""

def metadata(self):
metadata_string = f"Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]"
metadata_string = f"Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]{self.conversion_note()}"
if self.event.data["original_value"] != "" and self.event.data["original_value"] is not None:
metadata_string += (
f" Original Value: [{self.lightfuzz.helpers.truncate_string(self.event.data['original_value'], 200)}]"
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/lightfuzz/submodules/esi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def check_probe(self, cookies, probe, match):
self.results.append(
{
"type": "FINDING",
"description": f"Edge Side Include. Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]",
"description": f"Edge Side Include. Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]{self.conversion_note()}",
}
)
return True
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/lightfuzz/submodules/xss.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def check_probe(self, cookies, probe, match, context):
self.results.append(
{
"type": "FINDING",
"description": f"Possible Reflected XSS. Parameter: [{self.event.data['name']}] Context: [{context}] Parameter Type: [{self.event.data['type']}]",
"description": f"Possible Reflected XSS. Parameter: [{self.event.data['name']}] Context: [{context}] Parameter Type: [{self.event.data['type']}]{self.conversion_note()}",
}
)
return True
Expand Down
2 changes: 2 additions & 0 deletions bbot/presets/web/lightfuzz-heavy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ config:
lightfuzz:
enabled_submodules: [cmdi,crypto,path,serial,sqli,ssti,xss,esi]
disable_post: False
try_post_as_get: True
try_get_as_post: True
1 change: 1 addition & 0 deletions bbot/presets/web/lightfuzz-medium.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ config:
modules:
lightfuzz:
enabled_submodules: [cmdi,crypto,path,serial,sqli,ssti,xss,esi]
try_post_as_get: True
153 changes: 153 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_lightfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -2146,3 +2146,156 @@ async def test_filter_event(self, module_test):
def check(self, module_test, events):
# This test doesn't need to check events since it's testing the filter method directly
pass


# try_post_as_get: fuzz POST parameters as GET parameters
class Test_Lightfuzz_try_post_as_get(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["httpx", "lightfuzz", "excavate"]
config_overrides = {
"interactsh_disable": True,
"modules": {
"lightfuzz": {
"enabled_submodules": ["sqli"],
"disable_post": True,
"try_post_as_get": True,
}
},
}

def request_handler(self, request):
qs = str(request.query_string.decode())

parameter_block = """
<section class=search>
<form action=/ method=POST>
<input type=text placeholder='Search the blog...' name=search>
<button type=submit class=button>Search</button>
</form>
</section>
"""

if "search=" in qs:
value = qs.split("=")[1]
if "&" in value:
value = value.split("&")[0]

sql_block_normal = f"""
<section class=blog-header>
<h1>0 search results for '{unquote(value)}'</h1>
<hr>
</section>
"""

sql_block_error = """
<section class=error>
<h1>Found error in SQL query</h1>
<hr>
</section>
"""
if value.endswith("'"):
if value.endswith("''"):
return Response(sql_block_normal, status=200)
return Response(sql_block_error, status=500)
return Response(parameter_block, status=200)

async def setup_after_prep(self, module_test):
module_test.scan.modules["lightfuzz"].helpers.rand_string = lambda *args, **kwargs: "AAAAAAAAAAAAAA"
expect_args = re.compile("/")
module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler)

def check(self, module_test, events):
web_parameter_emitted = False
sqli_getparam_finding_emitted = False
sqli_postparam_finding_emitted = False
for e in events:
if e.type == "WEB_PARAMETER":
if "HTTP Extracted Parameter [search]" in e.data["description"]:
web_parameter_emitted = True

if e.type == "FINDING":
if (
"Possible SQL Injection. Parameter: [search] Parameter Type: [GETPARAM] (converted from POSTPARAM) Detection Method: [Single Quote/Two Single Quote, Code Change (200->500->200)]"
in e.data["description"]
):
sqli_getparam_finding_emitted = True
if "Possible SQL Injection. Parameter: [search] Parameter Type: [POSTPARAM]" in e.data["description"]:
sqli_postparam_finding_emitted = True

assert web_parameter_emitted, "WEB_PARAMETER was not emitted"
assert sqli_getparam_finding_emitted, (
"SQLi GETPARAM (converted from POSTPARAM) FINDING not emitted (try_post_as_get failed)"
)
assert not sqli_postparam_finding_emitted, "POSTPARAM FINDING emitted despite disable_post=True"


# try_get_as_post: fuzz GET parameters as POST parameters
class Test_Lightfuzz_try_get_as_post(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["httpx", "lightfuzz", "excavate"]
config_overrides = {
"interactsh_disable": True,
"modules": {
"lightfuzz": {
"enabled_submodules": ["sqli"],
"try_get_as_post": True,
}
},
}

def request_handler(self, request):
parameter_block = """
<section class=search>
<form action=/ method=GET>
<input type=text placeholder='Search the blog...' name=search>
<button type=submit class=button>Search</button>
</form>
</section>
"""

if request.method == "POST" and "search" in request.form.keys():
value = request.form["search"]

sql_block_normal = f"""
<section class=blog-header>
<h1>0 search results for '{unquote(value)}'</h1>
<hr>
</section>
"""

sql_block_error = """
<section class=error>
<h1>Found error in SQL query</h1>
<hr>
</section>
"""
if value.endswith("'"):
if value.endswith("''"):
return Response(sql_block_normal, status=200)
return Response(sql_block_error, status=500)
return Response(parameter_block, status=200)

async def setup_after_prep(self, module_test):
module_test.scan.modules["lightfuzz"].helpers.rand_string = lambda *args, **kwargs: "AAAAAAAAAAAAAA"
expect_args = re.compile("/")
module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler)

def check(self, module_test, events):
web_parameter_emitted = False
sqli_postparam_converted_finding_emitted = False
for e in events:
if e.type == "WEB_PARAMETER":
if "HTTP Extracted Parameter [search]" in e.data["description"]:
web_parameter_emitted = True

if e.type == "FINDING":
if (
"Possible SQL Injection. Parameter: [search] Parameter Type: [POSTPARAM] (converted from GETPARAM) Detection Method: [Single Quote/Two Single Quote, Code Change (200->500->200)]"
in e.data["description"]
):
sqli_postparam_converted_finding_emitted = True

assert web_parameter_emitted, "WEB_PARAMETER was not emitted"
assert sqli_postparam_converted_finding_emitted, (
"SQLi POSTPARAM (converted from GETPARAM) FINDING not emitted (try_get_as_post failed)"
)
Loading