diff --git a/CHANGELOG.md b/CHANGELOG.md index a194bde7e..33a02bd1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +- Add multiple telnet reconnection attempts detection + 1.1.4.1 (Dec 3rd, 2024) - Fix abstract class starting with the rest of the modules. - Fix the updating of the MAC vendors database used in slips. diff --git a/modules/flowalerts/conn.py b/modules/flowalerts/conn.py index 35eccf94f..8ae1a5fe1 100644 --- a/modules/flowalerts/conn.py +++ b/modules/flowalerts/conn.py @@ -15,6 +15,10 @@ from slips_files.common.flow_classifier import FlowClassifier +NOT_ESTAB = "Not Established" +ESTAB = "Established" + + class Conn(IFlowalertsAnalyzer): def init(self): # get the default gateway @@ -38,6 +42,7 @@ def init(self): self.classifier = FlowClassifier() self.our_ips = utils.get_own_ips() self.input_type: str = self.db.get_input_type() + self.multiple_reconnection_attempts_threshold = 5 def read_configuration(self): conf = ConfigParser() @@ -171,7 +176,7 @@ def check_unknown_port(self, profileid, twid, flow): """ if not flow.dport: return - if flow.interpreted_state != "Established": + if flow.interpreted_state != ESTAB: # detect unknown ports on established conns only return False @@ -195,6 +200,54 @@ def check_unknown_port(self, profileid, twid, flow): self.set_evidence.unknown_port(twid, flow) return True + def is_telnet(self, flow) -> bool: + try: + dport = int(flow.dport) + except ValueError: + # binetflow icmp ports are hex strings + return False + + telnet_ports = (23, 2323) + return dport in telnet_ports and flow.proto.lower() == "tcp" + + def check_multiple_telnet_reconnection_attempts( + self, profileid, twid, flow + ): + if flow.interpreted_state != NOT_ESTAB: + return + + if not self.is_telnet(flow): + return + + key = f"{flow.saddr}-{flow.daddr}-telnet" + # add this conn to the stored number of reconnections + current_reconnections = self.db.get_reconnections_for_tw( + profileid, twid + ) + try: + reconnections, uids = current_reconnections[key] + reconnections += 1 + uids.append(flow.uid) + current_reconnections[key] = (reconnections, uids) + except KeyError: + current_reconnections[key] = (1, [flow.uid]) + reconnections = 1 + + if reconnections < 4: + # update the reconnections ctr in the db + self.db.set_reconnections(profileid, twid, current_reconnections) + return + + self.set_evidence.multiple_telnet_reconnection_attempts( + twid, flow, reconnections, current_reconnections[key][1] + ) + + # reset the reconnection attempts of this src->dst since an evidence + # is set + current_reconnections[key] = (0, []) + + self.db.set_reconnections(profileid, twid, current_reconnections) + def check_multiple_reconnection_attempts(self, profileid, twid, flow): """ Alerts when 5+ reconnection attempts from the same source IP to @@ -219,7 +272,8 @@ def check_multiple_reconnection_attempts(self, profileid, twid, flow): current_reconnections[key] = (1, [flow.uid]) reconnections = 1 - if reconnections < 5: + if reconnections < self.multiple_reconnection_attempts_threshold: + self.db.set_reconnections(profileid, twid, current_reconnections) return self.set_evidence.multiple_reconnection_attempts( @@ -511,7 +565,7 @@ def check_conn_to_port_0(self, profileid, twid, flow): ) def detect_connection_to_multiple_ports(self, profileid, twid, flow): - if flow.proto != "tcp" or flow.interpreted_state != "Established": + if flow.proto != "tcp" or flow.interpreted_state != ESTAB: return dport_name = flow.appproto @@ -525,7 +579,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow): # Connection to multiple ports to the destination IP if profileid.split("_")[1] == flow.saddr: direction = "Dst" - state = "Established" + state = ESTAB protocol = "TCP" role = "Client" type_data = "IPs" @@ -565,7 +619,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow): # Happens in the mode 'all' elif profileid.split("_")[-1] == flow.daddr: direction = "Src" - state = "Established" + state = ESTAB protocol = "TCP" role = "Server" type_data = "IPs" @@ -608,7 +662,7 @@ def check_non_http_port_80_conns(self, twid, flow): str(flow.dport) == "80" and flow.proto.lower() == "tcp" and str(flow.appproto).lower() != "http" - and flow.interpreted_state == "Established" + and flow.interpreted_state == ESTAB and (flow.sbytes + flow.dbytes) != 0 ): self.set_evidence.non_http_port_80_conn(twid, flow) @@ -734,6 +788,9 @@ async def analyze(self, msg): self.check_long_connection(twid, flow) self.check_unknown_port(profileid, twid, flow) self.check_multiple_reconnection_attempts(profileid, twid, flow) + self.check_multiple_telnet_reconnection_attempts( + profileid, twid, flow + ) self.check_conn_to_port_0(profileid, twid, flow) self.check_different_localnet_usage( twid, flow, what_to_check="dstip" diff --git a/modules/flowalerts/dns.py b/modules/flowalerts/dns.py index 4c217ba52..7f0602ecc 100644 --- a/modules/flowalerts/dns.py +++ b/modules/flowalerts/dns.py @@ -309,17 +309,17 @@ def check_high_entropy_dns_answers(self, twid, flow): ) def check_invalid_dns_answers(self, twid, flow): - # this function is used to check for certain IP - # answers to DNS queries being blocked - # (perhaps by ad blockers) and set to the following IP values - # currently hardcoding blocked ips - invalid_answers = {"127.0.0.1", "0.0.0.0"} + """ + this function is used to check for private IPs in the answers of + a dns queries. + probably means the queries is being blocked + (perhaps by ad blockers) and set to a private IP value + """ if not flow.answers: return for answer in flow.answers: - if answer in invalid_answers and flow.query != "localhost": - # blocked answer found + if utils.is_private_ip(answer) and flow.query != "localhost": self.set_evidence.invalid_dns_answer(twid, flow, answer) # delete answer from redis cache to prevent # associating this dns answer with this domain/query and diff --git a/modules/flowalerts/set_evidence.py b/modules/flowalerts/set_evidence.py index 188f4d5bd..13d5cbc27 100644 --- a/modules/flowalerts/set_evidence.py +++ b/modules/flowalerts/set_evidence.py @@ -881,6 +881,45 @@ def self_signed_certificates(self, twid, flow) -> None: ) self.db.set_evidence(evidence) + def multiple_telnet_reconnection_attempts( + self, twid, flow, reconnections, uids: List[str] + ): + """ + Set evidence for 4+ telnet unsuccessful attempts. + """ + confidence: float = 0.5 + threat_level: ThreatLevel = ThreatLevel.MEDIUM + + twid: int = int(twid.replace("timewindow", "")) + + description = ( + f"Multiple Telnet reconnection attempts from IP: {flow.saddr} " + f"to Destination IP: {flow.daddr} " + f"reconnections: {reconnections}" + ) + evidence: Evidence = Evidence( + evidence_type=EvidenceType.MULTIPLE_RECONNECTION_ATTEMPTS, + attacker=Attacker( + direction=Direction.SRC, + attacker_type=IoCType.IP, + value=flow.saddr, + ), + victim=Victim( + direction=Direction.DST, + victim_type=IoCType.IP, + value=flow.daddr, + ), + threat_level=threat_level, + confidence=confidence, + description=description, + profile=ProfileID(ip=flow.saddr), + timewindow=TimeWindow(number=twid), + uid=uids, + timestamp=flow.starttime, + ) + + self.db.set_evidence(evidence) + def multiple_reconnection_attempts( self, twid, flow, reconnections, uids: List[str] ) -> None: diff --git a/slips_files/common/slips_utils.py b/slips_files/common/slips_utils.py index 9a3127938..a4b228e6b 100644 --- a/slips_files/common/slips_utils.py +++ b/slips_files/common/slips_utils.py @@ -16,7 +16,11 @@ import sys import ipaddress import aid_hash -from typing import Any, Optional +from typing import ( + Any, + Optional, + Union, +) from dataclasses import is_dataclass, asdict from enum import Enum @@ -265,7 +269,7 @@ def convert_format(self, ts, required_format: str): # convert to the req format if required_format == "iso": - return datetime_obj.astimezone().isoformat() + return datetime_obj.astimezone(tz=self.local_tz).isoformat() elif required_format == "unixtimestamp": return datetime_obj.timestamp() else: @@ -390,20 +394,19 @@ def is_port_in_use(self, port: int) -> bool: sock.close() return True - def is_private_ip(self, ip_obj: ipaddress) -> bool: - """ - This function replaces the ipaddress library 'is_private' - because it does not work correctly and it does not ignore - the ips 0.0.0.0 or 255.255.255.255 - """ - # Is it a well-formed ipv4 or ipv6? - r_value = False - if ip_obj and ip_obj.is_private: - if ip_obj != ipaddress.ip_address( - "0.0.0.0" - ) and ip_obj != ipaddress.ip_address("255.255.255.255"): - r_value = True - return r_value + def is_private_ip( + self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str] + ) -> bool: + ip_classes = {ipaddress.IPv4Address, ipaddress.IPv6Address} + for class_ in ip_classes: + if isinstance(ip, class_): + return ip and ip.is_private + + if self.detect_ioc_type(ip) != "ip": + return False + # convert the given str ip to obj + ip_obj = ipaddress.ip_address(ip) + return ip_obj.is_private def is_ignored_ip(self, ip: str) -> bool: """ @@ -414,16 +417,15 @@ def is_ignored_ip(self, ip: str) -> bool: ip_obj = ipaddress.ip_address(ip) except (ipaddress.AddressValueError, ValueError): return True + # Is the IP multicast, private? (including localhost) # The broadcast address 255.255.255.255 is reserved. - return bool( - ( - ip_obj.is_multicast - or self.is_private_ip(ip_obj) - or ip_obj.is_link_local - or ip_obj.is_reserved - or ".255" in ip_obj.exploded - ) + return ( + ip_obj.is_multicast + or self.is_private_ip(ip_obj) + or ip_obj.is_link_local + or ip_obj.is_loopback + or ip_obj.is_reserved ) def get_sha256_hash(self, filename: str): diff --git a/tests/test_conn.py b/tests/test_conn.py index 145cf6e9d..8795db878 100644 --- a/tests/test_conn.py +++ b/tests/test_conn.py @@ -185,6 +185,78 @@ def test_check_unknown_port_true_case(mocker): mock_set_evidence.assert_called_once_with(twid, flow) +@pytest.mark.parametrize( + "origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls", + [ + ( # Testcase1:5 rejections, evidence should be set + "REJ", + "192.168.1.1", + "192.168.1.2", + 23, + [f"uid_{i}" for i in range(4)], + "Not Established", + 1, + ), + ( # Testcase2: Less than 5 rejections, no evidence + "RST", + "192.168.1.1", + "192.168.1.2", + 2323, + [f"uid_{i}" for i in range(4)], + "Not Established", + 1, + ), + ( # Testcase3: Non-REJ state, no evidence + "Established", + "192.168.1.1", + "192.168.1.2", + 23, + ["uid_1"], + "Established", + 0, + ), + ], +) +def test_check_multiple_telnet_reconnection_attempts( + origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls +): + """ + Tests the check_multiple_telnet_reconnection_attempts function + with various scenarios. + """ + conn = ModuleFactory().create_conn_analyzer_obj() + conn.set_evidence.multiple_telnet_reconnection_attempts = Mock() + conn.db.get_reconnections_for_tw.return_value = {} + + for uid in uids: + flow = Conn( + starttime="1726249372.312124", + uid=uid, + saddr=saddr, + daddr=daddr, + dur=1, + proto="tcp", + appproto="", + sport="0", + dport=dport, + spkts=0, + dpkts=0, + sbytes=0, + dbytes=0, + smac="", + dmac="", + state=origstate, + history="", + ) + flow.interpreted_state = interpreted_state + conn.check_multiple_telnet_reconnection_attempts(profileid, twid, flow) + + assert ( + conn.set_evidence.multiple_telnet_reconnection_attempts.call_count + == expected_calls + ) + + @pytest.mark.parametrize( "origstate, saddr, daddr, dport, uids, expected_calls", [ diff --git a/tests/test_slips_utils.py b/tests/test_slips_utils.py index 03c3cb464..f1f61b96c 100644 --- a/tests/test_slips_utils.py +++ b/tests/test_slips_utils.py @@ -186,9 +186,9 @@ def test_assert_microseconds(input_value, expected_output): # testcase4: Public IPv4 address (ipaddress.ip_address("8.8.8.8"), False), # testcase5: Special IP address 0.0.0.0 - (ipaddress.ip_address("0.0.0.0"), False), + (ipaddress.ip_address("0.0.0.0"), True), # testcase6: Broadcast IP address 255.255.255.255 - (ipaddress.ip_address("255.255.255.255"), False), + (ipaddress.ip_address("255.255.255.255"), True), ], ) def test_is_private_ip(ip_address, expected_result):