Skip to content

Commit 48a09e0

Browse files
authored
Merge pull request #1438 from stratosphereips/alya/fix_slips_not_shutting_down_gracefully
Fix slips not shutting down gracefully
2 parents f56f235 + 3604e5b commit 48a09e0

File tree

22 files changed

+115
-61
lines changed

22 files changed

+115
-61
lines changed

docs/detection_modules.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,16 @@ You can add your own YARA rule in ```modules/leak_detector/yara_rules/rules``` a
255255

256256
## Blocking Module
257257

258-
To enable blocking in slips, start slips with the ```-p``` flag.
258+
Blocking in Slips is done for any IP that results in an alert. If an IP is detected as malicious and is blocked,
259+
it stays blocked forever, unless it is unblocked manually.
259260

260-
This feature is only supported in linux using iptables.
261+
The feature of unblocking IPs after a while is not supported yet.
262+
263+
The blocking is done using iptables, and the blocked IPs are stored in the database for future reference.
264+
265+
Blocking is disabled by default. To enable blocking in slips, start slips with the ```-p``` flag.
266+
267+
This feature is only supported in linux using iptables when running on an interface.
261268

262269
## Exporting Alerts Module
263270

@@ -850,7 +857,7 @@ abuse.ch -> Used by urlhaus for getting info about contacted domains and downloa
850857
If you want to contribute: improve existing Slips detection modules or implement your own detection modules, see section :doc:`Contributing <contributing>`.
851858

852859

853-
## Zeek Scripts
860+
# Zeek Scripts
854861

855862
Slips is shipped with its own custom zeek scripts to be able to extend zeek functionality and
856863
customize the detections

docs/exporting.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Exporting
1+
# Exporting Slips Alerts
22

33
Slips supports exporting alerts to other systems using different modules (ExportingAlerts, CESNET sharing etc.)
44

docs/features.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -772,11 +772,11 @@ the update manager if they were changed on disk.
772772

773773
### IP Info Module
774774

775-
The IP info module has several ways of getting information about an IP address, it includes:
775+
The IP info module has several ways of getting information about IP and MAC addresses; it includes:
776776

777777
- ASN
778778
- Country by Geolocation
779-
- Given a MAC, its Vendor
779+
- MAC Vendors
780780
- Reverse DNS
781781

782782
#### ASN

docs/flowalerts.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Flow Alerts
1+
# Flow Alerts Module
22

33
The flow alerts module has several behavioral techniques to detect attacks by analyzing the content of each flow alone.
44

managers/process_manager.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ def __init__(self, main):
5454
# to pass flows to the profiler
5555
self.profiler_queue = Queue()
5656
self.termination_event: Event = Event()
57+
# to make sure we only warn the user once about
58+
# the pending modules
59+
self.warning_printed_once = False
5760
# this one has its own termination event because we want it to
5861
# shutdown at the very end of all other slips modules.
5962
self.evidence_handler_termination_event: Event = Event()
@@ -468,17 +471,18 @@ def get_hitlist_in_order(self) -> Tuple[List[Process], List[Process]]:
468471
pid for pid in pids_to_kill_last if pid is not None
469472
]
470473

474+
# now get the process obj of each pid
471475
to_kill_first: List[Process] = []
472476
to_kill_last: List[Process] = []
473477
for process in self.processes:
474478
if process.pid in pids_to_kill_last:
475479
to_kill_last.append(process)
476-
else:
480+
elif isinstance(process, multiprocessing.context.ForkProcess):
477481
# skips the context manager of output.py, will close
478482
# it manually later
479483
# once all processes are closed
480-
if isinstance(process, multiprocessing.context.ForkProcess):
481-
continue
484+
continue
485+
else:
482486
to_kill_first.append(process)
483487

484488
return to_kill_first, to_kill_last
@@ -677,12 +681,14 @@ def shutdown_gracefully(self):
677681
print("\n" + "-" * 27)
678682
print("Stopping Slips")
679683

680-
# by default, 15 mins from this time, all modules should be killed
684+
# by default, max 15 mins (taken from wait_for_modules_to_finish)
685+
# from this time, all modules should be killed
681686
method_start_time = time.time()
682687

683688
# how long to wait for modules to finish in minutes
684689
timeout: float = self.main.conf.wait_for_modules_to_finish()
685-
timeout_seconds: float = timeout * 60
690+
# convert to seconds
691+
timeout *= 60
686692

687693
# close all tws
688694
self.main.db.check_tw_to_close(close_all=True)
@@ -701,20 +707,15 @@ def shutdown_gracefully(self):
701707
log_to_logfiles_only=True,
702708
)
703709

704-
hitlist: Tuple[List[Process], List[Process]]
705-
hitlist = self.get_hitlist_in_order()
706-
to_kill_first: List[Process] = hitlist[0]
707-
to_kill_last: List[Process] = hitlist[1]
710+
to_kill_first: List[Process]
711+
to_kill_last: List[Process]
712+
to_kill_first, to_kill_last = self.get_hitlist_in_order()
708713

709714
self.termination_event.set()
710715

711-
# to make sure we only warn the user once about
712-
# the pending modules
713-
self.warning_printed_once = False
714-
715716
try:
716717
# Wait timeout_seconds for all the processes to finish
717-
while time.time() - method_start_time < timeout_seconds:
718+
while time.time() - method_start_time < timeout:
718719
(
719720
to_kill_first,
720721
to_kill_last,
@@ -733,7 +734,7 @@ def shutdown_gracefully(self):
733734
reason = "User pressed ctr+c or Slips was killed by the OS"
734735
graceful_shutdown = False
735736

736-
if time.time() - method_start_time >= timeout_seconds:
737+
if time.time() - method_start_time >= timeout:
737738
# getting here means we're killing them bc of the timeout
738739
# not getting here means we're killing them bc of double
739740
# ctr+c OR they terminated successfully

modules/arp/arp.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ def init(self):
5555
# thats one hour in seconds
5656
self.period_before_deleting = 3600
5757
self.timer_thread_arp_scan = threading.Thread(
58-
target=self.wait_for_arp_scans, daemon=True
58+
target=self.wait_for_arp_scans,
59+
daemon=True,
60+
name="timer_thread_arp_scan",
5961
)
6062
self.pending_arp_scan_evidence = Queue()
6163
self.alerted_once_arp_scan = False
@@ -88,7 +90,7 @@ def wait_for_arp_scans(self):
8890
"""
8991
# this evidence is the one that triggered this thread
9092
scans_ctr = 0
91-
while True:
93+
while not self.should_stop():
9294
try:
9395
evidence: dict = self.pending_arp_scan_evidence.get(
9496
timeout=0.5
@@ -515,7 +517,7 @@ def clear_arp_logfile(self):
515517
def pre_main(self):
516518
"""runs once before the main() is executed in a loop"""
517519
utils.drop_root_privs()
518-
self.timer_thread_arp_scan.start()
520+
utils.start_thread(self.timer_thread_arp_scan, self.db)
519521

520522
def main(self):
521523
self.clear_arp_logfile()

modules/cesnet/cesnet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,11 @@ def export_evidence(self, evidence: dict):
131131
# and don't stop this module until the thread is done
132132
q = queue.Queue()
133133
self.sender_thread = threading.Thread(
134-
target=self.wclient.sendEvents, args=[[evidence_in_idea], q]
134+
target=self.wclient.sendEvents,
135+
args=[[evidence_in_idea], q],
136+
name="cesnet_sender_thread",
135137
)
136-
self.sender_thread.start()
138+
utils.start_thread(self.sender_thread, self.db)
137139
self.sender_thread.join()
138140
result = q.get()
139141

modules/exporting_alerts/stix_exporter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,16 @@ def init(self):
2828
# To avoid duplicates in STIX_data.json
2929
self.added_ips = set()
3030
self.export_to_taxii_thread = threading.Thread(
31-
target=self.schedule_sending_to_taxii_server, daemon=True
31+
target=self.schedule_sending_to_taxii_server,
32+
daemon=True,
33+
name="stix_exporter_to_taxii_thread",
3234
)
3335

3436
def start_exporting_thread(self):
3537
# This thread is responsible for waiting n seconds before
3638
# each push to the stix server
3739
# it starts the timer when the first alert happens
38-
self.export_to_taxii_thread.start()
40+
utils.start_thread(self.export_to_taxii_thread, self.db)
3941

4042
@property
4143
def name(self):

modules/flowalerts/dns.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import math
77
import queue
8+
import time
89
from datetime import datetime
910
from typing import (
1011
List,
@@ -13,7 +14,7 @@
1314
)
1415
import validators
1516
from multiprocessing import Queue
16-
from threading import Thread
17+
from threading import Thread, Event
1718

1819
from slips_files.common.abstracts.flowalerts_analyzer import (
1920
IFlowalertsAnalyzer,
@@ -48,8 +49,9 @@ def init(self):
4849
self.dns_without_connection_timeout_checker_thread = Thread(
4950
target=self.check_dns_without_connection_timeout,
5051
daemon=True,
52+
name="dns_without_connection_timeout_checker_thread",
5153
)
52-
54+
self.stop_event = Event()
5355
# used to pass the msgs this analyzer reciecved, to the
5456
# dns_without_connection_timeout_checker_thread.
5557
# the reason why we can just use .get_msg() there is because once
@@ -222,7 +224,6 @@ def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
222224
if flow.answers == ["-"]:
223225
# If no IPs are in the answer, we can not expect
224226
# the computer to connect to anything
225-
# self.print(f'No ips in the answer, so ignoring')
226227
return True
227228

228229
contacted_ips = self.db.get_all_contacted_ips_in_profileid_twid(
@@ -233,7 +234,6 @@ def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
233234
# one of these ips should be present in the contacted ips
234235
# check each one of the resolutions of this domain
235236
for ip in self.extract_ips_from_dns_answers(flow.answers):
236-
# self.print(f'Checking if we have a connection to ip {ip}')
237237
if (
238238
ip in contacted_ips
239239
or self.is_connection_made_by_different_version(
@@ -342,10 +342,10 @@ def check_dns_without_connection_timeout(self):
342342
Does so by receiving every dns msg this analyzer receives. then we
343343
compare the ts of it to the ts of the flow we're waiting the 30
344344
mins for.
345-
once we know the diff between them is >=30 mins we check for the
345+
once we know that >=30 mins passed between them we check for the
346346
dns without connection evidence.
347347
The whole point is to give the connection 30 mins in zeek time to
348-
arrive before alerting "dns wihtout conn".
348+
arrive before setting "dns wihtout conn" evidence.
349349
350350
- To avoid having thousands of flows in memory for 30 mins. we check
351351
every 10 mins for the connections, if not found we put it back to
@@ -354,16 +354,21 @@ def check_dns_without_connection_timeout(self):
354354
This function runs in its own thread
355355
"""
356356
try:
357-
while not self.flowalerts.should_stop():
358-
if self.pending_dns_without_conn.empty():
359-
continue
357+
while (
358+
not self.flowalerts.should_stop()
359+
and not self.stop_event.is_set()
360+
):
361+
# if self.pending_dns_without_conn.empty():
362+
# time.sleep(1)
363+
# continue
360364

361365
# we just use it to know the zeek current ts to check if 30
362366
# mins zeek time passed or not. we are not going to
363367
# analyze it.
364368
reference_flow = self.get_dns_flow_from_queue()
365369
if not reference_flow:
366370
# ok wait for more dns flows to be read by slips
371+
time.sleep(1)
367372
continue
368373

369374
# back_to_queue will be used to store the flows we're
@@ -378,9 +383,12 @@ def check_dns_without_connection_timeout(self):
378383
for flow in back_to_queue:
379384
flow: Tuple[str, str, Any]
380385
self.pending_dns_without_conn.put(flow)
386+
381387
except KeyboardInterrupt:
382388
# the rest will be handled in shutdown_gracefully
383389
return
390+
except Exception:
391+
self.print_traceback()
384392

385393
async def check_dns_without_connection(
386394
self, profileid, twid, flow, waited_for_the_conn=False
@@ -686,7 +694,16 @@ def check_different_localnet_usage(
686694

687695
def shutdown_gracefully(self):
688696
self.check_dns_without_connection_of_all_pending_flows()
689-
self.dns_without_connection_timeout_checker_thread.join()
697+
self.stop_event.set()
698+
self.dns_without_connection_timeout_checker_thread.join(30)
699+
if self.dns_without_connection_timeout_checker_thread.is_alive():
700+
self.flowalerts.print(
701+
f"Problem shutting down "
702+
f"dns_without_connection_timeout_checker_thread."
703+
f"Flowalerts should_stop(): "
704+
f"{self.flowalerts.should_stop()}"
705+
)
706+
690707
# close the queue
691708
# without this, queues are left in memory and flowalerts keeps
692709
# waiting for them forever
@@ -703,7 +720,9 @@ def pre_analyze(self):
703720
flowalerts' pre_main"""
704721
# we didnt put this in __init__ because it uses self.flowalerts
705722
# attributes that are not initialized yet in __init__
706-
self.dns_without_connection_timeout_checker_thread.start()
723+
utils.start_thread(
724+
self.dns_without_connection_timeout_checker_thread, self.db
725+
)
707726

708727
async def analyze(self, msg):
709728
"""

modules/p2ptrust/p2ptrust.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ def init(self, *args, **kwargs):
117117
self.print(f"Storing p2p.log in {self.pigeon_logfile}")
118118
# create the thread that rotates the p2p.log every 1d
119119
self.rotator_thread = threading.Thread(
120-
target=self.rotate_p2p_logfile, daemon=True
120+
target=self.rotate_p2p_logfile,
121+
daemon=True,
122+
name="p2p_rotator_thread",
121123
)
122124

123125
self.storage_name = "IPsInfo"
@@ -648,7 +650,7 @@ def pre_main(self):
648650
# create_p2p_logfile is taken from slips.yaml
649651
if self.create_p2p_logfile:
650652
# rotates p2p.log file every 1 day
651-
self.rotator_thread.start()
653+
utils.start_thread(self.rotator_thread, self.db)
652654

653655
# should call self.update_callback
654656
# self.c4 = self.db.subscribe(self.slips_update_channel)

0 commit comments

Comments
 (0)