Skip to content

Commit 4fe092d

Browse files
committed
db: add functions to lookup specific ips/ports statistics instead of querying all the hash map at once, we use redis for filtering the results instead of python
1 parent 1c30cf4 commit 4fe092d

File tree

1 file changed

+59
-108
lines changed

1 file changed

+59
-108
lines changed

slips_files/core/database/redis_db/flow_attributes_db.py

Lines changed: 59 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
ProfileID,
1010
TimeWindow,
1111
)
12-
from slips_files.core.structures.flow_attributes import FlowQuery
12+
from slips_files.core.structures.flow_attributes import (
13+
FlowQuery,
14+
)
1315

1416

1517
class FlowAttrHandler:
@@ -24,7 +26,12 @@ class FlowAttrHandler:
2426
name = "DB"
2527

2628
def _construct_query_key(
27-
self, profileid: ProfileID, twid: TimeWindow, query: FlowQuery
29+
self,
30+
profileid: ProfileID,
31+
twid: TimeWindow,
32+
query: FlowQuery,
33+
port=None,
34+
ip=None,
2835
) -> str:
2936
"""
3037
All queries done by this class (insertions and lookups)
@@ -36,6 +43,9 @@ def _construct_query_key(
3643
profile_1.1.1.1_timewindow2:client:tcp:not_est:dst:ports:dst_ips
3744
or
3845
profile_1.1.1.1_timewindow2:server:udp:est:src:ips:dst_ports
46+
47+
:kwargs ip, port: we are filtering for flows with this
48+
specific port/ip.
3949
"""
4050
role = query.role.name.lower()
4151
protocol = query.protocol.name.lower()
@@ -46,12 +56,52 @@ def _construct_query_key(
4656
# if key_type=PORT: request will be IP
4757
# if key_type=IP: request will be PORT
4858
request = query.request.name.lower()
59+
if port:
60+
# e.g something like
61+
# profile_1.1.1.1_timewindow2:server:udp:est:src:ips:192
62+
# .168.12.2:dst_ports
63+
# aka get me all the dst ports my priv ip connected to 1.1.1.1 on
64+
return (
65+
f"{str(profileid)}_{str(twid)}"
66+
f":{role}:{protocol}:{state}:{direction}:{key_type}:"
67+
f"{port}:{request}"
68+
)
69+
elif ip:
70+
# e.g something like
71+
# profile_1.1.1.1_timewindow2:server:udp:est:src:ips:192
72+
# .168.12.2:dst_ports
73+
# aka get me all the dst ports my priv ip connected to 1.1.1.1 on
74+
return (
75+
f"{str(profileid)}_{str(twid)}"
76+
f":{role}:{protocol}:{state}:{direction}:{key_type}:"
77+
f"{ip}:{request}"
78+
)
79+
else:
80+
return (
81+
f"{str(profileid)}_{str(twid)}"
82+
f":{role}:{protocol}:{state}:{direction}:{key_type}:{request}"
83+
)
4984

50-
key = (
51-
f"{str(profileid)}_{str(twid)}"
52-
f":{role}:{protocol}:{state}:{direction}:{key_type}:{request}"
53-
)
54-
return key
85+
def get_specific_ip_info_from_profile_tw(
86+
self,
87+
profileid: ProfileID,
88+
twid: TimeWindow,
89+
query: FlowQuery,
90+
requested_ip: str,
91+
) -> dict:
92+
key: str = self._construct_query_key(profileid, twid, query)
93+
return self.r.hget(key, requested_ip) or {}
94+
95+
def get_specific_port_info_from_profile_tw(
96+
self,
97+
profileid: ProfileID,
98+
twid: TimeWindow,
99+
query: FlowQuery,
100+
requested_port: int,
101+
) -> dict:
102+
# todo make sure ports are added as ints
103+
key: str = self._construct_query_key(profileid, twid, query)
104+
return self.r.hget(key, requested_port) or {}
55105

56106
def get_data_from_profile_tw(
57107
self,
@@ -60,9 +110,8 @@ def get_data_from_profile_tw(
60110
query: FlowQuery,
61111
) -> Generator:
62112
"""
63-
Retrieves information for a given profile and time window
64-
based on flow characteristics (role, protocol, state, direction).
65-
113+
Retrieves metadata about a given profile and time window
114+
based on the given query (role, protocol, state, direction).
66115
67116
:param flow: FlowQuery object containing:
68117
- role: CLIENT or SERVER (is the traffic from or to the profile)
@@ -104,104 +153,6 @@ def get_data_from_profile_tw(
104153
)
105154
self.print(traceback.format_exc(), 0, 1)
106155

107-
def add_ips(self, profileid, twid, flow, role):
108-
"""
109-
Function to add information about an IP address
110-
The flow can go out of the IP (we are acting as Client) or into the IP
111-
(we are acting as Server)
112-
ip_as_obj: IP to add. It can be a dstIP or srcIP depending on the role
113-
role: 'Client' or 'Server'
114-
This function does two things:
115-
1- Add the ip to this tw in this profile, counting how many times
116-
it was contacted, and storing it in the key 'DstIPs' or 'SrcIPs'
117-
in the hash of the profile
118-
2- Use the ip as a key to count how many times that IP was
119-
contacted on each port. We store it like this because its the
120-
pefect structure to detect vertical port scans later on
121-
3- Check if this IP has any detection in the threat intelligence
122-
module. The information is added by the module directly in the DB.
123-
"""
124-
125-
uid = flow.uid
126-
starttime = str(flow.starttime)
127-
ip = flow.daddr if role == "Client" else flow.saddr
128-
129-
"""
130-
Depending if the traffic is going out or not, we are Client or Server
131-
Client role means:
132-
The profile corresponds to the src ip that received this flow
133-
The dstip is here the one receiving data from your profile
134-
So check the dst ip
135-
Server role means:
136-
The profile corresponds to the dst ip that received this flow
137-
The srcip is here the one sending data to your profile
138-
So check the src ip
139-
"""
140-
direction = "Dst" if role == "Client" else "Src"
141-
142-
#############
143-
# Store the Dst as IP address and notify in the channel
144-
# We send the obj but when accessed as str, it is automatically
145-
# converted to str
146-
self.set_new_ip(ip)
147-
148-
#############
149-
150-
# OTH means that we didnt see the true src ip and dst ip
151-
# from zeek docs; OTH: No SYN seen, just midstream traffic
152-
# (one example of this is a “partial connection” that was not
153-
# later closed).
154-
if flow.state != "OTH":
155-
self.ask_for_ip_info(
156-
flow.saddr,
157-
profileid,
158-
twid,
159-
flow,
160-
"srcip",
161-
daddr=flow.daddr,
162-
)
163-
self.ask_for_ip_info(
164-
flow.daddr,
165-
profileid,
166-
twid,
167-
flow,
168-
"dstip",
169-
)
170-
171-
self.update_times_contacted(ip, direction, profileid, twid)
172-
173-
# Get the state. Established, NotEstablished
174-
summary_state = self.get_final_state_from_flags(flow.state, flow.pkts)
175-
key_name = f"{direction}IPs{role}{flow.proto.upper()}{summary_state}"
176-
# Get the previous data about this key
177-
old_profileid_twid_data = self.get_data_from_profile_tw(
178-
profileid,
179-
twid,
180-
direction,
181-
summary_state,
182-
flow.proto,
183-
role,
184-
"IPs",
185-
)
186-
profileid_twid_data: dict = self.update_ip_info(
187-
old_profileid_twid_data,
188-
flow.pkts,
189-
flow.dport,
190-
flow.spkts,
191-
flow.bytes,
192-
ip,
193-
starttime,
194-
uid,
195-
)
196-
197-
# Store this data in the profile hash
198-
self.r.hset(
199-
f"{profileid}{self.separator}{twid}",
200-
key_name,
201-
json.dumps(profileid_twid_data),
202-
)
203-
return True
204-
205156
def add_port(
206157
self, profileid: str, twid: str, flow: dict, role: str, port_type: str
207158
):

0 commit comments

Comments
 (0)