"""Remote WAF fingerprinting based on the Nuclei ``global-waf-detect`` template.

The template is downloaded from the nuclei-templates repository, its list of
matchers is extracted, and HTTP responses are tested against those matchers.
Only ``word`` and ``regex`` matchers are evaluated; other matcher types are
ignored.
"""

import re

import requests
import yaml


NUCLEI_FINGERPRINT_URL: str = "https://raw.githubusercontent.com/projectdiscovery/nuclei-templates/refs/heads/main/http/global-matchers/global-waf-detect.yaml"

# Shape of one matcher entry as loaded from the template YAML.
_Matcher = dict[str, str | list[str]]


def _matcher_target_text(matcher: _Matcher, response: requests.Response) -> str:
    """Return the portion of the response a matcher should be tested against.

    Keyword arguments:
    matcher -- The Nuclei matcher; its optional 'part' key selects the text.
    response -- The HTTP response being inspected.

    'body' selects the response text and 'header' selects the stringified
    headers. Any other value selects the body followed by the headers. A
    matcher without 'part' is treated as 'body', which is the default Nuclei
    documents for HTTP matchers.
    """
    part = matcher.get('part', 'body')
    if part == 'body':
        return response.text
    if part == 'header':
        return str(response.headers)
    return response.text + str(response.headers)


def _evaluate_condition(matcher: _Matcher, hits) -> bool:
    """Fold per-pattern results according to the matcher's 'condition'.

    Keyword arguments:
    matcher -- The Nuclei matcher; 'condition' may be 'and' or 'or'.
    hits -- An iterable of booleans, one per pattern, in pattern order.

    With 'and' every pattern must hit. With anything else (including a
    missing 'condition') a single hit is enough. An 'or' matcher without
    patterns never matches.
    """
    if matcher.get('condition', '') == 'and':
        return all(hits)
    return any(hits)


def _regex_hits(pattern: str, target_text: str) -> bool:
    """Return True if pattern is found in target_text.

    Nuclei patterns are written for Go's regexp engine. A pattern that
    Python's ``re`` cannot compile is treated as "no match" so that one
    incompatible fingerprint does not abort the whole check.
    """
    try:
        return re.search(pattern, target_text) is not None
    except re.error:
        return False


def _check_nuclei_regex(matcher: _Matcher, response: requests.Response) -> bool:
    """Evaluate a Nuclei 'regex' matcher against a response.

    Keyword arguments:
    matcher -- The matcher; 'regex' holds the patterns to search for.
    response -- The HTTP response to check.

    Returns True if the matcher is satisfied, False otherwise.
    """
    target_text = _matcher_target_text(matcher, response)
    return _evaluate_condition(
        matcher,
        (_regex_hits(pattern, target_text) for pattern in matcher['regex']),
    )


def _check_nuclei_words(matcher: _Matcher, response: requests.Response) -> bool:
    """Evaluate a Nuclei 'word' matcher against a response.

    Keyword arguments:
    matcher -- The matcher; 'words' holds the substrings to look for.
    response -- The HTTP response to check.

    The comparison is a case-sensitive substring test.

    Returns True if the matcher is satisfied, False otherwise.
    """
    target_text = _matcher_target_text(matcher, response)
    return _evaluate_condition(
        matcher,
        (word in target_text for word in matcher['words']),
    )


def fetch_nuclei_fingerprints() -> list[_Matcher] | None:
    """Fetch the latest Nuclei WAF fingerprints from the official repository.

    Returns the list of matchers found under ``http[0].matchers`` in the
    template, or None if the download fails, the YAML cannot be parsed, or
    the document does not have the expected structure. Failures are reported
    on standard output.
    """
    try:
        response = requests.get(NUCLEI_FINGERPRINT_URL, timeout=10)
        response.raise_for_status()
        raw = yaml.safe_load(response.text)
    except requests.RequestException as e:
        print(f"Error fetching Nuclei fingerprints: {e}")
        return None
    except yaml.YAMLError as e:
        print(f"Error parsing YAML data: {e}")
        return None

    # The template is remote content: guard against a changed layout rather
    # than letting a lookup error escape to the caller.
    try:
        fingerprints = raw['http'][0]['matchers']
    except (KeyError, IndexError, TypeError) as e:
        print(f"Unexpected Nuclei template structure: {e!r}")
        return None
    if not isinstance(fingerprints, list):
        print("Unexpected Nuclei template structure: 'matchers' is not a list")
        return None
    return fingerprints


def nuclei_check(response: requests.Response, fingerprints: list[_Matcher]) -> bool:
    """Check if the response matches any of the WAF fingerprints.

    Keyword arguments:
    response -- The HTTP response to check.
    fingerprints -- The list of Nuclei WAF fingerprints to check against.

    Every 'word' and 'regex' matcher is tried in order until one matches;
    matchers of any other type are skipped. An empty or missing fingerprint
    list (for example the None returned by a failed fetch) yields False.

    Returns True if a WAF is detected, False otherwise.
    """
    if not fingerprints:
        return False

    for matcher in fingerprints:
        matcher_type = matcher.get('type')
        if matcher_type == 'word':
            matched = _check_nuclei_words(matcher, response)
        elif matcher_type == 'regex':
            matched = _check_nuclei_regex(matcher, response)
        else:
            continue
        # Only a positive result ends the scan; a miss moves on to the next
        # fingerprint instead of deciding the outcome.
        if matched:
            return True
    return False