Skip to content

Commit 42bc43d

Browse files
committed
optimize add_ips() by using a redis pipe
1 parent 5bd5189 commit 42bc43d

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

slips_files/core/database/redis_db/flow_attributes_db.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import sys
66
import traceback
77
from typing import Generator
8+
from redis.client import Pipeline
89

910
from slips_files.core.structures.evidence import (
1011
ProfileID,
@@ -106,11 +107,8 @@ def convert_str_to_proto(self, str_proto: str) -> Protocol:
106107

107108
raise ValueError(f"Unknown protocol: {str_proto}")
108109

109-
def ask_modules_about_all_ips_in_flow(
110-
self,
111-
profileid: ProfileID,
112-
twid: TimeWindow,
113-
flow,
110+
def _ask_modules_about_all_ips_in_flow(
111+
self, profileid: ProfileID, twid: TimeWindow, flow
114112
):
115113
"""
116114
Ask the IP info module about saddr and daddr of this flow
@@ -129,7 +127,6 @@ def ask_modules_about_all_ips_in_flow(
129127
}
130128

131129
for ip_state, ip in cases.items():
132-
133130
if ip in self.our_ips:
134131
# dont ask p2p or other modules about your own ip
135132
continue
@@ -178,12 +175,16 @@ def add_ips(
178175
# flow, so if i'm the server i will gather info about the client and
179176
# vice versa
180177
target_ip = flow.daddr if role == Role.CLIENT else flow.client
178+
self._ask_modules_about_all_ips_in_flow(profileid, twid, flow)
181179

182-
self.ask_modules_about_all_ips_in_flow(profileid, twid, flow)
183-
self._add_scan_detection_info(profileid, twid, flow, role, target_ip)
184-
self.mark_profile_tw_as_modified(
185-
str(profileid), str(twid), flow.starttime
186-
)
180+
with self.r.pipeline() as pipe:
181+
pipe = self._add_scan_detection_info(
182+
profileid, twid, flow, role, target_ip, pipe
183+
)
184+
pipe = self.mark_profile_tw_as_modified(
185+
str(profileid), str(twid), flow.starttime, pipe=pipe
186+
)
187+
pipe.execute()
187188

188189
def _add_scan_detection_info(
189190
self,
@@ -192,7 +193,8 @@ def _add_scan_detection_info(
192193
flow,
193194
role: Role,
194195
target_ip: str,
195-
):
196+
pipe: Pipeline,
197+
) -> Pipeline:
196198
"""
197199
:param target_ip: the ip we are gathering info about, depends on the
198200
role of the profile in the flow, if the profile ip is the
@@ -202,7 +204,7 @@ def _add_scan_detection_info(
202204
if not self._are_scan_detection_modules_interested_in_this_ip(
203205
target_ip
204206
):
205-
return
207+
return pipe
206208

207209
# Get the state. Established, NotEstablished
208210
summary_state: str = self.get_final_state_from_flags(
@@ -222,7 +224,7 @@ def _add_scan_detection_info(
222224
f"{profileid}_{twid}"
223225
f":{str_proto}:not_estab:{target_ip}:dstports"
224226
)
225-
self.r.hincrby(key, flow.dport, int(flow.pkts))
227+
pipe.hincrby(key, flow.dport, int(flow.pkts))
226228

227229
if not self._was_flow_flipped(flow):
228230
# this hash is needed for horizontal portscans detections
@@ -232,7 +234,7 @@ def _add_scan_detection_info(
232234
f"{profileid}_{twid}:"
233235
f"{str_proto}:not_estab:{flow.daddr}:dstports"
234236
)
235-
self.r.hincrby(key, flow.dport, int(flow.pkts))
237+
pipe.hincrby(key, flow.dport, int(flow.pkts))
236238

237239
if self._is_info_needed_by_the_icmp_scan_detector_module(
238240
role, proto, state, flow.sport
@@ -241,7 +243,9 @@ def _add_scan_detection_info(
241243
# hash:
242244
# profile_tw:icmp:estab:sport:<port>:dstips <dstip> <flows_num>
243245
key = f"{profileid}_{twid}:icmp:est:sport:{flow.sport}:dstips"
244-
self.r.hincrby(key, flow.dstip, 1)
246+
pipe.hincrby(key, flow.dstip, 1)
247+
248+
return pipe
245249

246250
def _was_flow_flipped(self, flow) -> bool:
247251
"""
@@ -251,6 +255,7 @@ def _was_flow_flipped(self, flow) -> bool:
251255
10 webpages. To avoid this, we IGNORE all the flows that have
252256
in the history of flags (field history in zeek), the ^,
253257
that means that the flow was swapped/flipped.
258+
that means that the flow was swapped/flipped.
254259
since this func stores info that is only needed by the horizontal
255260
portscan module, we can safely ignore flipped flows.
256261
"""

slips_files/core/database/redis_db/profile_handler.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
import redis
1717
import validators
18+
from redis.client import Pipeline
1819

1920
from slips_files.core.structures.flow_attributes import Role
2021

@@ -1067,7 +1068,9 @@ def check_tw_to_close(self, close_all=False):
10671068
)
10681069
pipeline.execute()
10691070

1070-
def mark_profile_tw_as_modified(self, profileid, twid, timestamp):
1071+
def mark_profile_tw_as_modified(
1072+
self, profileid, twid, timestamp, pipe: Pipeline = None
1073+
):
10711074
"""
10721075
Mark a TW in a profile as modified
10731076
This means:
@@ -1079,8 +1082,10 @@ def mark_profile_tw_as_modified(self, profileid, twid, timestamp):
10791082
"""
10801083
timestamp = timestamp or time.time()
10811084
data = {f"{profileid}{self.separator}{twid}": float(timestamp)}
1082-
self.r.zadd(self.constants.MODIFIED_TIMEWINDOWS, data)
1085+
client = pipe if pipe else self.r
1086+
client.zadd(self.constants.MODIFIED_TIMEWINDOWS, data)
10831087
self.publish("tw_modified", f"{profileid}:{twid}")
1088+
return pipe
10841089

10851090
def publish_new_letter(
10861091
self, new_symbol: str, profileid: str, twid: str, tupleid: str, flow

0 commit comments

Comments
 (0)