Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Add multiple telnet reconnection attempts detection

1.1.4.1 (Dec 3rd, 2024)
- Fix abstract class starting with the rest of the modules.
- Fix the updating of the MAC vendors database used in slips.
Expand Down
69 changes: 63 additions & 6 deletions modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from slips_files.common.flow_classifier import FlowClassifier


NOT_ESTAB = "Not Established"
ESTAB = "Established"


class Conn(IFlowalertsAnalyzer):
def init(self):
# get the default gateway
Expand All @@ -38,6 +42,7 @@ def init(self):
self.classifier = FlowClassifier()
self.our_ips = utils.get_own_ips()
self.input_type: str = self.db.get_input_type()
self.multiple_reconnection_attempts_threshold = 5

def read_configuration(self):
conf = ConfigParser()
Expand Down Expand Up @@ -171,7 +176,7 @@ def check_unknown_port(self, profileid, twid, flow):
"""
if not flow.dport:
return
if flow.interpreted_state != "Established":
if flow.interpreted_state != ESTAB:
# detect unknown ports on established conns only
return False

Expand All @@ -195,6 +200,54 @@ def check_unknown_port(self, profileid, twid, flow):
self.set_evidence.unknown_port(twid, flow)
return True

def is_telnet(self, flow) -> bool:
try:
dport = int(flow.dport)
except ValueError:
# binetflow icmp ports are hex strings
return False

telnet_ports = (23, 2323)
return dport in telnet_ports and flow.proto.lower() == "tcp"

def check_multiple_telnet_reconnection_attempts(
self, profileid, twid, flow
):
if flow.interpreted_state != NOT_ESTAB:
return

if not self.is_telnet(flow):
return

key = f"{flow.saddr}-{flow.daddr}-telnet"
# add this conn to the stored number of reconnections
current_reconnections = self.db.get_reconnections_for_tw(
profileid, twid
)
try:
reconnections, uids = current_reconnections[key]
reconnections += 1
uids.append(flow.uid)
current_reconnections[key] = (reconnections, uids)
except KeyError:
current_reconnections[key] = (1, [flow.uid])
reconnections = 1

if reconnections < 4:
# update the reconnections ctr in the db
self.db.set_reconnections(profileid, twid, current_reconnections)
return

self.set_evidence.multiple_telnet_reconnection_attempts(
twid, flow, reconnections, current_reconnections[key][1]
)

# reset the reconnection attempts of this src->dst since an evidence
# is set
current_reconnections[key] = (0, [])

self.db.set_reconnections(profileid, twid, current_reconnections)

def check_multiple_reconnection_attempts(self, profileid, twid, flow):
"""
Alerts when 5+ reconnection attempts from the same source IP to
Expand All @@ -219,7 +272,8 @@ def check_multiple_reconnection_attempts(self, profileid, twid, flow):
current_reconnections[key] = (1, [flow.uid])
reconnections = 1

if reconnections < 5:
if reconnections < self.multiple_reconnection_attempts_threshold:
self.db.set_reconnections(profileid, twid, current_reconnections)
return

self.set_evidence.multiple_reconnection_attempts(
Expand Down Expand Up @@ -511,7 +565,7 @@ def check_conn_to_port_0(self, profileid, twid, flow):
)

def detect_connection_to_multiple_ports(self, profileid, twid, flow):
if flow.proto != "tcp" or flow.interpreted_state != "Established":
if flow.proto != "tcp" or flow.interpreted_state != ESTAB:
return

dport_name = flow.appproto
Expand All @@ -525,7 +579,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow):
# Connection to multiple ports to the destination IP
if profileid.split("_")[1] == flow.saddr:
direction = "Dst"
state = "Established"
state = ESTAB
protocol = "TCP"
role = "Client"
type_data = "IPs"
Expand Down Expand Up @@ -565,7 +619,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow):
# Happens in the mode 'all'
elif profileid.split("_")[-1] == flow.daddr:
direction = "Src"
state = "Established"
state = ESTAB
protocol = "TCP"
role = "Server"
type_data = "IPs"
Expand Down Expand Up @@ -608,7 +662,7 @@ def check_non_http_port_80_conns(self, twid, flow):
str(flow.dport) == "80"
and flow.proto.lower() == "tcp"
and str(flow.appproto).lower() != "http"
and flow.interpreted_state == "Established"
and flow.interpreted_state == ESTAB
and (flow.sbytes + flow.dbytes) != 0
):
self.set_evidence.non_http_port_80_conn(twid, flow)
Expand Down Expand Up @@ -734,6 +788,9 @@ async def analyze(self, msg):
self.check_long_connection(twid, flow)
self.check_unknown_port(profileid, twid, flow)
self.check_multiple_reconnection_attempts(profileid, twid, flow)
self.check_multiple_telnet_reconnection_attempts(
profileid, twid, flow
)
self.check_conn_to_port_0(profileid, twid, flow)
self.check_different_localnet_usage(
twid, flow, what_to_check="dstip"
Expand Down
14 changes: 7 additions & 7 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,17 @@ def check_high_entropy_dns_answers(self, twid, flow):
)

def check_invalid_dns_answers(self, twid, flow):
# this function is used to check for certain IP
# answers to DNS queries being blocked
# (perhaps by ad blockers) and set to the following IP values
# currently hardcoding blocked ips
invalid_answers = {"127.0.0.1", "0.0.0.0"}
"""
this function is used to check for private IPs in the answers of
a dns queries.
probably means the queries is being blocked
(perhaps by ad blockers) and set to a private IP value
"""
if not flow.answers:
return

for answer in flow.answers:
if answer in invalid_answers and flow.query != "localhost":
# blocked answer found
if utils.is_private_ip(answer) and flow.query != "localhost":
self.set_evidence.invalid_dns_answer(twid, flow, answer)
# delete answer from redis cache to prevent
# associating this dns answer with this domain/query and
Expand Down
39 changes: 39 additions & 0 deletions modules/flowalerts/set_evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,45 @@ def self_signed_certificates(self, twid, flow) -> None:
)
self.db.set_evidence(evidence)

def multiple_telnet_reconnection_attempts(
self, twid, flow, reconnections, uids: List[str]
):
"""
Set evidence for 4+ telnet unsuccessful attempts.
"""
confidence: float = 0.5
threat_level: ThreatLevel = ThreatLevel.MEDIUM

twid: int = int(twid.replace("timewindow", ""))

description = (
f"Multiple Telnet reconnection attempts from IP: {flow.saddr} "
f"to Destination IP: {flow.daddr} "
f"reconnections: {reconnections}"
)
evidence: Evidence = Evidence(
evidence_type=EvidenceType.MULTIPLE_RECONNECTION_ATTEMPTS,
attacker=Attacker(
direction=Direction.SRC,
attacker_type=IoCType.IP,
value=flow.saddr,
),
victim=Victim(
direction=Direction.DST,
victim_type=IoCType.IP,
value=flow.daddr,
),
threat_level=threat_level,
confidence=confidence,
description=description,
profile=ProfileID(ip=flow.saddr),
timewindow=TimeWindow(number=twid),
uid=uids,
timestamp=flow.starttime,
)

self.db.set_evidence(evidence)

def multiple_reconnection_attempts(
self, twid, flow, reconnections, uids: List[str]
) -> None:
Expand Down
50 changes: 26 additions & 24 deletions slips_files/common/slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
import sys
import ipaddress
import aid_hash
from typing import Any, Optional
from typing import (
Any,
Optional,
Union,
)
from dataclasses import is_dataclass, asdict
from enum import Enum

Expand Down Expand Up @@ -265,7 +269,7 @@ def convert_format(self, ts, required_format: str):

# convert to the req format
if required_format == "iso":
return datetime_obj.astimezone().isoformat()
return datetime_obj.astimezone(tz=self.local_tz).isoformat()
elif required_format == "unixtimestamp":
return datetime_obj.timestamp()
else:
Expand Down Expand Up @@ -390,20 +394,19 @@ def is_port_in_use(self, port: int) -> bool:
sock.close()
return True

def is_private_ip(self, ip_obj: ipaddress) -> bool:
"""
This function replaces the ipaddress library 'is_private'
because it does not work correctly and it does not ignore
the ips 0.0.0.0 or 255.255.255.255
"""
# Is it a well-formed ipv4 or ipv6?
r_value = False
if ip_obj and ip_obj.is_private:
if ip_obj != ipaddress.ip_address(
"0.0.0.0"
) and ip_obj != ipaddress.ip_address("255.255.255.255"):
r_value = True
return r_value
def is_private_ip(
self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]
) -> bool:
ip_classes = {ipaddress.IPv4Address, ipaddress.IPv6Address}
for class_ in ip_classes:
if isinstance(ip, class_):
return ip and ip.is_private

if self.detect_ioc_type(ip) != "ip":
return False
# convert the given str ip to obj
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private

def is_ignored_ip(self, ip: str) -> bool:
"""
Expand All @@ -414,16 +417,15 @@ def is_ignored_ip(self, ip: str) -> bool:
ip_obj = ipaddress.ip_address(ip)
except (ipaddress.AddressValueError, ValueError):
return True

# Is the IP multicast, private? (including localhost)
# The broadcast address 255.255.255.255 is reserved.
return bool(
(
ip_obj.is_multicast
or self.is_private_ip(ip_obj)
or ip_obj.is_link_local
or ip_obj.is_reserved
or ".255" in ip_obj.exploded
)
return (
ip_obj.is_multicast
or self.is_private_ip(ip_obj)
or ip_obj.is_link_local
or ip_obj.is_loopback
or ip_obj.is_reserved
)

def get_sha256_hash(self, filename: str):
Expand Down
72 changes: 72 additions & 0 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,78 @@ def test_check_unknown_port_true_case(mocker):
mock_set_evidence.assert_called_once_with(twid, flow)


@pytest.mark.parametrize(
"origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls",
[
( # Testcase1:5 rejections, evidence should be set
"REJ",
"192.168.1.1",
"192.168.1.2",
23,
[f"uid_{i}" for i in range(4)],
"Not Established",
1,
),
( # Testcase2: Less than 5 rejections, no evidence
"RST",
"192.168.1.1",
"192.168.1.2",
2323,
[f"uid_{i}" for i in range(4)],
"Not Established",
1,
),
( # Testcase3: Non-REJ state, no evidence
"Established",
"192.168.1.1",
"192.168.1.2",
23,
["uid_1"],
"Established",
0,
),
],
)
def test_check_multiple_telnet_reconnection_attempts(
origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls
):
"""
Tests the check_multiple_telnet_reconnection_attempts function
with various scenarios.
"""
conn = ModuleFactory().create_conn_analyzer_obj()
conn.set_evidence.multiple_telnet_reconnection_attempts = Mock()
conn.db.get_reconnections_for_tw.return_value = {}

for uid in uids:
flow = Conn(
starttime="1726249372.312124",
uid=uid,
saddr=saddr,
daddr=daddr,
dur=1,
proto="tcp",
appproto="",
sport="0",
dport=dport,
spkts=0,
dpkts=0,
sbytes=0,
dbytes=0,
smac="",
dmac="",
state=origstate,
history="",
)
flow.interpreted_state = interpreted_state
conn.check_multiple_telnet_reconnection_attempts(profileid, twid, flow)

assert (
conn.set_evidence.multiple_telnet_reconnection_attempts.call_count
== expected_calls
)


@pytest.mark.parametrize(
"origstate, saddr, daddr, dport, uids, expected_calls",
[
Expand Down
4 changes: 2 additions & 2 deletions tests/test_slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ def test_assert_microseconds(input_value, expected_output):
# testcase4: Public IPv4 address
(ipaddress.ip_address("8.8.8.8"), False),
# testcase5: Special IP address 0.0.0.0
(ipaddress.ip_address("0.0.0.0"), False),
(ipaddress.ip_address("0.0.0.0"), True),
# testcase6: Broadcast IP address 255.255.255.255
(ipaddress.ip_address("255.255.255.255"), False),
(ipaddress.ip_address("255.255.255.255"), True),
],
)
def test_is_private_ip(ip_address, expected_result):
Expand Down
Loading