19 changes: 15 additions & 4 deletions modules/timeline/timeline.py
@@ -2,7 +2,10 @@
import sys
import time
import json
from typing import Any
from typing import (
    Any,
    List,
)

from slips_files.common.flow_classifier import FlowClassifier
from slips_files.common.parsers.config_parser import ConfigParser
@@ -20,15 +23,19 @@ class Timeline(IModule):
    authors = ["Sebastian Garcia", "Alya Gomaa"]

    def init(self):
        self.separator = self.db.get_field_separator()
        self.read_configuration()
        self.c1 = self.db.subscribe("new_flow")
        self.channels = {
            "new_flow": self.c1,
        }
        self.classifier = FlowClassifier()
        self.host_ip: str = self.db.get_host_ip()

    def read_configuration(self):
        conf = ConfigParser()
        self.is_human_timestamp = conf.timeline_human_timestamp()
        self.analysis_direction = conf.analysis_direction()
        self.classifier = FlowClassifier()
        self.client_ips: List[str] = conf.client_ips()

    def convert_timestamp_to_slips_format(self, timestamp: float) -> str:
        if self.is_human_timestamp:
@@ -42,7 +49,11 @@ def ensure_int_bytes(self, bytes: Any) -> int:

    def is_inbound_traffic(self, flow) -> bool:
        """return True if the flow's daddr is ours, i.e. the host IP or one of the client IPs"""
        return self.analysis_direction == "all" and flow.daddr == flow.saddr
        if self.analysis_direction != "all":
            # slips only detects inbound traffic in the "all" direction
            return False

        return flow.daddr == self.host_ip or flow.daddr in self.client_ips

    def process_dns_altflow(self, alt_flow: dict):
        answer = alt_flow["answers"]
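For reference, a minimal standalone sketch of how the reworked inbound check behaves once init() and read_configuration() have populated host_ip and client_ips. The function and the IP values below are made up for illustration; in Slips the real values come from self.db.get_host_ip() and conf.client_ips().

# Illustrative only: reproduces the new is_inbound_traffic logic outside Slips.
def is_inbound_traffic(daddr: str, host_ip: str, client_ips: list, analysis_direction: str) -> bool:
    if analysis_direction != "all":
        # inbound traffic is only considered in the "all" direction
        return False
    return daddr == host_ip or daddr in client_ips

assert is_inbound_traffic("10.0.0.1", "10.0.0.1", [], "all") is True       # daddr is the host IP
assert is_inbound_traffic("192.168.1.5", "10.0.0.1", ["192.168.1.5"], "all") is True  # daddr is a client IP
assert is_inbound_traffic("10.0.0.1", "10.0.0.1", [], "out") is False      # wrong analysis direction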
3 changes: 2 additions & 1 deletion modules/update_manager/update_manager.py
@@ -574,7 +574,8 @@ def parse_ssl_feed(self, url, full_path):

    async def update_TI_file(self, link_to_download: str) -> bool:
        """
        Update remote TI files, JA3 feeds and SSL feeds by writing them to disk and parsing them
        Update remote TI files, JA3 feeds and SSL feeds by writing them to
        disk and parsing them
        """
        try:
            self.log(f"Updating the remote file {link_to_download}")
10 changes: 6 additions & 4 deletions tests/test_timeline.py
@@ -454,7 +454,7 @@ def test_interpret_dport(flow, expected_dport_name):
        {
            "timestamp": 1625097700,
            "dport_name": "HTTPS",
            "preposition": "from",
            "preposition": "to",
            "dns_resolution": "????",
            "daddr": "10.0.0.1",
            "dport/proto": "443/TCP",
@@ -545,7 +545,7 @@ def test_ensure_int_bytes(input_bytes, expected):


@pytest.mark.parametrize(
    "saddr, daddr," "analysis_direction, expected_result",
    "host_ip, daddr," "analysis_direction, expected_result",
    [
        # testcase1: Inbound traffic,
        # analysis direction is "all"
@@ -561,12 +561,14 @@ def test_ensure_int_bytes(input_bytes, expected):
("10.0.0.1", "10.0.0.1", "all", True),
],
)
def test_is_inbound_traffic(saddr, daddr, analysis_direction, expected_result):
def test_is_inbound_traffic(
host_ip, daddr, analysis_direction, expected_result
):
timeline = ModuleFactory().create_timeline_object()
timeline.host_ip = host_ip
timeline.analysis_direction = analysis_direction
flow = Mock()
flow.daddr = daddr
flow.saddr = saddr
assert timeline.is_inbound_traffic(flow) == expected_result


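The updated parametrization only drives the host_ip branch of is_inbound_traffic; the client_ips branch is not covered here. A possible extra case, assuming client_ips can be set on the timeline object the same way host_ip is and reusing the ModuleFactory and Mock helpers already used in tests/test_timeline.py (the client IP value below is made up):

# Sketch only: exercises the "daddr is one of the client IPs" branch.
def test_is_inbound_traffic_client_ip():
    timeline = ModuleFactory().create_timeline_object()
    timeline.host_ip = "10.0.0.1"
    timeline.client_ips = ["192.168.1.50"]  # hypothetical client IP
    timeline.analysis_direction = "all"
    flow = Mock()
    flow.daddr = "192.168.1.50"
    assert timeline.is_inbound_traffic(flow) is True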