Skip to content

Commit 90224ab

Browse files
committed
when a tw is closed, delete all info from the db starting from this tw-2, (always keep track of 2 tws only)
1 parent 84f94e8 commit 90224ab

File tree

2 files changed

+50
-18
lines changed

2 files changed

+50
-18
lines changed

slips_files/core/database/redis_db/profile_handler.py

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,17 +1064,59 @@ def check_tw_to_close(self, close_all=False):
10641064
if not profiles_tws_to_close:
10651065
return
10661066

1067-
# Mark the TWs as closed so module can work on its data
1068-
pipeline = self.r.pipeline()
1067+
# Mark the TWs as closed so modules can work on its data
1068+
pipe = self.r.pipeline()
10691069
for profile_tw_to_close in profiles_tws_to_close:
1070-
pipeline.sadd("ClosedTW", profile_tw_to_close)
1071-
pipeline.zrem(
1072-
self.constants.MODIFIED_TIMEWINDOWS, profile_tw_to_close
1070+
pipe.zrem(self.constants.MODIFIED_TIMEWINDOWS, profile_tw_to_close)
1071+
pipe = self.publish(
1072+
"tw_closed", profile_tw_to_close, pipeline=pipe
10731073
)
1074-
pipeline = self.publish(
1075-
"tw_closed", profile_tw_to_close, pipeline=pipeline
1074+
pipe = self._delete_past_timewindows(profile_tw_to_close, pipe)
1075+
pipe.execute()
1076+
1077+
def _delete_past_timewindows(self, closed_profile_tw: str, pipe):
1078+
"""
1079+
Deletes the past timewindows data from redis, starting from the
1080+
given tw-1, so that redis only has info about the current
1081+
timewindow and the one before it
1082+
1083+
why do we keep 2 tws instead of the current one in redis? see PR
1084+
#1765 in slips repo
1085+
1086+
:param closed_profile_tw: a str like profile_8.8.8.8_timewindow7
1087+
"""
1088+
# todo handle flows that have a ts in the very future that cause
1089+
# this func to del all tws!!
1090+
try:
1091+
profile, ip, tw = closed_profile_tw.split("_")
1092+
closed_tw = int(tw.replace("timewindow", ""))
1093+
except ValueError:
1094+
self.print(
1095+
f"Unable to delete old Timewindwos info from"
1096+
f" {closed_profile_tw}"
10761097
)
1077-
pipeline.execute()
1098+
return pipe
1099+
1100+
if closed_tw < 2:
1101+
# slips needs to always remember 2 tws, now tws to delete now
1102+
return pipe
1103+
1104+
profileid = f"{profile}_{ip}"
1105+
# to avoid deleting so many keys at once which causes mem spikes
1106+
BATCH = 500
1107+
for tw_to_close in range(closed_tw - 2, -1, -1):
1108+
for i, key in enumerate(
1109+
self.r.scan_iter(
1110+
match=f"{profileid}_timewindow{tw_to_close}", count=1000
1111+
)
1112+
):
1113+
pipe.unlink(key)
1114+
1115+
if i % BATCH == 0:
1116+
pipe.execute()
1117+
pipe = self.r.pipeline()
1118+
1119+
return pipe
10781120

10791121
def mark_profile_tw_as_modified(
10801122
self, profileid, twid, timestamp, pipe: Pipeline = None

slips_files/core/database/redis_db/scan_detections_db.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,10 @@ def _update_portscan_index_hash(
125125
# if no first seen ts is set, then this flow is the first seen
126126
key = f"{base}:first_seen"
127127
pipe.zadd(key, {ip: flow.starttime}, nx=True)
128-
pipe.expire(key, self.tw_width, nx=True)
129128

130129
key = f"{base}:last_seen"
131130
# last seen is now. this flow.
132131
pipe.zadd(key, {ip: flow.starttime})
133-
pipe.expire(key, self.tw_width, nx=True)
134132

135133
return pipe
136134

@@ -397,7 +395,6 @@ def _store_vertical_portscan_info(
397395
f":{str_proto}:not_estab:{target_ip}:dstports"
398396
)
399397
pipe.hincrby(key, flow.dport, int(flow.pkts))
400-
pipe.expire(key, self.tw_width, nx=True)
401398
# increment the total pkts sent to this target ip on this
402399
# proto so slips can retreieve it in O(1) when setting and
403400
# evidence
@@ -407,7 +404,6 @@ def _store_vertical_portscan_info(
407404
f"{target_ip}:dstports:tot_pkts_sum"
408405
)
409406
pipe.incrby(key, int(flow.spkts))
410-
pipe.expire(key, self.tw_width, nx=True)
411407

412408
# we keep an index hash of target_ips to be able to access the
413409
# diff variants of the key above using them
@@ -430,7 +426,6 @@ def _store_horizontal_portscan_info(
430426
f"{str_proto}:not_estab:dstports:total_packets"
431427
)
432428
pipe.hincrby(key, flow.dport, int(flow.pkts))
433-
pipe.expire(key, self.tw_width, nx=True)
434429

435430
# ZSET
436431
# profile_tw:[tcp|udp]:not_estab:dport:
@@ -445,7 +440,6 @@ def _store_horizontal_portscan_info(
445440
# To make sure the stored ts is the first seen ts of this
446441
# daddr, we use nx=True, so if a daddr is present we dont zadd
447442
pipe.zadd(key, {flow.daddr: flow.starttime}, nx=True)
448-
pipe.expire(key, self.tw_width, nx=True)
449443

450444
return pipe
451445

@@ -458,21 +452,17 @@ def _store_conn_to_multiple_ports_info(
458452
if role == role.CLIENT:
459453
key = f"{profileid}_{twid}:tcp:est:dstips"
460454
pipe.zadd(key, {flow.daddr: flow.starttime}, nx=True)
461-
pipe.expire(key, self.tw_width, nx=True)
462455

463456
key = f"{profileid}_{twid}:tcp:est:{flow.daddr}:dstports"
464457
pipe.hset(key, flow.dport, flow.uid)
465-
pipe.expire(key, self.tw_width, nx=True)
466458

467459
elif role == role.SERVER:
468460
client_profileid = ProfileID(ip=flow.saddr)
469461
key = f"{client_profileid}_{twid}:tcp:est:dstips"
470462
pipe.zadd(key, {flow.saddr: flow.starttime}, nx=True)
471-
pipe.expire(key, self.tw_width, nx=True)
472463

473464
key = f"{client_profileid}_{twid}:tcp:est:{flow.saddr}:dstports"
474465
pipe.hset(key, flow.dport, flow.uid)
475-
pipe.expire(key, self.tw_width, nx=True)
476466
return pipe
477467

478468
def _store_flow_info_if_needed_by_detection_modules(

0 commit comments

Comments
 (0)