Skip to content

Commit 4967a3a

Browse files
authored
Merge pull request #916 from Sekhar-Kumar-Dash/patch-50
added a test for idea_format
2 parents 8a95897 + 54e0695 commit 4967a3a

File tree

4 files changed

+471
-114
lines changed

4 files changed

+471
-114
lines changed

.github/workflows/unit-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ jobs:
7777
- test_host_ip_manager.py
7878
- test_host_ip_manager.py
7979
- test_rnn_cc_detection.py
80+
- test_idea_format.py
8081

8182
steps:
8283
- uses: actions/checkout@v4

modules/cesnet/cesnet.py

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1-
from slips_files.common.parsers.config_parser import ConfigParser
2-
from slips_files.common.abstracts.module import IModule
3-
from ..cesnet.warden_client import Client, read_cfg
41
import os
52
import json
63
import time
74
import threading
85
import queue
96
import ipaddress
107
import validators
8+
9+
from slips_files.common.parsers.config_parser import ConfigParser
10+
from slips_files.common.abstracts.module import IModule
11+
from slips_files.core.structures.evidence import (
12+
ThreatLevel,
13+
Evidence,
14+
dict_to_evidence,
15+
)
16+
from ..cesnet.warden_client import Client, read_cfg
1117
from slips_files.common.slips_utils import utils
18+
from slips_files.common.idea_format import (
19+
idea_format,
20+
)
1221

1322

1423
class CESNET(IModule):
@@ -92,34 +101,13 @@ def export_evidence(self, evidence: dict):
92101
"""
93102
Exports evidence to warden server
94103
"""
95-
threat_level = evidence.get("threat_level")
96-
if threat_level == "info":
104+
evidence: Evidence = dict_to_evidence(evidence)
105+
threat_level = evidence.threat_level
106+
if threat_level == ThreatLevel.INFO:
97107
# don't export alerts of type 'info'
98108
return False
99109

100-
description = evidence["description"]
101-
profileid = evidence["profileid"]
102-
srcip = profileid.split("_")[1]
103-
evidence_type = evidence["evidence_type"]
104-
attacker_direction = evidence["attacker_direction"]
105-
attacker = evidence["attacker"]
106-
evidence_id = evidence["ID"]
107-
confidence = evidence.get("confidence")
108-
port = evidence.get("port")
109-
proto = evidence.get("proto")
110-
111-
evidence_in_idea = utils.IDEA_format(
112-
srcip,
113-
evidence_type,
114-
attacker_direction,
115-
attacker,
116-
description,
117-
confidence,
118-
port,
119-
proto,
120-
evidence_id,
121-
)
122-
110+
evidence_in_idea: dict = idea_format(evidence)
123111
# remove private ips from the alert
124112
evidence_in_idea = self.remove_private_ips(evidence_in_idea)
125113

@@ -178,7 +166,8 @@ def import_alerts(self):
178166
tag = []
179167
notag = []
180168

181-
# group = ['cz.tul.ward.kippo','cz.vsb.buldog.kippo', 'cz.zcu.civ.afrodita','cz.vutbr.net.bee.hpscan']
169+
# group = ['cz.tul.ward.kippo','cz.vsb.buldog.kippo',
170+
# 'cz.zcu.civ.afrodita','cz.vutbr.net.bee.hpscan']
182171
group = []
183172
nogroup = []
184173

slips_files/common/idea_format.py

Lines changed: 84 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import traceback
21
from datetime import datetime
32
from typing import Tuple
43

@@ -57,13 +56,13 @@ def extract_role_type(evidence: Evidence, role=None) -> str:
5756
ioc = evidence.victim.value
5857
ioc_type = evidence.victim.victim_type
5958

60-
if ioc_type == IoCType.IP.name:
59+
if ioc_type == IoCType.IP:
6160
return ioc, get_ip_version(ioc)
6261

6362
# map of slips victim types to IDEA supported types
6463
idea_type = {
65-
IoCType.DOMAIN.name: "Hostname",
66-
IoCType.URL.name: "URL",
64+
IoCType.DOMAIN: "Hostname",
65+
IoCType.URL: "URL",
6766
}
6867
return ioc, idea_type[ioc_type]
6968

@@ -75,88 +74,88 @@ def idea_format(evidence: Evidence):
7574
Detailed explanation of IDEA categories:
7675
https://idea.cesnet.cz/en/classifications
7776
"""
78-
try:
79-
idea_dict = {
80-
"Format": "IDEA0",
81-
"ID": evidence.id,
82-
# both times represet the time of the detection, we probably
83-
# don't need flow_datetime
84-
"DetectTime": datetime.now(utils.local_tz).isoformat(),
85-
"EventTime": datetime.now(utils.local_tz).isoformat(),
86-
"Confidence": evidence.confidence,
87-
"Source": [{}],
88-
}
89-
90-
attacker, attacker_type = extract_role_type(evidence, role="attacker")
91-
idea_dict["Source"][0].update({attacker_type: [attacker]})
92-
93-
# according to the IDEA format
94-
# When someone communicates with C&C, both sides of communication are
95-
# sources, differentiated by the Type attribute, 'C&C' or 'Botnet'
96-
# https://idea.cesnet.cz/en/design#:~:text=to%20string%20%E2%80%9CIDEA1
97-
# %E2%80%9D.-,Sources%20and%20targets,-As%20source%20of
98-
if evidence.evidence_type == EvidenceType.COMMAND_AND_CONTROL_CHANNEL:
99-
# botnet, ip_version = extract_cc_botnet_ip(evidence)
100-
idea_dict["Source"][0].update({"Type": ["Botnet"]})
101-
102-
cc_server, ip_version = extract_cc_server_ip(evidence)
103-
server_info: dict = {ip_version: [cc_server], "Type": ["CC"]}
104-
105-
idea_dict["Source"].append(server_info)
106-
107-
# the idx of the daddr, in CC detections, it's the second one
108-
idx = (
109-
1
110-
if (
111-
evidence.evidence_type
112-
== EvidenceType.COMMAND_AND_CONTROL_CHANNEL
113-
)
114-
else 0
115-
)
116-
if evidence.port:
117-
idea_dict["Source"][idx].update({"Port": [evidence.port]})
118-
if evidence.proto:
119-
idea_dict["Source"][idx].update({"Proto": [evidence.proto.name]})
120-
121-
if hasattr(evidence, "victim") and evidence.victim:
122-
# is the dstip ipv4/ipv6 or mac?
123-
victims_ip: str
124-
victim_type: str
125-
victims_ip, victim_type = extract_role_type(
126-
evidence, role="victim"
127-
)
128-
idea_dict["Target"] = [{victim_type: [victims_ip]}]
77+
# try:
78+
idea_dict = {
79+
"Format": "IDEA0",
80+
"ID": evidence.id,
81+
# both times represet the time of the detection, we probably
82+
# don't need flow_datetime
83+
"DetectTime": datetime.now(utils.local_tz).isoformat(),
84+
"EventTime": datetime.now(utils.local_tz).isoformat(),
85+
"Confidence": evidence.confidence,
86+
"Source": [{}],
87+
}
12988

130-
# add the description
131-
attachment = {
132-
"Attach": [
89+
attacker, attacker_type = extract_role_type(evidence, role="attacker")
90+
idea_dict["Source"][0].update({attacker_type: [attacker]})
91+
92+
# according to the IDEA format
93+
# When someone communicates with C&C, both sides of communication are
94+
# sources, differentiated by the Type attribute, 'C&C' or 'Botnet'
95+
# https://idea.cesnet.cz/en/design#:~:text=to%20string%20%E2%80%9CIDEA1
96+
# %E2%80%9D.-,Sources%20and%20targets,-As%20source%20of
97+
if evidence.evidence_type == EvidenceType.COMMAND_AND_CONTROL_CHANNEL:
98+
# botnet, ip_version = extract_cc_botnet_ip(evidence)
99+
idea_dict["Source"][0].update({"Type": ["Botnet"]})
100+
101+
cc_server, ip_version = extract_cc_server_ip(evidence)
102+
server_info: dict = {ip_version: [cc_server], "Type": ["CC"]}
103+
104+
idea_dict["Source"].append(server_info)
105+
106+
# the idx of the daddr, in CC detections, is the second one
107+
idx = (
108+
1
109+
if (evidence.evidence_type == EvidenceType.COMMAND_AND_CONTROL_CHANNEL)
110+
else 0
111+
)
112+
if evidence.src_port:
113+
idea_dict["Source"][idx].update({"Port": [evidence.src_port]})
114+
if evidence.src_port:
115+
idea_dict["Target"][idx].update({"Port": [evidence.dst_port]})
116+
if evidence.proto:
117+
idea_dict["Source"][idx].update({"Proto": [evidence.proto.name]})
118+
119+
if hasattr(evidence, "victim") and evidence.victim:
120+
# is the dstip ipv4/ipv6 or mac?
121+
victims_ip: str
122+
victim_type: str
123+
victims_ip, victim_type = extract_role_type(evidence, role="victim")
124+
idea_dict["Target"] = [{victim_type: [victims_ip]}]
125+
126+
# add the description
127+
attachment = {
128+
"Attach": [
129+
{
130+
"Content": evidence.description,
131+
"ContentType": "text/plain",
132+
}
133+
]
134+
}
135+
idea_dict.update(attachment)
136+
137+
if evidence.evidence_type == EvidenceType.MALICIOUS_DOWNLOADED_FILE:
138+
md5 = evidence.description.split("downloaded file ")[-1].split(
139+
". size"
140+
)[0]
141+
idea_dict["Attach"] = [
142+
{
143+
"Type": ["Malware"],
144+
"Hash": [f"md5:{md5}"],
145+
}
146+
]
147+
if "size" in evidence.description:
148+
idea_dict.update(
133149
{
134-
"Content": evidence.description,
135-
"ContentType": "text/plain",
150+
"Size": int(
151+
evidence.description.replace(".", "")
152+
.split("size:")[1]
153+
.split("bytes")[0]
154+
)
136155
}
137-
]
138-
}
139-
idea_dict.update(attachment)
156+
)
140157

141-
if evidence.evidence_type == EvidenceType.MALICIOUS_DOWNLOADED_FILE:
142-
idea_dict["Attach"] = [
143-
{
144-
"Type": ["Malware"],
145-
"Hash": [f"md5:{evidence.attacker.value}"],
146-
}
147-
]
148-
if "size" in evidence.description:
149-
idea_dict.update(
150-
{
151-
"Size": int(
152-
evidence.description.replace(".", "")
153-
.split("size:")[1]
154-
.split("from")[0]
155-
)
156-
}
157-
)
158-
159-
return idea_dict
160-
except Exception as e:
161-
print(f"Error in idea_format(): {e}")
162-
print(traceback.format_exc())
158+
return idea_dict
159+
# except Exception as e:
160+
# print(f"Error in idea_format(): {e}")
161+
# print(traceback.format_exc())

0 commit comments

Comments
 (0)