-
Notifications
You must be signed in to change notification settings - Fork 20
Open
Description
import csv
import datetime
import html
import os
import re
import smtplib
import ssl
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List, Optional, Tuple

import dns.resolver
import pyfiglet
from termcolor import colored, cprint
# Configuration
# Connection settings may be overridden through environment variables so that
# credentials do not have to live in source control. The literal fallbacks are
# kept only so existing setups keep working unchanged.
# SECURITY: the fallback SMTP password below is committed in plain text. It
# should be rotated and supplied through DMARC_SMTP_PASSWORD instead.
CONFIG = {
    'smtp_server': os.environ.get('DMARC_SMTP_SERVER', 'c2480355.ferozo.com'),
    'smtp_port': int(os.environ.get('DMARC_SMTP_PORT', 465)),
    'smtp_user': os.environ.get('DMARC_SMTP_USER', '[email protected]'),
    'smtp_password': os.environ.get('DMARC_SMTP_PASSWORD', 'sSZY/Pz8aH'),
    # Change this to your test email (or set DMARC_TEST_RECIPIENT)
    'test_recipient': os.environ.get('DMARC_TEST_RECIPIENT', '[email protected]'),
    'log_file': 'dmarc_spf_log.csv',
    'report_file': 'dmarc_spf_report.html',
    'vulnerable_domains_file': 'vulnerable_domains.txt',
}
class DMARCAnalyzer:
    """Interactive command-line tool that audits domains for DMARC and SPF records.

    The analyzer looks up TXT records through ``dns.resolver``, appends every
    scan to the CSV file named by ``CONFIG['log_file']``, can render that log
    as an HTML report, and can send a test message with a forged ``From``
    header through the SMTP account configured in ``CONFIG``.

    Attributes:
        results: Mapping of domain -> ``{"DMARC", "SPF", "Full DMARC"}`` for
            the most recent scan.
        vulnerable_domains: Domains from the most recent scan whose DMARC
            policy is missing or not enforced.
    """

    # DMARC policy values (compared case-insensitively) that enforce nothing.
    _UNENFORCED_POLICIES = ("none", "null")

    # Values the CSV log uses to mean "no SPF record was found".
    _SPF_ABSENT_MARKERS = ("", "None")

    # Client-side filter used by the HTML report: hides every data row whose
    # cells do not contain the text typed in the matching header input.
    _FILTER_SCRIPT = """
function filterTable() {
    const table = document.getElementById('logTable');
    const rows = table.getElementsByTagName('tr');
    const filterInputs = document.querySelectorAll('input[type="text"]');
    for (let i = 1; i < rows.length; i++) {
        const cells = rows[i].getElementsByTagName('td');
        let showRow = true;
        for (let j = 0; j < cells.length; j++) {
            // A row may have more cells than there are header inputs.
            const filterInput = filterInputs[j];
            const filterValue = filterInput ? filterInput.value.toLowerCase() : '';
            const cellText = cells[j].innerText.toLowerCase();
            if (filterValue && !cellText.includes(filterValue)) {
                showRow = false;
                break;
            }
        }
        rows[i].style.display = showRow ? '' : 'none';
    }
}
"""

    def __init__(self):
        self.results = {}
        self.vulnerable_domains = set()
        self.setup_directories()

    # ------------------------------------------------------------------
    # Terminal helpers
    # ------------------------------------------------------------------

    def setup_directories(self) -> None:
        """Ensure necessary directories exist"""
        for directory in ('logs', 'reports', 'output'):
            os.makedirs(directory, exist_ok=True)

    def clear_screen(self) -> None:
        """Clear the terminal screen"""
        os.system('cls' if os.name == 'nt' else 'clear')

    def display_banner(self) -> None:
        """Display the program banner"""
        self.clear_screen()
        banner = pyfiglet.figlet_format("DMARC/SPF Analyzer", font="slant")
        print(colored(banner, "cyan", attrs=["bold"]))
        print(colored("=" * 80, "blue"))
        print(colored("A comprehensive tool for email security analysis", "yellow"))
        print(colored("=" * 80, "blue"))
        print()

    def _print_header(self, title: str) -> None:
        """Clear the screen and print a sub-menu title with its underline."""
        self.clear_screen()
        print(colored(title, "green", attrs=["bold"]))
        print(colored("=" * 50, "blue"))

    @staticmethod
    def _pause() -> None:
        """Block until the user presses Enter before going back to the main menu."""
        input("\nPress Enter to return to main menu...")

    def _confirm_and_analyze(self, domains: List[str], summary: str) -> None:
        """Preview up to ten domains, ask for confirmation, then run the scan.

        Args:
            domains: Domains that would be analyzed.
            summary: Heading printed above the preview list.
        """
        print(colored(summary, "yellow"))
        print("\n".join(f"• {domain}" for domain in domains[:10]))  # Show first 10
        if len(domains) > 10:
            print(colored(f"\n(Showing first 10 of {len(domains)} domains)", "yellow"))
        proceed = input("\nProceed with analysis? (y/n): ").strip().lower()
        if proceed == 'y':
            self.analyze_domains(domains)

    # ------------------------------------------------------------------
    # Menus
    # ------------------------------------------------------------------

    def main_menu(self) -> None:
        """Display the main menu and handle user input"""
        actions = {
            "1": self.analyze_domains_file_menu,
            "2": self.analyze_from_emails_file_menu,
            "3": self.analyze_from_smtp_log_menu,
            "4": self.view_results_menu,
            "5": self.generate_report_menu,
            "6": self.test_spoofing_menu,
        }
        while True:
            self.display_banner()
            print(colored("Main Menu:", "green"))
            print("1. Analyze domains from file")
            print("2. Analyze domains from email addresses file")
            print("3. Analyze domains from SMTP log file")
            print("4. View previous scan results")
            print("5. Generate HTML report")
            print("6. Test email spoofing")
            print("7. Exit")
            choice = input("\nEnter your choice (1-7): ").strip()
            if choice == "7":
                print(colored("\nExiting program. Goodbye!\n", "green"))
                sys.exit(0)
            handler = actions.get(choice)
            if handler is None:
                print(colored("\nInvalid choice. Please try again.", "red"))
                input("\nPress Enter to continue...")
            else:
                handler()

    def analyze_domains_file_menu(self) -> None:
        """Menu for analyzing domains from a file"""
        self._print_header("Domain File Analysis")
        file_path = input("\nEnter path to file containing domains (one per line): ").strip()
        if not os.path.exists(file_path):
            print(colored("\nFile not found.", "red"))
            self._pause()
            return
        try:
            with open(file_path, 'r') as f:
                domains = [line.strip() for line in f if line.strip()]
            if not domains:
                print(colored("\nNo valid domains found in file.", "red"))
            else:
                self._confirm_and_analyze(
                    domains, f"\nFound {len(domains)} domains to analyze:"
                )
        except Exception as e:
            print(colored(f"\nError reading file: {e}", "red"))
        self._pause()

    def analyze_from_emails_file_menu(self) -> None:
        """Menu for analyzing domains extracted from email addresses file"""
        self._print_header("Email Address File Analysis")
        file_path = input("\nEnter path to file containing email addresses (one per line): ").strip()
        if not os.path.exists(file_path):
            print(colored("\nFile not found.", "red"))
            self._pause()
            return
        try:
            with open(file_path, 'r') as f:
                emails = [line.strip() for line in f if line.strip()]
            # Lines that are not well-formed addresses yield no domain, so the
            # domain list can be empty even when the file is not.
            domains = self.extract_domains_from_emails(emails) if emails else []
            if not domains:
                print(colored("\nNo valid email addresses found in file.", "red"))
            else:
                self._confirm_and_analyze(
                    domains, f"\nFound {len(domains)} unique domains to analyze:"
                )
        except Exception as e:
            print(colored(f"\nError processing file: {e}", "red"))
        self._pause()

    def analyze_from_smtp_log_menu(self) -> None:
        """Menu for analyzing domains extracted from SMTP logs"""
        self._print_header("SMTP Log Analysis")
        log_file = input("\nEnter path to SMTP log file: ").strip()
        if not os.path.exists(log_file):
            print(colored("\nFile not found.", "red"))
            self._pause()
            return
        try:
            domains = self.extract_domains_from_smtp_log(log_file)
            if not domains:
                print(colored("\nNo domains found in log file.", "red"))
            else:
                self._confirm_and_analyze(
                    domains, f"\nFound {len(domains)} unique domains to analyze:"
                )
        except Exception as e:
            print(colored(f"\nError processing log file: {e}", "red"))
        self._pause()

    def view_results_menu(self) -> None:
        """Menu for viewing previous scan results"""
        self._print_header("Previous Scan Results")
        if not os.path.exists(CONFIG['log_file']):
            print(colored("\nNo previous scan results found.", "yellow"))
            self._pause()
            return
        try:
            with open(CONFIG['log_file'], 'r') as f:
                rows = list(csv.reader(f))
            if len(rows) <= 1:  # Just header
                print(colored("\nNo scan results found in log file.", "yellow"))
            else:
                # Display last 10 results; rows[0] is the CSV header, not a result.
                print(colored("\nLast 10 scan results:", "yellow"))
                for row in rows[1:][-10:]:
                    if len(row) < 4:
                        continue  # skip truncated or malformed lines
                    # The log stores the literal "None" when no SPF record was
                    # found, so plain truthiness would always say "Present".
                    spf_state = 'Absent' if row[3] in self._SPF_ABSENT_MARKERS else 'Present'
                    print(f"{row[0]} | {row[1]:<20} | DMARC: {row[2]:<10} | SPF: {spf_state}")
        except Exception as e:
            print(colored(f"\nError reading log file: {e}", "red"))
        self._pause()

    def generate_report_menu(self) -> None:
        """Menu for generating HTML reports"""
        self._print_header("Generate HTML Report")
        if not os.path.exists(CONFIG['log_file']):
            print(colored("\nNo scan results found to generate report.", "yellow"))
            self._pause()
            return
        try:
            # generate_html_report() reports success or failure itself, so no
            # extra message is printed here (it used to claim success even
            # after a failure).
            self.generate_html_report()
        except Exception as e:
            print(colored(f"\nError generating report: {e}", "red"))
        self._pause()

    def test_spoofing_menu(self) -> None:
        """Menu for testing email spoofing"""
        self._print_header("Email Spoofing Test")
        custom_prompt = "\nEnter domain to spoof (e.g., example.com): "
        # Check if we have vulnerable domains from previous scan
        if self.vulnerable_domains:
            # Snapshot into one sorted list so the printed numbers and the
            # lookup below are guaranteed to refer to the same ordering.
            candidates = sorted(self.vulnerable_domains)
            print(colored("\nRecently found vulnerable domains:", "yellow"))
            for i, domain in enumerate(candidates, 1):
                print(f"{i}. {domain}")
            print(f"{len(candidates)+1}. Enter custom domain")
            choice = input("\nSelect domain to spoof (or enter custom number): ")
            try:
                choice_num = int(choice)
            except ValueError:
                choice_num = 0
            if 1 <= choice_num <= len(candidates):
                domain = candidates[choice_num - 1]
            else:
                domain = input(custom_prompt).strip()
        else:
            domain = input(custom_prompt).strip()
        if not domain:
            print(colored("\nNo domain entered.", "red"))
            self._pause()
            return
        recipient = input(f"\nEnter recipient email [default: {CONFIG['test_recipient']}]: ").strip()
        recipient = recipient or CONFIG['test_recipient']
        try:
            spoofed_sender = f"admin@{domain}"
            self.send_spoofed_email(spoofed_sender, recipient)
            print(colored(f"\nSpoofed email sent from {spoofed_sender} to {recipient}", "green"))
        except Exception as e:
            print(colored(f"\nError sending spoofed email: {e}", "red"))
        self._pause()

    # ------------------------------------------------------------------
    # Domain extraction
    # ------------------------------------------------------------------

    def extract_domains_from_emails(self, emails: List[str]) -> List[str]:
        """Extract unique domains from a list of email addresses

        Args:
            emails: Candidate addresses; entries that are not of the form
                ``local@domain.tld`` are ignored.

        Returns:
            Lower-cased domains, de-duplicated and sorted.
        """
        email_regex = re.compile(r'^[a-zA-Z0-9._%+-]+@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})$')
        matches = (email_regex.match(email) for email in emails)
        return sorted({match.group(1).lower() for match in matches if match})

    def extract_domains_from_smtp_log(self, log_file: str) -> List[str]:
        """Extract unique domains from an SMTP log file

        Args:
            log_file: Path of the log; read as UTF-8 with undecodable bytes
                dropped.

        Returns:
            Lower-cased domains of every address found, de-duplicated and sorted.

        Raises:
            Exception: If the file cannot be read; the original error is
                chained as the cause.
        """
        domains = set()
        email_regex = re.compile(r'[a-zA-Z0-9._%+-]+@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')
        try:
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    domains.update(domain.lower() for domain in email_regex.findall(line))
        except Exception as e:
            raise Exception(f"Error processing log file: {e}") from e
        return sorted(domains)

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------

    @staticmethod
    def _is_vulnerable(policy: Optional[str]) -> bool:
        """Return True when a DMARC policy is missing or does not enforce anything."""
        if not policy:
            return True
        return policy.strip().lower() in DMARCAnalyzer._UNENFORCED_POLICIES

    @staticmethod
    def _parse_dmarc_policy(txt_record: str) -> Optional[str]:
        """Return the value of the ``p=`` tag of a DMARC record, or None if absent."""
        for part in txt_record.split(";"):
            tag, separator, value = part.partition("=")
            # Match the tag name exactly so "sp=" / "pct=" are not mistaken for "p=".
            if separator and tag.strip().lower() == "p":
                return value.strip() or None
        return None

    @staticmethod
    def _txt_to_str(record) -> str:
        """Return the text of a TXT answer as one string.

        A TXT record may be split into several character-strings. When the
        answer object exposes them through a ``strings`` attribute they are
        concatenated; otherwise the quoted ``str()`` form is used.
        """
        chunks = getattr(record, "strings", None)
        if chunks:
            return "".join(
                chunk.decode("utf-8", errors="replace") if isinstance(chunk, bytes) else str(chunk)
                for chunk in chunks
            )
        return str(record).strip('"')

    def analyze_domains(self, domains: List[str]) -> None:
        """Analyze a list of domains for DMARC and SPF records

        Resets ``self.results`` and ``self.vulnerable_domains``, logs every
        domain to the CSV log, prints the findings, and finally writes the
        vulnerable domains to ``CONFIG['vulnerable_domains_file']``.
        """
        self.results = {}
        self.vulnerable_domains = set()  # Reset vulnerable domains for this scan
        print(colored("\nStarting analysis...\n", "yellow"))
        for domain in domains:
            dmarc_info = self.check_dmarc_policy(domain)
            spf_record = self.check_spf_record(domain)
            policy = dmarc_info["policy"]
            full_dmarc_record = dmarc_info["full_record"]
            self.log_domain_scan(domain, policy, spf_record)
            self.results[domain] = {
                "DMARC": policy,
                "SPF": spf_record,
                "Full DMARC": full_dmarc_record,
            }
            # Display results for this domain
            print(colored(f"\nResults for {domain}:", "yellow", attrs=["bold"]))
            if policy:
                print(colored(f" DMARC Policy: {policy}", "green"))
                print(colored(f" Full DMARC Record: {full_dmarc_record}", "blue"))
            elif full_dmarc_record:
                # A DMARC record exists but carries no usable p= tag.
                print(colored(" DMARC record found but it has no policy (p=) tag", "red"))
                print(colored(f" Full DMARC Record: {full_dmarc_record}", "blue"))
            else:
                print(colored(" No DMARC record found", "red"))
            if spf_record:
                print(colored(f" SPF Record: {spf_record}", "blue"))
            else:
                print(colored(" No SPF record found", "red"))
            # Check for vulnerabilities
            if self._is_vulnerable(policy):
                print(colored(" WARNING: Domain is vulnerable to email spoofing!", "red", attrs=["bold"]))
                print(colored(" Reason: DMARC policy is missing or not enforced", "red"))
                self.vulnerable_domains.add(domain)
            print(colored("-" * 50, "blue"))
        # Save vulnerable domains to file
        if self.vulnerable_domains:
            self.save_vulnerable_domains()

    def save_vulnerable_domains(self) -> None:
        """Save vulnerable domains to a text file"""
        try:
            with open(CONFIG['vulnerable_domains_file'], 'w') as f:
                f.write("\n".join(sorted(self.vulnerable_domains)))
            print(colored(f"\nSaved {len(self.vulnerable_domains)} vulnerable domains to {CONFIG['vulnerable_domains_file']}", "yellow"))
        except Exception as e:
            print(colored(f"\nError saving vulnerable domains: {e}", "red"))

    def check_dmarc_policy(self, domain: str) -> Dict[str, Optional[str]]:
        """Check DMARC policy for a domain

        Args:
            domain: Domain whose ``_dmarc`` TXT records are queried.

        Returns:
            ``{"policy": ..., "full_record": ...}``. Both values are None when
            no DMARC record is found or the lookup fails; ``policy`` alone is
            None when a DMARC record exists without a ``p=`` tag.
        """
        try:
            answers = dns.resolver.resolve(f"_dmarc.{domain}", "TXT")
        except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
            # No DMARC record found / domain doesn't exist
            return {"policy": None, "full_record": None}
        except dns.resolver.Timeout:
            print(colored(f"Timeout error for {domain}. DNS resolution took too long.", "red"))
            return {"policy": None, "full_record": None}
        except Exception as e:
            print(colored(f"Error checking DMARC for {domain}: {e}", "red"))
            return {"policy": None, "full_record": None}
        for record in answers:
            txt_record = self._txt_to_str(record)
            if txt_record.lstrip().startswith("v=DMARC1"):
                return {
                    "policy": self._parse_dmarc_policy(txt_record),
                    "full_record": txt_record,
                }
        # TXT data exists at _dmarc but none of it is a DMARC record. Reporting
        # a placeholder policy here would hide the domain from the
        # vulnerability check, so treat it as "no DMARC".
        return {"policy": None, "full_record": None}

    def check_spf_record(self, domain: str) -> Optional[str]:
        """Check SPF record for a domain

        Returns:
            The first TXT record that starts with ``v=spf1`` (case-insensitive),
            or None when there is none or the lookup fails.
        """
        try:
            answers = dns.resolver.resolve(domain, "TXT")
        except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
            return None
        except dns.resolver.Timeout:
            print(colored(f"Timeout error for {domain}. DNS resolution took too long.", "red"))
            return None
        except Exception as e:
            print(colored(f"Error checking SPF for {domain}: {e}", "red"))
            return None
        for record in answers:
            txt_record = self._txt_to_str(record)
            if txt_record.lstrip().lower().startswith("v=spf1"):
                return txt_record
        return None  # No SPF record found

    # ------------------------------------------------------------------
    # Logging and reporting
    # ------------------------------------------------------------------

    def log_domain_scan(self, domain: str, dmarc_policy: Optional[str], spf_record: Optional[str]) -> None:
        """Log domain scan results to CSV file

        Appends one row to ``CONFIG['log_file']``; missing values are stored
        as the literal string ``"None"``. Errors are printed, not raised.
        """
        try:
            log_path = CONFIG['log_file']
            # An existing but empty file still needs the header row.
            needs_header = not os.path.isfile(log_path) or os.path.getsize(log_path) == 0
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            with open(log_path, 'a', newline='') as file:
                writer = csv.writer(file)
                if needs_header:
                    writer.writerow(["Timestamp", "Domain", "DMARC Policy", "SPF Record"])
                writer.writerow([timestamp, domain, dmarc_policy or "None", spf_record or "None"])
        except Exception as e:
            print(colored(f"Error logging to file: {e}", "red"))

    @staticmethod
    def _render_report_html(rows: List[List[str]], generated_on: str) -> str:
        """Build the HTML report document from CSV rows.

        Args:
            rows: CSV rows; ``rows[0]`` is the header row.
            generated_on: Timestamp text shown under the title.

        Returns:
            The complete HTML document as a string. Every cell is
            HTML-escaped because the values come from DNS TXT records.
        """
        parts = [
            "<html><head><title>DMARC and SPF Log Report</title>",
            "<style>",
            "body { font-family: Arial, sans-serif; margin: 20px; }",
            "h1 { color: #333; }",
            "table { width: 100%; border-collapse: collapse; margin-top: 20px; }",
            "th, td { padding: 10px; text-align: left; border: 1px solid #ddd; }",
            "th { background-color: #f2f2f2; }",
            "tr:nth-child(even) { background-color: #f9f9f9; }",
            "tr:hover { background-color: #f1f1f1; }",
            ".vulnerable { background-color: #ffdddd; }",
            "</style>",
            "<script>",
            DMARCAnalyzer._FILTER_SCRIPT,
            "</script>",
            "</head><body>",
            "<h1>DMARC and SPF Log Report</h1>",
            "<p>Generated on: " + html.escape(generated_on) + "</p>",
            "<table id='logTable'>",
            # Write headers with filter inputs
            "<tr>",
        ]
        for header in rows[0]:
            parts.append(
                f"<th>{html.escape(header)} <input type='text' onkeyup='filterTable()' placeholder='Filter...'></th>"
            )
        parts.append("</tr>")
        # Write data rows
        for row in rows[1:]:
            # Highlight vulnerable domains (no DMARC or DMARC policy is none)
            row_class = ""
            if len(row) >= 3 and DMARCAnalyzer._is_vulnerable(row[2]):
                row_class = "class='vulnerable'"
            parts.append(f"<tr {row_class}>")
            parts.extend(f"<td>{html.escape(column)}</td>" for column in row)
            parts.append("</tr>")
        parts.append("</table>")
        parts.append("</body></html>")
        return "".join(parts)

    def generate_html_report(self) -> None:
        """Generate an HTML report from the log file

        Reads ``CONFIG['log_file']`` and writes ``CONFIG['report_file']``.
        Success and failure are both reported by printing; nothing is raised.
        """
        if not os.path.exists(CONFIG['log_file']):
            print(colored(f"{CONFIG['log_file']} does not exist.", "red"))
            return
        try:
            with open(CONFIG['log_file'], mode="r") as file:
                rows = list(csv.reader(file))
            if not rows:
                print(colored(f"{CONFIG['log_file']} is empty.", "red"))
                return
            generated_on = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            document = self._render_report_html(rows, generated_on)
            with open(CONFIG['report_file'], "w") as file:
                file.write(document)
            print(colored(f"HTML report generated: {CONFIG['report_file']}", "green"))
        except Exception as e:
            print(colored(f"Error generating HTML report: {e}", "red"))

    # ------------------------------------------------------------------
    # Spoofing test
    # ------------------------------------------------------------------

    def send_spoofed_email(self, spoofed_sender: str, recipient: str) -> None:
        """Send a test spoofed email

        Logs in with the SMTP account from ``CONFIG`` over SSL and sends a
        multipart (plain + HTML) message whose ``From`` header and envelope
        sender are ``spoofed_sender``.

        Args:
            spoofed_sender: Address placed in the ``From`` header.
            recipient: Destination address.

        Raises:
            Exception: If building or sending the message fails; the original
                error is chained as the cause.
        """
        try:
            # Email message content
            subject = "DMARC/SPF Test Email"
            # The addresses are typed in by the user, so escape them before
            # embedding them in HTML.
            html_message = f"""\
<html>
<body>
<p>Hello,<br>
<br>
This is a test email to check DMARC/SPF configuration.<br>
<br>
<strong>Details:</strong><br>
- Spoofed sender: {html.escape(spoofed_sender)}<br>
- Actual sender: {html.escape(CONFIG['smtp_user'])}<br>
- Recipient: {html.escape(recipient)}<br>
<br>
Please review the email headers to verify how this message was handled.
</p>
</body>
</html>
"""
            message = MIMEMultipart("alternative")
            message["Subject"] = subject
            message["From"] = spoofed_sender
            message["To"] = recipient
            # Create plain-text and HTML versions
            text_part = MIMEText(
                f"This is a test email with spoofed sender {spoofed_sender}.",
                "plain"
            )
            html_part = MIMEText(html_message, "html")
            message.attach(text_part)
            message.attach(html_part)
            # Create secure SSL context
            context = ssl.create_default_context()
            # Send email
            with smtplib.SMTP_SSL(
                CONFIG['smtp_server'],
                CONFIG['smtp_port'],
                context=context
            ) as server:
                server.login(CONFIG['smtp_user'], CONFIG['smtp_password'])
                server.sendmail(spoofed_sender, recipient, message.as_string())
            print(colored(f"\nTest email sent from {spoofed_sender} to {recipient}", "green"))
        except Exception as e:
            raise Exception(f"Failed to send spoofed email: {e}") from e
if __name__ == "__main__":
    # Script entry point: build the analyzer and hand control to its menu loop.
    analyzer = DMARCAnalyzer()
    try:
        analyzer.main_menu()
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / closed stdin at a prompt: leave quietly instead of dumping a traceback.
        print("\nInterrupted. Exiting.")
        sys.exit(1)
Metadata
Metadata
Assignees
Labels
No labels