diff --git a/pymisp/tools/emailobject.py b/pymisp/tools/emailobject.py index fff56c5e..bd9d2b12 100644 --- a/pymisp/tools/emailobject.py +++ b/pymisp/tools/emailobject.py @@ -373,37 +373,95 @@ def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) - # email object doesn't support display name for all email addrs pass + def extract_matches(self, pattern: re.Pattern[str], text: str) -> list[tuple[str, ...]]: + """Returns all regex matches for a given pattern in a text.""" + return re.findall(pattern, text) + + def add_ip_attribute(self, ip_candidate: str, received: str, seen_attributes: set[tuple[str, str]]) -> None: + """Validates and adds an IP address to MISP if it's public and not already seen during extraction.""" + try: + ip = ipaddress.ip_address(ip_candidate) + if not ip.is_private and ("received-header-ip", ip_candidate) not in seen_attributes: + self.add_attribute("received-header-ip", ip_candidate, comment=received) + seen_attributes.add(("received-header-ip", ip_candidate)) + except ValueError: + pass # Invalid IPs are ignored + + def add_hostname_attribute(self, hostname: str, received: str, seen_attributes: set[tuple[str, str]]) -> None: + """Validates and adds a hostname to MISP if it contains a valid TLD-like format and is not already seen.""" + if "." in hostname and not hostname.endswith(".") and len(hostname.split(".")[-1]) > 1: + if ("received-header-hostname", hostname) not in seen_attributes: + self.add_attribute("received-header-hostname", hostname, comment=received) + seen_attributes.add(("received-header-hostname", hostname)) + + def process_received_header(self, received: str, seen_attributes: set[tuple[str, str]]) -> None: + """Processes a single 'Received' header and extracts hostnames and IPs.""" + + # Regex patterns + received_from_regex = re.compile( + r'from\s+([\w.-]+)' # Declared sending hostname + r'(?:\s+\(([^)]+)\))?' # Reverse DNS hostname inside parentheses + ) + ipv4_regex = re.compile( + r'\[(?P(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\]' # IPv4 inside [] + r'|\((?P(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\)' # IPv4 inside () + r'|(?<=\.\s)(?P(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.' + r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\b' # IPv4 appearing after a domain. + ) + ipv6_regex = re.compile( + r'\b(?:[a-fA-F0-9]{1,4}:[a-fA-F0-9]{1,4}(?::[a-fA-F0-9]{1,4}){0,6})\b' + ) + + # Extract hostnames + matches = self.extract_matches(received_from_regex, received) + for match in matches: + declared_sending_host = match[0].strip() if match[0] else None + reverse_dns_host = match[1].split()[0].strip("[]()").rstrip('.') if match[1] else None + + if declared_sending_host: + clean_host = declared_sending_host.strip("[]()") + try: + ipaddress.ip_address(declared_sending_host) + self.add_ip_attribute(declared_sending_host, received, seen_attributes) + except ValueError: + self.add_hostname_attribute(declared_sending_host, received, seen_attributes) + + if reverse_dns_host: + try: + ipaddress.ip_address(reverse_dns_host) + self.add_ip_attribute(reverse_dns_host, received, seen_attributes) + except ValueError: + self.add_hostname_attribute(reverse_dns_host, received, seen_attributes) + + # Extract and add **only valid** IPv4 addresses + for ipv4_match in self.extract_matches(ipv4_regex, received): + ip_candidate = ipv4_match[0] or ipv4_match[1] or ipv4_match[2] # Select first non-empty match + if ip_candidate: + self.add_ip_attribute(ip_candidate, received, seen_attributes) + + # Extract and add IPv6 addresses + for ipv6_match in self.extract_matches(ipv6_regex, received): + self.add_ip_attribute(ipv6_match, received, seen_attributes) + def __generate_received(self) -> None: """ - Extract IP addresses from received headers that are not private. Also extract hostnames or domains. + Extracts public IP addresses and hostnames from "Received" email headers. """ - received_items = self.email.get_all("received") - if received_items is None: - return - for received in received_items: - fromstr = re.split(r"\sby\s", received)[0].strip() - if fromstr.startswith('from') is not True: - continue - for i in ['(', ')', '[', ']']: - fromstr = fromstr.replace(i, " ") - tokens = fromstr.split(" ") - ip = None - for token in tokens: - try: - ip = ipaddress.ip_address(token) - break - except ValueError: - pass # token is not IP address - if not ip or ip.is_private: - continue # skip header if IP not found or is private + received_items = self.email.get_all("Received") + if not received_items: + return - self.add_attribute("received-header-ip", value=str(ip), comment=fromstr) + # Track added attributes to prevent duplicates (store as (type, value) tuples) + seen_attributes: set[tuple[str, str]] = set() - # The hostnames and/or domains always come after the "Received: from" - # part so we can use regex to pick up those attributes. - received_from = re.findall(r'(?<=from\s)[\w\d\.\-]+\.\w{2,24}', str(received_items)) - try: - [self.add_attribute("received-header-hostname", i) for i in received_from] - except Exception: - pass + for received in received_items: + self.process_received_header(received, seen_attributes)