Skip to content

Commit 16c0d70

Browse files
committed
profiler: shutdown queue gracefully
1 parent 92a8bf9 commit 16c0d70

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

slips_files/common/abstracts/core.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ def run(self):
3939
self.shutdown_gracefully()
4040

4141
except KeyboardInterrupt:
42+
self.keyboard_int_ctr += 1
43+
if self.keyboard_int_ctr >= 2:
44+
return
4245
self.shutdown_gracefully()
46+
4347
except Exception:
4448
self.print(f"Problem in {self.name}", 0, 1)
4549
self.print(traceback.format_exc(), 0, 1)

slips_files/core/profiler.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def init(
8181
# the input process signals the rest of the modules to stop
8282
self.done_processing: multiprocessing.Semaphore = is_profiler_done
8383
# every line put in this queue should be profiled
84-
self.profiler_queue = profiler_queue
84+
self.profiler_queue: multiprocessing.Queue = profiler_queue
8585
self.timeformat = None
8686
self.input_type = False
8787
self.rec_lines = 0
@@ -230,10 +230,10 @@ def get_gateway_info(self, flow):
230230
):
231231
self.gw_mac: str = flow.dmac
232232
self.db.set_default_gateway("MAC", self.gw_mac)
233-
self.print(
234-
f"MAC address of the gateway detected: "
235-
f"{green(self.gw_mac)}"
236-
)
233+
# self.print(
234+
# f"MAC address of the gateway detected: "
235+
# f"{green(self.gw_mac)}"
236+
# )
237237
gw_mac_found = True
238238

239239
# we need the mac to be set to be able to find the ip using it
@@ -664,6 +664,12 @@ def should_stop(self):
664664
return False
665665

666666
def shutdown_gracefully(self):
667+
# signal the threads to stop
668+
self.stop_profiler_threads.set()
669+
self.flows_to_process_q.close()
670+
self.profiler_queue.close()
671+
# wait for them to finish
672+
self.join_profiler_threads()
667673
self.db.set_new_incoming_flows(False)
668674
self.print(
669675
f"Stopping. Total lines read: {self.rec_lines}",
@@ -701,13 +707,8 @@ def main(self):
701707
# without it, there's no way this module will know it's
702708
# time to stop and no new flows are coming
703709
if self.is_stop_msg(msg):
704-
# DO NOT return/exit this module before all profilers are
705-
# done. if you do, the profiler threads will shutdown
706-
# before reading all flows as soon as we receive the stop msg.
707-
# signal the threads to stop
708-
self.stop_profiler_threads.set()
709-
# wait for them to finish
710-
self.join_profiler_threads()
710+
# shutdown gracefully will be called by icore once this
711+
# function returns
711712
return 1
712713

713714
self.pending_flows_queue_lock.acquire()

0 commit comments

Comments
 (0)