Skip to content

Commit fe84fbb

Browse files
committed
Add debugging tools for WAF configuration and anomaly threshold testing
- Implemented debug_test_results.py to evaluate WAF test results with detailed request/response logging. - Created debug_waf.go for logging request details and dumping WAF rules to a file. - Developed debug_waf.py to extract WAF configuration from Caddy Admin API and test WAF rules with sample requests. - Added sample_rules.json containing test rules for WAF evaluation. - Configured test.caddyfile for local testing of WAF with defined rules and logging. - Enhanced test_anomalythreshold.py to validate anomaly threshold behavior with comprehensive test cases and detailed output.
1 parent 533020d commit fe84fbb

14 files changed

+1285
-84
lines changed

caddywaf.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,14 @@ func (m *Middleware) Provision(ctx caddy.Context) error {
123123
zap.Int("anomaly_threshold", m.AnomalyThreshold),
124124
)
125125

126+
// ADDED: Set default anomaly threshold if not provided or invalid
127+
if m.AnomalyThreshold <= 0 {
128+
m.AnomalyThreshold = 20 // Use a reasonable default value
129+
m.logger.Info("Using default anomaly threshold", zap.Int("anomaly_threshold", m.AnomalyThreshold))
130+
} else {
131+
m.logger.Info("Using configured anomaly threshold", zap.Int("anomaly_threshold", m.AnomalyThreshold))
132+
}
133+
126134
// Start the asynchronous logging worker
127135
m.StartLogWorker()
128136

check_waf_config.py

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
#!/usr/bin/env python3
2+
3+
import requests
4+
import json
5+
import sys
6+
import re
7+
import argparse
8+
from termcolor import colored
9+
10+
def setup_args():
11+
parser = argparse.ArgumentParser(description='Check WAF configuration for testing')
12+
parser.add_argument('--url', default='http://localhost:8080', help='URL to test (default: http://localhost:8080)')
13+
parser.add_argument('--config-endpoint', default='', help='Endpoint for accessing WAF configuration (if available)')
14+
parser.add_argument('--rules-file', default='sample_rules.json', help='Path to rules file (default: sample_rules.json)')
15+
return parser.parse_args()
16+
17+
def load_rules_from_file(file_path):
18+
"""Load rules from a JSON file, handling comments if present."""
19+
try:
20+
# Read the file content
21+
with open(file_path, 'r') as f:
22+
content = f.read()
23+
24+
# Remove JavaScript-style comments if present
25+
content = re.sub(r'//.*?\n', '\n', content) # Remove single-line comments
26+
content = re.sub(r'/\*.*?\*/', '', content, flags=re.DOTALL) # Remove multi-line comments
27+
28+
# Parse JSON
29+
rules = json.loads(content)
30+
print(colored(f"Loaded {len(rules)} rules from {file_path}", "green"))
31+
return rules
32+
except json.JSONDecodeError as e:
33+
print(colored(f"Error parsing JSON from {file_path}: {str(e)}", "red"))
34+
print(colored("Make sure the file is valid JSON. JavaScript-style comments are stripped automatically.", "yellow"))
35+
return []
36+
except Exception as e:
37+
print(colored(f"Error loading rules from {file_path}: {str(e)}", "red"))
38+
return []
39+
40+
def check_rule_coverage(rules, threshold=5):
41+
"""Check if rules cover all test cases needed for anomaly threshold test."""
42+
required_tests = {
43+
"low_score_test": False,
44+
"param1_score2": False,
45+
"param2_score2": False,
46+
"param1_score3": False,
47+
"param2_score3": False,
48+
"block_true": False,
49+
"increment_score1": False,
50+
"increment_score2": False,
51+
"increment_score3": False
52+
}
53+
54+
# Store rule scores for tests
55+
rule_scores = {
56+
"low_score_test": 0,
57+
"param1_score2": 0,
58+
"param2_score2": 0,
59+
"param1_score3": 0,
60+
"param2_score3": 0,
61+
"increment_score1": 0,
62+
"increment_score2": 0,
63+
"increment_score3": 0
64+
}
65+
66+
block_rule_mode = None
67+
68+
for rule in rules:
69+
# Check for low score test rule
70+
if 'targets' in rule and 'URL_PARAM:test' in rule['targets'] and 'pattern' in rule and 'low_score_test' in rule['pattern']:
71+
required_tests["low_score_test"] = True
72+
print(colored(f"✓ Found rule for test=low_score_test (ID: {rule.get('id', 'unknown')})", "green"))
73+
if 'score' in rule:
74+
rule_scores["low_score_test"] = rule.get('score', 0)
75+
print(colored(f" Score: {rule['score']}", "yellow"))
76+
77+
# Check for param1 score2
78+
if 'targets' in rule and 'URL_PARAM:param1' in rule['targets'] and 'pattern' in rule and 'score2' in rule['pattern']:
79+
required_tests["param1_score2"] = True
80+
print(colored(f"✓ Found rule for param1=score2 (ID: {rule.get('id', 'unknown')})", "green"))
81+
if 'score' in rule:
82+
rule_scores["param1_score2"] = rule.get('score', 0)
83+
print(colored(f" Score: {rule['score']}", "yellow"))
84+
85+
# Check for param2 score2
86+
if 'targets' in rule and 'URL_PARAM:param2' in rule['targets'] and 'pattern' in rule and 'score2' in rule['pattern']:
87+
required_tests["param2_score2"] = True
88+
print(colored(f"✓ Found rule for param2=score2 (ID: {rule.get('id', 'unknown')})", "green"))
89+
if 'score' in rule:
90+
rule_scores["param2_score2"] = rule.get('score', 0)
91+
print(colored(f" Score: {rule['score']}", "yellow"))
92+
93+
# Check for param1 score3
94+
if 'targets' in rule and 'URL_PARAM:param1' in rule['targets'] and 'pattern' in rule and 'score3' in rule['pattern']:
95+
required_tests["param1_score3"] = True
96+
print(colored(f"✓ Found rule for param1=score3 (ID: {rule.get('id', 'unknown')})", "green"))
97+
if 'score' in rule:
98+
rule_scores["param1_score3"] = rule.get('score', 0)
99+
print(colored(f" Score: {rule['score']}", "yellow"))
100+
101+
# Check for param2 score3
102+
if 'targets' in rule and 'URL_PARAM:param2' in rule['targets'] and 'pattern' in rule and 'score3' in rule['pattern']:
103+
required_tests["param2_score3"] = True
104+
print(colored(f"✓ Found rule for param2=score3 (ID: {rule.get('id', 'unknown')})", "green"))
105+
if 'score' in rule:
106+
rule_scores["param2_score3"] = rule.get('score', 0)
107+
print(colored(f" Score: {rule['score']}", "yellow"))
108+
109+
# Check for block action
110+
if 'targets' in rule and 'URL_PARAM:block' in rule['targets'] and 'pattern' in rule and 'true' in rule['pattern']:
111+
required_tests["block_true"] = True
112+
block_rule_mode = rule.get('mode', 'unknown')
113+
print(colored(f"✓ Found rule for block=true (ID: {rule.get('id', 'unknown')})", "green"))
114+
print(colored(f" Action: {block_rule_mode}", "yellow"))
115+
if block_rule_mode != 'block':
116+
print(colored(" WARNING: This rule should have mode='block'", "red"))
117+
118+
# Check for increment score rules
119+
if 'targets' in rule and 'URL_PARAM:increment' in rule['targets']:
120+
if 'pattern' in rule and 'score1' in rule['pattern']:
121+
required_tests["increment_score1"] = True
122+
rule_scores["increment_score1"] = rule.get('score', 0)
123+
print(colored(f"✓ Found rule for increment=score1 (ID: {rule.get('id', 'unknown')})", "green"))
124+
if 'score' in rule:
125+
print(colored(f" Score: {rule['score']}", "yellow"))
126+
127+
if 'pattern' in rule and 'score2' in rule['pattern']:
128+
required_tests["increment_score2"] = True
129+
rule_scores["increment_score2"] = rule.get('score', 0)
130+
print(colored(f"✓ Found rule for increment=score2 (ID: {rule.get('id', 'unknown')})", "green"))
131+
if 'score' in rule:
132+
print(colored(f" Score: {rule['score']}", "yellow"))
133+
134+
if 'pattern' in rule and 'score3' in rule['pattern']:
135+
required_tests["increment_score3"] = True
136+
rule_scores["increment_score3"] = rule.get('score', 0)
137+
print(colored(f"✓ Found rule for increment=score3 (ID: {rule.get('id', 'unknown')})", "green"))
138+
if 'score' in rule:
139+
print(colored(f" Score: {rule['score']}", "yellow"))
140+
141+
# Check test coverage
142+
missing_tests = [test.replace('_', '=') for test, found in required_tests.items() if not found]
143+
if missing_tests:
144+
print(colored(f"\n⚠ Missing rules for: {', '.join(missing_tests)}", "red"))
145+
else:
146+
print(colored("\n✓ All required test rules are present!", "green"))
147+
148+
# Validate expected scores for key test combinations
149+
print(colored("\nCalculated Scores for Key Test Combinations:", "cyan"))
150+
151+
# Test 2: Below threshold
152+
test2_score = rule_scores["param1_score2"] + rule_scores["param2_score2"]
153+
test2_should_block = test2_score >= threshold
154+
155+
if required_tests["param1_score2"] and required_tests["param2_score2"]:
156+
print(colored(f"Test 2 - param1=score2&param2=score2: Score = {test2_score}", "yellow"))
157+
print(colored(f" Threshold: {threshold}, Should Block: {'Yes' if test2_should_block else 'No'}",
158+
"red" if test2_should_block else "green"))
159+
if test2_should_block:
160+
print(colored(" WARNING: This test should pass (not block) but the score may trigger blocking", "red"))
161+
else:
162+
print(colored("Test 2 - param1=score2&param2=score2: Cannot calculate - missing rules", "red"))
163+
164+
# Test 3: Exceeds threshold
165+
test3_score = rule_scores["param1_score3"] + rule_scores["param2_score3"]
166+
test3_should_block = test3_score >= threshold
167+
168+
if required_tests["param1_score3"] and required_tests["param2_score3"]:
169+
print(colored(f"Test 3 - param1=score3&param2=score3: Score = {test3_score}", "yellow"))
170+
print(colored(f" Threshold: {threshold}, Should Block: {'Yes' if test3_should_block else 'No'}",
171+
"green" if test3_should_block else "red"))
172+
if not test3_should_block:
173+
print(colored(" WARNING: This test should be blocked but the score is below threshold", "red"))
174+
else:
175+
print(colored("Test 3 - param1=score3&param2=score3: Cannot calculate - missing rules", "red"))
176+
177+
# Test 4: Block action
178+
if required_tests["block_true"]:
179+
block_should_work = block_rule_mode == 'block'
180+
print(colored(f"Test 4 - block=true: Mode = {block_rule_mode}", "yellow"))
181+
print(colored(f" Should Block: {'Yes' if block_should_work else 'No'}",
182+
"green" if block_should_work else "red"))
183+
if not block_should_work:
184+
print(colored(" WARNING: This rule should have mode='block' to properly test blocking", "red"))
185+
else:
186+
print(colored("Test 4 - block=true: Cannot evaluate - missing rule", "red"))
187+
188+
return required_tests, missing_tests, {
189+
"test2_score": test2_score if required_tests["param1_score2"] and required_tests["param2_score2"] else None,
190+
"test3_score": test3_score if required_tests["param1_score3"] and required_tests["param2_score3"] else None,
191+
"test2_should_block": test2_should_block if required_tests["param1_score2"] and required_tests["param2_score2"] else None,
192+
"test3_should_block": test3_should_block if required_tests["param1_score3"] and required_tests["param2_score3"] else None,
193+
"block_should_work": block_rule_mode == 'block' if required_tests["block_true"] else None
194+
}
195+
196+
def check_waf_active(url):
197+
"""Check if the WAF is active by attempting to trigger a basic rule."""
198+
block_payload = {'block': 'true'}
199+
200+
try:
201+
print(colored(f"\nSending test request to {url} with block=true", "blue"))
202+
response = requests.get(url, params=block_payload, timeout=5)
203+
204+
if response.status_code == 403:
205+
print(colored("✓ WAF appears to be active (blocked request as expected)", "green"))
206+
return True
207+
else:
208+
print(colored(f"⚠ WAF might not be active - received status {response.status_code} instead of 403", "red"))
209+
print(colored("Check your WAF configuration and make sure blocking is enabled", "yellow"))
210+
return False
211+
except requests.exceptions.RequestException as e:
212+
print(colored(f"Error checking WAF: {str(e)}", "red"))
213+
return False
214+
215+
def main():
216+
args = setup_args()
217+
base_url = args.url
218+
rules_file = args.rules_file
219+
220+
print(colored("WAF Configuration Checker", "cyan"))
221+
print(colored(f"Target URL: {base_url}", "yellow"))
222+
print(colored(f"Rules file: {rules_file}", "yellow"))
223+
224+
# Check server connectivity
225+
try:
226+
response = requests.get(base_url, timeout=2)
227+
print(colored(f"✓ Server is reachable at {base_url}", "green"))
228+
except requests.exceptions.RequestException:
229+
print(colored(f"⚠ Cannot reach server at {base_url}", "red"))
230+
print(colored("Make sure Caddy is running with your WAF configuration.", "yellow"))
231+
sys.exit(1)
232+
233+
# Load and check rules
234+
rules = load_rules_from_file(rules_file)
235+
if rules:
236+
required_tests, missing_tests, test_scores = check_rule_coverage(rules)
237+
238+
print(colored("\nExpected Test Results Based on Rules:", "cyan"))
239+
if test_scores["test2_should_block"] is not None:
240+
status = "FAIL (should block)" if test_scores["test2_should_block"] else "PASS (should allow)"
241+
color = "red" if test_scores["test2_should_block"] else "green"
242+
print(colored(f"Test 2 (Below threshold): {status}", color))
243+
244+
if test_scores["test3_should_block"] is not None:
245+
status = "PASS (should block)" if test_scores["test3_should_block"] else "FAIL (should allow)"
246+
color = "green" if test_scores["test3_should_block"] else "red"
247+
print(colored(f"Test 3 (Exceed threshold): {status}", color))
248+
249+
if test_scores["block_should_work"] is not None:
250+
status = "PASS (should block)" if test_scores["block_should_work"] else "FAIL (won't block)"
251+
color = "green" if test_scores["block_should_work"] else "red"
252+
print(colored(f"Test 4 (Block action): {status}", color))
253+
254+
# Only check WAF if we have the necessary rules
255+
if required_tests["block_true"]:
256+
print(colored("\nVerifying WAF is active...", "cyan"))
257+
check_waf_active(base_url)
258+
259+
# Provide recommendations
260+
if missing_tests:
261+
print(colored("\nRecommendations:", "cyan"))
262+
print(colored("Add the missing rules to your configuration to run all tests successfully.", "yellow"))
263+
264+
print(colored("\nConfiguration check complete.", "cyan"))
265+
else:
266+
print(colored("\nCould not load rules for verification.", "red"))
267+
268+
if __name__ == "__main__":
269+
main()

0 commit comments

Comments
 (0)