Skip to content

Commit 4a4c3ac

Browse files
authored
Merge pull request #1584 from stratosphereips/alya/fix-latency/remove_cpu_intensive_dead_code
Remove cpu intensive dead code
2 parents 586c90e + fac29da commit 4a4c3ac

File tree

5 files changed

+49
-97
lines changed

5 files changed

+49
-97
lines changed

modules/flowalerts/flowalerts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async def main(self):
9797
loop = asyncio.get_event_loop()
9898
task = loop.create_task(analyzer.analyze(msg))
9999
# because Async Tasks swallow exceptions.
100-
task.add_done_callback(self.handle_exception)
100+
task.add_done_callback(self.handle_task_exception)
101101
# to wait for these functions before flowalerts shuts down
102102
self.tasks.append(task)
103103
# Allow the event loop to run the scheduled task

modules/ip_info/ip_info.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <sebastian.garcia@agents.fel.cvut.cz>
22
# SPDX-License-Identifier: GPL-2.0-only
3-
from asyncio import Task
43
from typing import (
54
Union,
65
Optional,
@@ -96,7 +95,7 @@ async def open_dbs(self):
9695
"https://dev.maxmind.com/geoip/geolite2-free-geolocation-data?lang=en. "
9796
"Please note it must be the MaxMind DB version."
9897
)
99-
self.reading_mac_db_task: Task = self.create_task(self.read_mac_db)
98+
self.create_task(self.read_mac_db)
10099

101100
async def read_mac_db(self):
102101
"""
@@ -325,7 +324,6 @@ async def shutdown_gracefully(self):
325324
self.country_db.close()
326325
if hasattr(self, "mac_db"):
327326
self.mac_db.close()
328-
await self.reading_mac_db_task
329327

330328
# GW
331329
@staticmethod

modules/update_manager/update_manager.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,22 +1653,14 @@ def delete_unused_cached_remote_feeds(self):
16531653
)
16541654
self.loaded_ti_files -= 1
16551655

1656-
def handle_exception(self, task):
1657-
"""
1658-
in asyncmodules we use Async.Task to run some of the functions
1659-
If an exception occurs in a coroutine that was wrapped in a Task
1660-
(e.g., asyncio.create_task), the exception does not crash the program
1661-
but remains in the task.
1662-
This function is used to handle the exception in the task
1663-
"""
1656+
def handle_task_exception(self, task):
16641657
try:
1665-
# Access task result to raise the exception if it occurred
1666-
task.result()
1667-
except asyncio.exceptions.CancelledError:
1668-
# like pressing ctrl+c
1669-
return
1670-
except Exception as e:
1671-
self.print(e, 0, 1)
1658+
exception = task.exception()
1659+
except asyncio.CancelledError:
1660+
return # Task was cancelled, not an error
1661+
if exception:
1662+
self.print(f"Unhandled exception in task: {exception}")
1663+
self.print_traceback()
16721664

16731665
async def update(self) -> bool:
16741666
"""
@@ -1725,7 +1717,7 @@ async def update(self) -> bool:
17251717
task = asyncio.create_task(
17261718
self.update_ti_file(file_to_download)
17271719
)
1728-
task.add_done_callback(self.handle_exception)
1720+
task.add_done_callback(self.handle_task_exception)
17291721
#######################################################
17301722
# in case of riskiq files, we don't have a link for them in ti_files, We update these files using their API
17311723
# check if we have a username and api key and a week has passed since we last updated
@@ -1753,7 +1745,7 @@ async def update_ti_files(self):
17531745
# create_task is used to run update() function
17541746
# concurrently instead of serially
17551747
self.update_finished: Task = asyncio.create_task(self.update())
1756-
self.update_finished.add_done_callback(self.handle_exception)
1748+
self.update_finished.add_done_callback(self.handle_task_exception)
17571749

17581750
await self.update_finished
17591751
self.print(

slips_files/common/abstracts/iasync_module.py

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def create_task(self, func, *args) -> Task:
3030
exceptions. because asyncio Tasks do not raise exceptions
3131
"""
3232
task = asyncio.create_task(func(*args))
33-
task.add_done_callback(self.handle_exception)
33+
task.add_done_callback(self.handle_task_exception)
3434

3535
# Allow the event loop to run the scheduled task
3636
# await asyncio.sleep(0)
@@ -39,20 +39,13 @@ def create_task(self, func, *args) -> Task:
3939
self.tasks.append(task)
4040
return task
4141

42-
def handle_exception(self, task):
43-
"""
44-
in asyncmodules we use Async.Task to run some of the functions
45-
If an exception occurs in a coroutine that was wrapped in a Task
46-
(e.g., asyncio.create_task), the exception does not crash the program
47-
but remains in the task.
48-
This function is used to handle the exception in the task
49-
"""
42+
def handle_task_exception(self, task: asyncio.Task):
5043
try:
51-
# Access task result to raise the exception if it occurred
52-
task.result()
53-
except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
54-
pass
55-
except Exception:
44+
exception = task.exception()
45+
except asyncio.CancelledError:
46+
return # Task was cancelled, not an error
47+
if exception:
48+
self.print(f"Unhandled exception in task: {exception}")
5649
self.print_traceback()
5750

5851
async def main(self): ...
@@ -74,28 +67,45 @@ def run_async_function(self, func: Callable):
7467
Returns the Future’s result or raise its exception.
7568
"""
7669
loop = asyncio.get_event_loop()
77-
loop.set_exception_handler(self.handle_exception)
70+
loop.set_exception_handler(self.handle_loop_exception)
7871
return loop.run_until_complete(func())
7972

73+
def handle_loop_exception(self, loop, context):
74+
"""A common loop exception handler"""
75+
exception = context.get("exception")
76+
future = context.get("future")
77+
78+
if future:
79+
try:
80+
future.result()
81+
except Exception:
82+
self.print_traceback()
83+
elif exception:
84+
self.print(f"Unhandled loop exception: {exception}")
85+
else:
86+
self.print(f"Unhandled loop error: {context.get('message')}")
87+
8088
def run(self):
81-
loop = asyncio.new_event_loop()
82-
asyncio.set_event_loop(loop)
89+
asyncio.run(self._run_pre_main_and_main())
90+
91+
async def _run_pre_main_and_main(self):
92+
"""
93+
runs pre_main() once, then runs main() in a loop
94+
"""
95+
loop = asyncio.get_event_loop()
96+
loop.set_exception_handler(self.handle_loop_exception)
8397

8498
try:
8599
error: bool = self.pre_main()
86100
if error or self.should_stop():
87-
self.run_async_function(
88-
self.gather_tasks_and_shutdown_gracefully
89-
)
101+
await self.gather_tasks_and_shutdown_gracefully()
90102
return
91103
except KeyboardInterrupt:
92-
self.run_async_function(self.gather_tasks_and_shutdown_gracefully)
104+
await self.gather_tasks_and_shutdown_gracefully()
93105
return
94106
except RuntimeError as e:
95107
if "Event loop stopped before Future completed" in str(e):
96-
self.run_async_function(
97-
self.gather_tasks_and_shutdown_gracefully
98-
)
108+
await self.gather_tasks_and_shutdown_gracefully()
99109
return
100110
except Exception:
101111
self.print_traceback()
@@ -104,18 +114,14 @@ def run(self):
104114
while True:
105115
try:
106116
if self.should_stop():
107-
self.run_async_function(
108-
self.gather_tasks_and_shutdown_gracefully
109-
)
117+
await self.gather_tasks_and_shutdown_gracefully()
110118
return
111119

112120
# if a module's main() returns 1, it means there's an
113121
# error and it needs to stop immediately
114-
error: bool = self.run_async_function(self.main)
122+
error: bool | None = await self.main()
115123
if error:
116-
self.run_async_function(
117-
self.gather_tasks_and_shutdown_gracefully
118-
)
124+
await self.gather_tasks_and_shutdown_gracefully()
119125
return
120126

121127
except KeyboardInterrupt:
@@ -128,9 +134,7 @@ def run(self):
128134
continue
129135
except RuntimeError as e:
130136
if "Event loop stopped before Future completed" in str(e):
131-
self.run_async_function(
132-
self.gather_tasks_and_shutdown_gracefully
133-
)
137+
await self.gather_tasks_and_shutdown_gracefully()
134138
return
135139
except Exception:
136140
self.print_traceback()

slips_files/core/database/redis_db/alert_handler.py

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -438,45 +438,6 @@ def update_max_threat_level(
438438

439439
return old_max_threat_level_float
440440

441-
def update_past_threat_levels(self, profileid, threat_level, confidence):
442-
"""
443-
updates the past_threat_levels key of the given profileid
444-
if the past threat level and confidence
445-
are the same as the ones we wanna store, we replace the timestamp only
446-
"""
447-
now = utils.convert_ts_format(time.time(), utils.alerts_format)
448-
confidence = f"confidence: {confidence}"
449-
# this is what we'll be storing in the db, tl, ts, and confidence
450-
threat_level_data = (threat_level, now, confidence)
451-
452-
past_threat_levels: str = self.r.hget(profileid, "past_threat_levels")
453-
if past_threat_levels:
454-
# get the list of ts and past threat levels
455-
past_threat_levels: List[Tuple] = json.loads(past_threat_levels)
456-
457-
latest: Tuple = past_threat_levels[-1]
458-
latest_threat_level: str = latest[0]
459-
latest_confidence: str = latest[2]
460-
461-
if (
462-
latest_threat_level == threat_level
463-
and latest_confidence == confidence
464-
):
465-
# if the past threat level and confidence
466-
# are the same as the ones we wanna store,
467-
# replace the timestamp only
468-
past_threat_levels[-1] = threat_level_data
469-
# dont change the old max tl
470-
else:
471-
# add this threat level to the list of past threat levels
472-
past_threat_levels.append(threat_level_data)
473-
else:
474-
# first time setting a threat level for this profile
475-
past_threat_levels = [threat_level_data]
476-
477-
past_threat_levels = json.dumps(past_threat_levels)
478-
self.r.hset(profileid, "past_threat_levels", past_threat_levels)
479-
480441
def update_ips_info(self, profileid, max_threat_lvl, confidence):
481442
"""
482443
sets the score and confidence of the given ip in the db
@@ -507,11 +468,8 @@ def update_threat_level(
507468
Do not call this function directy from the db, always call it user
508469
dbmanager.update_threat_level() to update the trustdb too:D
509470
"""
510-
511471
self.r.hset(profileid, "threat_level", threat_level)
512472

513-
self.update_past_threat_levels(profileid, threat_level, confidence)
514-
515473
max_threat_lvl: float = self.update_max_threat_level(
516474
profileid, threat_level
517475
)

0 commit comments

Comments
 (0)