Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ stem = "^1.8.0"
pandas = "^2.2.1"
openpyxl = "^3.0.10"
tomli = "^2.2.1"
pyyaml = "^6.0.3"

[tool.poetry.group.dev.dependencies]
jsonschema = "^4.0.0"
Expand Down
84 changes: 84 additions & 0 deletions sherlock_project/waf_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import requests
import yaml


# Raw GitHub URL of the Nuclei "global-waf-detect" template. It points at the
# `main` branch, and is downloaded by fetch_nuclei_fingerprints() to obtain the
# list of WAF matchers.
NUCLEI_FINGERPRINT_URL: str = "https://raw.githubusercontent.com/projectdiscovery/nuclei-templates/refs/heads/main/http/global-matchers/global-waf-detect.yaml"

def _check_nuclei_regex(matcher: dict[str,str|list[str]], response: requests.Response) -> bool:
    """Evaluate a single Nuclei ``regex`` matcher against an HTTP response.

    Keyword arguments:
    matcher -- One matcher entry from the Nuclei template. Reads the keys
               ``regex`` (pattern or list of patterns), ``part`` (``body``,
               ``header`` or anything else for body + headers; defaults to
               ``body`` when absent) and ``condition`` (``and`` requires every
               pattern to match, anything else requires at least one).
    response -- The HTTP response whose text and headers are searched.

    Returns True if the matcher is satisfied, False otherwise. A matcher
    without any patterns never matches.
    """
    import re

    patterns = matcher.get('regex') or []
    if isinstance(patterns, str):
        # A lone string would otherwise be iterated character by character.
        patterns = [patterns]
    if not patterns:
        return False

    and_cond: bool = matcher.get('condition', '') == 'and'

    part = matcher.get('part', 'body')
    target_text: str
    if part == 'body':
        target_text = response.text
    elif part == 'header':
        target_text = str(response.headers)
    else:
        target_text = response.text + str(response.headers)

    def _matches(pattern: str) -> bool:
        # Templates are written for Go's regex engine; a pattern Python cannot
        # compile is treated as "no match" instead of aborting the whole check.
        try:
            return re.search(pattern, target_text) is not None
        except re.error:
            return False

    # `and` needs every pattern to hit; `or` needs any single hit, so every
    # pattern must get a chance (not only the first one).
    hits = (_matches(pattern) for pattern in patterns)
    return all(hits) if and_cond else any(hits)

def _check_nuclei_words(matcher: dict[str,str|list[str]], response: requests.Response) -> bool:
    """Evaluate a single Nuclei ``word`` matcher against an HTTP response.

    Keyword arguments:
    matcher -- One matcher entry from the Nuclei template. Reads the keys
               ``words`` (word or list of words, matched case-sensitively as
               substrings), ``part`` (``body``, ``header`` or anything else
               for body + headers; defaults to ``body`` when absent) and
               ``condition`` (``and`` requires every word to be present,
               anything else requires at least one).
    response -- The HTTP response whose text and headers are searched.

    Returns True if the matcher is satisfied, False otherwise. A matcher
    without any words never matches.
    """
    words = matcher.get('words') or []
    if isinstance(words, str):
        # A lone string would otherwise be iterated character by character.
        words = [words]
    if not words:
        return False

    and_cond: bool = matcher.get('condition', '') == 'and'

    part = matcher.get('part', 'body')
    target_text: str
    if part == 'body':
        target_text = response.text
    elif part == 'header':
        target_text = str(response.headers)
    else:
        target_text = response.text + str(response.headers)

    # `and` needs every word present; `or` needs any single word, so every
    # word must get a chance (not only the first one).
    hits = (word in target_text for word in words)
    return all(hits) if and_cond else any(hits)

def fetch_nuclei_fingerprints() -> list[dict[str,str|list[str]]] | None:
    """Fetch the latest Nuclei WAF fingerprints from the official repository.

    Downloads the template at NUCLEI_FINGERPRINT_URL, parses it with
    ``yaml.safe_load`` and returns the ``matchers`` list of its first ``http``
    entry.

    Returns the list of matchers, or None (after printing a message) when the
    download fails, the YAML cannot be parsed, or the template does not have
    the expected ``http[0].matchers`` list.
    """
    try:
        response = requests.get(NUCLEI_FINGERPRINT_URL, timeout=10)
        response.raise_for_status()
        raw = yaml.safe_load(response.text)
    except requests.RequestException as e:
        print(f"Error fetching Nuclei fingerprints: {e}")
        return None
    except yaml.YAMLError as e:
        print(f"Error parsing YAML data: {e}")
        return None

    # The template is remote content that can change shape at any time, so a
    # missing key / empty list / non-mapping document must not crash callers.
    try:
        fingerprints: list[dict[str,str|list[str]]] = raw['http'][0]['matchers']
    except (KeyError, IndexError, TypeError) as e:
        print(f"Unexpected Nuclei template structure: {e!r}")
        return None
    if not isinstance(fingerprints, list):
        print("Unexpected Nuclei template structure: 'matchers' is not a list")
        return None
    return fingerprints

def nuclei_check(response: requests.Response, fingerprints: list[dict[str,str|list[str]]]) -> bool:
    """Check if the response matches any of the WAF fingerprints.

    Every ``word`` and ``regex`` matcher is evaluated in order until one of
    them matches; matchers of any other type are ignored.

    Keyword arguments:
    response -- The HTTP response to check.
    fingerprints -- The list of Nuclei WAF fingerprints to check against.
                    An empty list or None (e.g. a failed fetch) detects nothing.

    Returns True if a WAF is detected, False otherwise.
    """
    if not fingerprints:
        return False

    for matcher in fingerprints:
        matcher_type = matcher.get('type')
        if matcher_type == 'word':
            detected = _check_nuclei_words(matcher, response)
        elif matcher_type == 'regex':
            detected = _check_nuclei_regex(matcher, response)
        else:
            # Unsupported matcher kinds are skipped rather than guessed at.
            continue
        # Only a positive result ends the scan; a miss must fall through to
        # the remaining matchers instead of being returned immediately.
        if detected:
            return True
    return False
26 changes: 26 additions & 0 deletions tests/mocks/global_waf_detect.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Reduced mock of the Nuclei global WAF detection template. The waf_check
# tests serve this file in place of the real download.
id: global-waf-detect
http:
- global-matchers: true
matchers-condition: or
matchers:
# Regex matcher. `part: response` is neither `body` nor `header`, so
# waf_check searches the body and the headers together.
- type: regex
name: regexSite
regex:
- '(?i)access.to.this.page.has.been.denied'
- '(?i)http(s)?://(www.)?anotheroneblocked.\w+.whywasiblocked'
condition: or
part: response

# Word matcher on the response body; no explicit `condition` key.
- type: word
name: wordSiteBody
part: body
words:
- "bad_text_in_body"

# Word matcher on the response headers with alternative words (condition: or).
- type: word
name: wordSiteHead
part: header
condition: or
words:
- "text_in_head"
- "other_in_head"
107 changes: 107 additions & 0 deletions tests/test_waf_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os
import unittest
from unittest.mock import patch, Mock
import requests
from requests.structures import CaseInsensitiveDict
import yaml

from sherlock_project import waf_check


# Path to the mock Nuclei template (mocks/global_waf_detect.yaml next to this
# test module) whose content stands in for the real template download.
TEMPLATE_BODY_PATH: str = os.path.join(os.path.dirname(__file__), 'mocks', 'global_waf_detect.yaml')

def side_effect(url, **kwargs) -> Mock:
    """Stand-in for ``requests.get`` that serves the local mock template.

    Only the Nuclei fingerprint URL is answered (with a 200 response whose
    text is the mock template file); any other URL raises RuntimeError.
    """
    # Guard first: the tests must never reach out to an unexpected address.
    if url != waf_check.NUCLEI_FINGERPRINT_URL:
        raise RuntimeError("Unexpected URL")

    with open(TEMPLATE_BODY_PATH, 'r', encoding='utf-8') as template_file:
        yaml_text: str = template_file.read()

    fake_response: Mock = Mock()
    fake_response.text = yaml_text
    fake_response.status_code = 200
    return fake_response

class TestWafCheck(unittest.TestCase):
    """Tests for fingerprint fetching and the regex/word matcher helpers."""

    @staticmethod
    def _build_response(body: bytes) -> requests.Response:
        """Return a 200 response with the given body and fixed test headers."""
        fake: requests.Response = requests.Response()
        fake.status_code = 200
        fake._content = body
        fake.headers = CaseInsensitiveDict({
            'Content-Type': 'text/html',
            'Server': 'TestServer'
        })
        return fake

    @patch('sherlock_project.waf_check.requests.get')
    def test_fetch_nuclei_fingerprints(self, mock_requests_get): # type: ignore
        # The patched requests.get serves the local mock template.
        mock_requests_get.side_effect = side_effect

        with open(TEMPLATE_BODY_PATH, 'r', encoding='utf-8') as file:
            parsed_template = yaml.safe_load(file.read())
        expected: list[dict[str, str | list[str]]] = parsed_template['http'][0]['matchers']

        fetched = waf_check.fetch_nuclei_fingerprints()

        self.assertEqual(fetched, expected)

    def test_nuclei_regex_check(self):
        response = self._build_response(b"This is a test response with Test-Regex in the body.")
        matcher: dict[str, str | list[str]] = {
            'type': 'regex',
            'name': 'test-regex',
            'part': 'body',
            'regex': [r'(?i)not-present'],
            'condition': 'or'
        }
        # Each step applies its changes on top of the previous matcher state,
        # then states whether the matcher is expected to hit.
        steps: list[tuple[dict[str, str | list[str]], bool]] = [
            ({}, False),
            ({'regex': [r'(?i)TeSt-REgEx']}, True),
            ({'regex': [r'(?i)TeSt-REgEx', r'(?i)Not-Present']}, True),
            ({'condition': 'and'}, False),
            ({'part': 'header', 'regex': [r'(?i)testserver']}, True),
            ({'part': 'response'}, True),
            ({'regex': [r'(?i)not-present']}, False),
        ]
        for changes, should_match in steps:
            matcher.update(changes)
            outcome = waf_check._check_nuclei_regex(matcher, response) # pyright: ignore[reportPrivateUsage]
            if should_match:
                self.assertTrue(outcome)
            else:
                self.assertFalse(outcome)

    def test_nuclei_words_check(self):
        response = self._build_response(b"This is a test response with test-words in the body.")
        matcher: dict[str, str | list[str]] = {
            'type': 'word',
            'name': 'test-word',
            'part': 'body',
            'words': ['not-present'],
            'condition': 'or'
        }
        # Same cumulative scheme as the regex test; word matching is
        # case-sensitive, hence the 'testserver' / 'TestServer' pair.
        steps: list[tuple[dict[str, str | list[str]], bool]] = [
            ({}, False),
            ({'words': ['test-word']}, True),
            ({'words': ['test-word', 'Not-Present']}, True),
            ({'condition': 'and'}, False),
            ({'part': 'header', 'words': ['testserver']}, False),
            ({'words': ['TestServer']}, True),
            ({'part': 'response'}, True),
        ]
        for changes, should_match in steps:
            matcher.update(changes)
            outcome = waf_check._check_nuclei_words(matcher, response) # pyright: ignore[reportPrivateUsage]
            if should_match:
                self.assertTrue(outcome)
            else:
                self.assertFalse(outcome)