Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 86 additions & 28 deletions pymisp/tools/emailobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,37 +373,95 @@ def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) -
# email object doesn't support display name for all email addrs
pass

def extract_matches(self, pattern: re.Pattern[str], text: str) -> list[tuple[str, ...]]:
"""Returns all regex matches for a given pattern in a text."""
return re.findall(pattern, text)

def add_ip_attribute(self, ip_candidate: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Validates and adds an IP address to MISP if it's public and not already seen during extraction."""
try:
ip = ipaddress.ip_address(ip_candidate)
if not ip.is_private and ("received-header-ip", ip_candidate) not in seen_attributes:
self.add_attribute("received-header-ip", ip_candidate, comment=received)
seen_attributes.add(("received-header-ip", ip_candidate))
except ValueError:
pass # Invalid IPs are ignored

def add_hostname_attribute(self, hostname: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Validates and adds a hostname to MISP if it contains a valid TLD-like format and is not already seen."""
if "." in hostname and not hostname.endswith(".") and len(hostname.split(".")[-1]) > 1:
if ("received-header-hostname", hostname) not in seen_attributes:
self.add_attribute("received-header-hostname", hostname, comment=received)
seen_attributes.add(("received-header-hostname", hostname))

def process_received_header(self, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Processes a single 'Received' header and extracts hostnames and IPs."""

# Regex patterns
received_from_regex = re.compile(
r'from\s+([\w.-]+)' # Declared sending hostname
r'(?:\s+\(([^)]+)\))?' # Reverse DNS hostname inside parentheses
)
ipv4_regex = re.compile(
r'\[(?P<ipv4_brackets>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\]' # IPv4 inside []
r'|\((?P<ipv4_parentheses>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\)' # IPv4 inside ()
r'|(?<=\.\s)(?P<ipv4_after_domain>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\b' # IPv4 appearing after a domain.
)
ipv6_regex = re.compile(
r'\b(?:[a-fA-F0-9]{1,4}:[a-fA-F0-9]{1,4}(?::[a-fA-F0-9]{1,4}){0,6})\b'
)

# Extract hostnames
matches = self.extract_matches(received_from_regex, received)
for match in matches:
declared_sending_host = match[0].strip() if match[0] else None
reverse_dns_host = match[1].split()[0].strip("[]()").rstrip('.') if match[1] else None

if declared_sending_host:
clean_host = declared_sending_host.strip("[]()")
try:
ipaddress.ip_address(declared_sending_host)
self.add_ip_attribute(declared_sending_host, received, seen_attributes)
except ValueError:
self.add_hostname_attribute(declared_sending_host, received, seen_attributes)

if reverse_dns_host:
try:
ipaddress.ip_address(reverse_dns_host)
self.add_ip_attribute(reverse_dns_host, received, seen_attributes)
except ValueError:
self.add_hostname_attribute(reverse_dns_host, received, seen_attributes)

# Extract and add **only valid** IPv4 addresses
for ipv4_match in self.extract_matches(ipv4_regex, received):
ip_candidate = ipv4_match[0] or ipv4_match[1] or ipv4_match[2] # Select first non-empty match
if ip_candidate:
self.add_ip_attribute(ip_candidate, received, seen_attributes)

# Extract and add IPv6 addresses
for ipv6_match in self.extract_matches(ipv6_regex, received):
self.add_ip_attribute(ipv6_match, received, seen_attributes)

def __generate_received(self) -> None:
"""
Extract IP addresses from received headers that are not private. Also extract hostnames or domains.
Extracts public IP addresses and hostnames from "Received" email headers.
"""
received_items = self.email.get_all("received")
if received_items is None:
return
for received in received_items:
fromstr = re.split(r"\sby\s", received)[0].strip()
if fromstr.startswith('from') is not True:
continue
for i in ['(', ')', '[', ']']:
fromstr = fromstr.replace(i, " ")
tokens = fromstr.split(" ")
ip = None
for token in tokens:
try:
ip = ipaddress.ip_address(token)
break
except ValueError:
pass # token is not IP address

if not ip or ip.is_private:
continue # skip header if IP not found or is private
received_items = self.email.get_all("Received")
if not received_items:
return

self.add_attribute("received-header-ip", value=str(ip), comment=fromstr)
# Track added attributes to prevent duplicates (store as (type, value) tuples)
seen_attributes: set[tuple[str, str]] = set()

# The hostnames and/or domains always come after the "Received: from"
# part so we can use regex to pick up those attributes.
received_from = re.findall(r'(?<=from\s)[\w\d\.\-]+\.\w{2,24}', str(received_items))
try:
[self.add_attribute("received-header-hostname", i) for i in received_from]
except Exception:
pass
for received in received_items:
self.process_received_header(received, seen_attributes)