# pentest_assistant.py
import click
from colorama import Fore, Style, init
from datetime import datetime
from tqdm import tqdm
from modules.recon.subdomain_enum import SubdomainEnumerator # Update import statement
from modules.recon import (
PortScanner,
EmailFinder,
DNSAnalyzer,
WaybackMachine,
ReconFramework,
TechDetector
)
from modules.web import (
HeaderAnalyzer,
SecurityHeaderChecker,
RobotsSitemapChecker,
DirectoryScanner,
BackupFinder,
ScreenshotTaker
)
from modules.vulns import (
SSLAnalyzer,
WAFDetector,
CORSChecker,
HTTPMethodChecker
)
from modules.utils import OutputHandler, config
from modules.utils.recon_analyzer import ReconAnalyzer
from modules.webapp import (
XSSScanner,
SQLiScanner,
CSRFChecker,
FileUploadTester,
AuthTester,
APIScanner
)
# Initialise colorama; autoreset=True resets colour/style after every print so
# one coloured message cannot bleed into the next line of output.
init(autoreset=True)
class PentestAssistant:
    """Drive the recon, web, vulnerability and web-app modules against one target.

    A single ``OutputHandler`` is created per assistant and handed to every
    module so that all of them report through the same object.
    """

    def __init__(self):
        """Create the shared output handler and initialise scan bookkeeping."""
        self.output = OutputHandler()
        # Number of recon steps tracked by the overall progress bar.
        # footprint() re-syncs this with the step list it actually runs; the
        # web application scans are tracked by their own bar.
        self.total_tasks = 16
        # Results of the most recent web application scan phase, keyed by
        # scanner name ('xss', 'sqli', ...).
        self.webapp_findings = {}

    def banner(self):
        """Print the tool banner via the output handler."""
        self.output.print_banner()

    def progress_bar(self, desc):
        """Create a 0-100 progress bar for a single task.

        Args:
            desc: Label shown (in blue) to the left of the bar.

        Returns:
            A ``tqdm`` instance with ``total=100`` that stays on screen after
            completion (``leave=True``).
        """
        return tqdm(
            total=100,
            desc=f"{Fore.BLUE}{desc}{Style.RESET_ALL}",
            bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}',
            leave=True
        )

    @staticmethod
    def _module_findings(module, default):
        """Return ``module.findings`` if the module exposes it, else ``default``."""
        return getattr(module, 'findings', default)

    def _check_wayback(self, wayback):
        """Run the Wayback Machine check behind its own progress bar."""
        with self.progress_bar("Checking Wayback Machine") as pbar:
            wayback.check()
            pbar.update(100)

    def _record_webapp_findings(self, results):
        """Store web application scan results without assuming the handler's type.

        footprint() resets ``self.output.findings`` to a list, so calling
        ``.update()`` on it unconditionally raises ``AttributeError``. The
        results are always kept on ``self.webapp_findings`` and are merged into
        the handler's findings only when that container is a dict.
        """
        self.webapp_findings = results
        store = getattr(self.output, 'findings', None)
        if isinstance(store, dict):
            store.update(results)

    def footprint(self, target):
        """Run the full scan pipeline against ``target``.

        Instantiates every module, runs the recon steps in a fixed order behind
        an overall progress bar, passes the collected findings to
        ``ReconAnalyzer``, runs the web application scanners and prints the
        results, the attack surface analysis and the scan duration.

        Args:
            target: Domain to scan. Modules that take a URL are given
                ``https://{target}``; the others receive ``target`` as is.

        Returns:
            The value returned by ``ReconAnalyzer.analyze_and_plan()``, or
            ``None`` if any step raised (the error is reported through the
            output handler instead of propagating).
        """
        self.output.findings = []
        print(f"\n{Fore.CYAN}[*] Target: {target}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}[*] Scan started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Style.RESET_ALL}")
        start_time = datetime.now()
        try:
            base_url = f"https://{target}"

            # Initialize all modules (construction order kept as in the
            # original pipeline).
            subdomain_enum = SubdomainEnumerator(target, self.output)
            # Give the enumerator a findings container to expose its results.
            subdomain_enum.findings = set()
            port_scanner = PortScanner(target, self.output)
            dns_analyzer = DNSAnalyzer(target, self.output)
            email_finder = EmailFinder(target, self.output)
            wayback = WaybackMachine(target, self.output)
            tech_detector = TechDetector(target, self.output)
            header_analyzer = HeaderAnalyzer(base_url, self.output)
            security_headers = SecurityHeaderChecker(base_url, self.output)
            robots_checker = RobotsSitemapChecker(target, self.output)
            dir_scanner = DirectoryScanner(base_url, self.output)
            backup_finder = BackupFinder(base_url, self.output)
            # NOTE(review): the screenshot module is instantiated but never
            # run in this pipeline -- confirm whether a step is missing.
            screenshot = ScreenshotTaker(target, self.output)
            ssl_analyzer = SSLAnalyzer(target, self.output)
            waf_detector = WAFDetector(base_url, self.output)
            cors_checker = CORSChecker(base_url, self.output)
            http_method_checker = HTTPMethodChecker(base_url, self.output)
            recon_framework = ReconFramework(target, self.output)

            # Recon steps in execution order. The bar total is derived from
            # this list so the overall progress always reaches 100%.
            recon_steps = [
                recon_framework.run_full_recon,
                subdomain_enum.enumerate,
                port_scanner.scan,
                dns_analyzer.analyze,
                email_finder.find,
                lambda: self._check_wayback(wayback),
                tech_detector.detect,
                header_analyzer.analyze,
                security_headers.check,
                robots_checker.check,
                dir_scanner.scan,
                backup_finder.find,
                ssl_analyzer.analyze,
                waf_detector.detect,
                cors_checker.check,
                http_method_checker.check,
            ]
            self.total_tasks = len(recon_steps)
            with tqdm(total=self.total_tasks, desc="Overall Progress") as overall_progress:
                for run_step in recon_steps:
                    run_step()
                    overall_progress.update(1)

            # Subdomains containing "api" are fed to the API scanner later.
            api_subdomains = [sub for sub in subdomain_enum.findings if 'api' in sub]

            # Combine all findings into a dictionary; a module without a
            # ``findings`` attribute contributes an empty default.
            all_findings = {
                'framework': self._module_findings(recon_framework, {}),
                'technologies': self._module_findings(tech_detector, {}),
                'ports': self._module_findings(port_scanner, {}),
                'dns': self._module_findings(dns_analyzer, {}),
                'headers': self._module_findings(header_analyzer, []),
                'security_headers': self._module_findings(security_headers, {}),
                'ssl': self._module_findings(ssl_analyzer, {}),
                'subdomains': self._module_findings(subdomain_enum, set()),
                'robots': self._module_findings(robots_checker, []),
                'directories': self._module_findings(dir_scanner, []),
                'backups': self._module_findings(backup_finder, []),
                'cors': self._module_findings(cors_checker, {}),
                'http_methods': self._module_findings(http_method_checker, {}),
                'waf': self._module_findings(waf_detector, {}),
                'emails': self._module_findings(email_finder, set()),
                'wayback': self._module_findings(wayback, []),
            }

            # Analyze findings and plan next steps
            analyzer = ReconAnalyzer(all_findings, self.output)
            attack_vectors = analyzer.analyze_and_plan()

            # Run web application scanners with context from recon
            self._run_webapp_scans(target, all_findings, api_subdomains)

            # Print detailed scan results
            self.output.print_scan_results()

            # Print attack vectors and recommendations
            self._print_attack_surface_analysis(attack_vectors)

            # Print scan duration
            end_time = datetime.now()
            duration = end_time - start_time
            print(f"\n{Fore.CYAN}[*] Scan Duration: {duration}{Style.RESET_ALL}")
            print(f"{Fore.CYAN}[*] Scan completed at: {end_time.strftime('%Y-%m-%d %H:%M:%S')}{Style.RESET_ALL}")
            return attack_vectors
        except Exception as e:
            # Top-level boundary: report the failure instead of crashing the CLI.
            self.output.print_error(f"Error during scanning: {str(e)}")
            return None

    def _run_webapp_scans(self, target, recon_findings, api_subdomains):
        """Run the web application scanners using recon data as context.

        Args:
            target: Target passed unchanged to every scanner.
            recon_findings: Dict of recon results; the 'technologies' and
                'directories' entries are read here.
            api_subdomains: Iterable of subdomains added to the API scanner.

        Returns:
            Dict mapping scanner name to that scanner's ``findings``, or
            ``None`` if a scanner raised (the error is reported through the
            output handler).
        """
        try:
            self.output.print_section_header("WEB APPLICATION SECURITY TESTING")

            # Extract useful information from recon; tolerate missing/None entries.
            tech_stack = recon_findings.get('technologies') or {}
            discovered_endpoints = recon_findings.get('directories') or []

            # Initialize scanners
            xss_scanner = XSSScanner(target, self.output)
            sqli_scanner = SQLiScanner(target, self.output)
            csrf_checker = CSRFChecker(target, self.output)
            file_upload_tester = FileUploadTester(target, self.output)
            auth_tester = AuthTester(target, self.output)
            api_scanner = APIScanner(target, self.output)

            # Adjust scanning based on technology stack
            if 'PHP' in tech_stack:
                sqli_scanner.add_php_specific_payloads()
                file_upload_tester.add_php_extensions()
            elif 'ASP.NET' in tech_stack:
                sqli_scanner.add_mssql_payloads()
                file_upload_tester.add_aspx_extensions()

            # Add discovered endpoints to scanners
            for endpoint in discovered_endpoints:
                xss_scanner.add_target_endpoint(endpoint)
                sqli_scanner.add_target_endpoint(endpoint)
                csrf_checker.add_target_endpoint(endpoint)

            # Add API subdomains to the API scanner
            for api_sub in api_subdomains:
                api_scanner.add_target_endpoint(api_sub)

            # Run scans in order; the bar total follows the list length.
            scans = (
                xss_scanner.scan,
                sqli_scanner.scan,
                csrf_checker.check,
                file_upload_tester.test,
                auth_tester.test,
                api_scanner.scan,
            )
            with tqdm(total=len(scans), desc="Web Application Scans") as progress:
                for run_scan in scans:
                    run_scan()
                    progress.update(1)

            results = {
                'xss': xss_scanner.findings,
                'sqli': sqli_scanner.findings,
                'csrf': csrf_checker.findings,
                'file_upload': file_upload_tester.findings,
                'auth': auth_tester.findings,
                'api': api_scanner.findings
            }
            self._record_webapp_findings(results)
            return results
        except Exception as e:
            self.output.print_error(f"Error during web application scanning: {str(e)}")
            return None

    def _print_attack_surface_analysis(self, attack_vectors):
        """Print attack vectors by category followed by recommended actions.

        Args:
            attack_vectors: Mapping of category name to a collection of
                vectors. ``None``, an empty mapping, or a mapping whose values
                are all empty prints the "no significant vulnerabilities"
                message. Missing recommendation categories are skipped.
        """
        print(f"\n{Fore.CYAN}[*] ATTACK SURFACE ANALYSIS{Style.RESET_ALL}")
        print("=" * 50)
        if not attack_vectors or not any(attack_vectors.values()):
            print(f"\n{Fore.GREEN}[+] No significant vulnerabilities found{Style.RESET_ALL}")
            return

        # Print attack vectors by category
        for category, vectors in attack_vectors.items():
            if vectors:
                print(f"\n{Fore.YELLOW}[*] {category.upper()}:{Style.RESET_ALL}")
                for vector in sorted(vectors):
                    print(f"{Fore.RED}[-] {vector}{Style.RESET_ALL}")

        # Print recommendations
        print(f"\n{Fore.CYAN}[*] RECOMMENDED ACTIONS{Style.RESET_ALL}")
        print("=" * 50)
        recommendations = (
            ('web_attacks', "Web Application Security", (
                "1. Run targeted vulnerability scans",
                "2. Perform detailed web application testing",
            )),
            ('network_attacks', "Network Security", (
                "1. Conduct comprehensive port scanning",
                "2. Test exposed services for misconfigurations",
            )),
            ('configuration_issues', "Configuration Review", (
                "1. Review and harden security headers",
                "2. Update SSL/TLS configurations",
                "3. Implement proper access controls",
            )),
        )
        for key, title, actions in recommendations:
            # .get(): an analyzer result lacking a category must not raise.
            if attack_vectors.get(key):
                print(f"\n{Fore.YELLOW}[*] {title}:{Style.RESET_ALL}")
                for action in actions:
                    print(action)
@click.command()
@click.option('--target', '-t', help='Target domain to scan')
def main(target):
    """CLI entry point: print the banner, then scan ``target`` if one was given."""
    assistant = PentestAssistant()
    assistant.banner()

    # Guard clause: without a target there is nothing to scan.
    if not target:
        print(f"{Fore.RED}[-] Please provide a target domain using --target or -t{Style.RESET_ALL}")
        return

    assistant.footprint(target)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()