diff --git a/modules/timeline/timeline.py b/modules/timeline/timeline.py index 84ad4d940..96df9314a 100644 --- a/modules/timeline/timeline.py +++ b/modules/timeline/timeline.py @@ -2,7 +2,10 @@ import sys import time import json -from typing import Any +from typing import ( + Any, + List, +) from slips_files.common.flow_classifier import FlowClassifier from slips_files.common.parsers.config_parser import ConfigParser @@ -20,15 +23,19 @@ class Timeline(IModule): authors = ["Sebastian Garcia", "Alya Gomaa"] def init(self): - self.separator = self.db.get_field_separator() + self.read_configuration() self.c1 = self.db.subscribe("new_flow") self.channels = { "new_flow": self.c1, } + self.classifier = FlowClassifier() + self.host_ip: str = self.db.get_host_ip() + + def read_configuration(self): conf = ConfigParser() self.is_human_timestamp = conf.timeline_human_timestamp() self.analysis_direction = conf.analysis_direction() - self.classifier = FlowClassifier() + self.client_ips: List[str] = conf.client_ips() def convert_timestamp_to_slips_format(self, timestamp: float) -> str: if self.is_human_timestamp: @@ -42,7 +49,11 @@ def ensure_int_bytes(self, bytes: Any) -> int: def is_inbound_traffic(self, flow) -> bool: """return True if profileid's IP is the same as the daddr""" - return self.analysis_direction == "all" and flow.daddr == flow.saddr + if self.analysis_direction != "all": + # slips only detects inbound traffic in the "all" direction + return False + + return flow.daddr == self.host_ip or flow.daddr in self.client_ips def process_dns_altflow(self, alt_flow: dict): answer = alt_flow["answers"] diff --git a/modules/update_manager/update_manager.py b/modules/update_manager/update_manager.py index d06bc6cff..ca1dcb810 100644 --- a/modules/update_manager/update_manager.py +++ b/modules/update_manager/update_manager.py @@ -574,7 +574,8 @@ def parse_ssl_feed(self, url, full_path): async def update_TI_file(self, link_to_download: str) -> bool: """ - Update remote TI files, JA3 feeds and SSL feeds by writing them to disk and parsing them + Update remote TI files, JA3 feeds and SSL feeds by writing them to + disk and parsing them """ try: self.log(f"Updating the remote file {link_to_download}") diff --git a/tests/test_timeline.py b/tests/test_timeline.py index 22032ca99..c9fb8ef63 100644 --- a/tests/test_timeline.py +++ b/tests/test_timeline.py @@ -454,7 +454,7 @@ def test_interpret_dport(flow, expected_dport_name): { "timestamp": 1625097700, "dport_name": "HTTPS", - "preposition": "from", + "preposition": "to", "dns_resolution": "????", "daddr": "10.0.0.1", "dport/proto": "443/TCP", @@ -545,7 +545,7 @@ def test_ensure_int_bytes(input_bytes, expected): @pytest.mark.parametrize( - "saddr, daddr," "analysis_direction, expected_result", + "host_ip, daddr," "analysis_direction, expected_result", [ # testcase1: Inbound traffic, # analysis direction is "all" @@ -561,12 +561,14 @@ def test_ensure_int_bytes(input_bytes, expected): ("10.0.0.1", "10.0.0.1", "all", True), ], ) -def test_is_inbound_traffic(saddr, daddr, analysis_direction, expected_result): +def test_is_inbound_traffic( + host_ip, daddr, analysis_direction, expected_result +): timeline = ModuleFactory().create_timeline_object() + timeline.host_ip = host_ip timeline.analysis_direction = analysis_direction flow = Mock() flow.daddr = daddr - flow.saddr = saddr assert timeline.is_inbound_traffic(flow) == expected_result