14 changes: 13 additions & 1 deletion nettacker/core/app.py
@@ -25,6 +25,7 @@
)
from nettacker.core.messages import messages as _
from nettacker.core.module import Module
from nettacker.core.queue_manager import initialize_thread_pool, shutdown_thread_pool
from nettacker.core.socks_proxy import set_socks_proxy
from nettacker.core.utils import common as common_utils
from nettacker.core.utils.common import wait_for_threads_to_finish
@@ -245,6 +246,12 @@ def start_scan(self, scan_id):
target_groups.remove([])

log.info(_("start_multi_process").format(len(self.arguments.targets), len(target_groups)))

# Initialize the enhanced thread pool for cross-process sharing
num_processes = len(target_groups)
max_workers_per_process = getattr(self.arguments, "parallel_module_scan", None)
initialize_thread_pool(num_processes, max_workers_per_process)

active_processes = []
for t_id, target_groups in enumerate(target_groups):
process = multiprocess.Process(
@@ -253,7 +260,12 @@
process.start()
active_processes.append(process)

return wait_for_threads_to_finish(active_processes, sub_process=True)
result = wait_for_threads_to_finish(active_processes, sub_process=True)

# Shutdown the thread pool after scanning is complete
shutdown_thread_pool()

return result

def scan_target(
self,
73 changes: 68 additions & 5 deletions nettacker/core/lib/base.py
@@ -9,6 +9,7 @@

from nettacker.config import Config
from nettacker.core.messages import messages as _
from nettacker.core.queue_manager import dependency_resolver
from nettacker.core.utils.common import merge_logs_to_list, remove_sensitive_header_keys
from nettacker.database.db import find_temp_events, submit_temp_logs_to_db, submit_logs_to_db
from nettacker.logger import get_logger, TerminalCodes
@@ -47,14 +48,40 @@ def filter_large_content(self, content, filter_rate=150):
return content

def get_dependent_results_from_database(self, target, module_name, scan_id, event_names):
"""
Efficiently get dependency results without busy-waiting.
Uses event-driven approach to avoid CPU consumption.
"""
# Try to get results efficiently using the new dependency resolver
results = dependency_resolver.get_dependency_results_efficiently(
target, module_name, scan_id, event_names, {}, self, ()
)

Contributor: Can you also try to quantify how effective this is? I understand that, theoretically, polling every 100 ms is not the best approach, but this function is only called when dependent_on_temp_event is present in the response (that is, for a few vuln and scan modules), and only for the HTTP implementation.

So I am assuming that this polling doesn't really have to run for long periods. Given that, if you can justify adding this complexity, it would be cool.
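
The resolver's internals are not part of this diff; as a rough illustration of the event-driven approach being discussed (hypothetical class and method names, not necessarily the actual queue_manager API), a minimal sketch could look like this:

import threading
from collections import defaultdict

class EventDrivenResolver:
    """Sketch only: one threading.Event per dependency key, set by the producer."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = defaultdict(threading.Event)  # key -> Event
        self._results = {}                           # key -> stored result

    def notify_dependency_available(self, target, module_name, scan_id, event_name, result):
        key = (target, module_name, scan_id, event_name)
        with self._lock:
            self._results[key] = result
            event = self._events[key]
        event.set()  # wakes any waiting consumers immediately

    def wait_for_dependency(self, target, module_name, scan_id, event_name, timeout=30.0):
        key = (target, module_name, scan_id, event_name)
        with self._lock:
            event = self._events[key]
        # Blocks without polling; returns as soon as set() is called or the timeout expires.
        if event.wait(timeout=timeout):
            with self._lock:
                return self._results.get(key)
        return None  # timed out; caller falls back or records a failure

Consumers block on an Event instead of re-querying the database, so no CPU is spent while the dependency is pending.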

if results is not None:
return results

# Fallback to original implementation for backward compatibility
# but with increased sleep time to reduce CPU usage
events = []
for event_name in event_names.split(","):
while True:
retry_count = 0
max_retries = 300 # 30 seconds with 0.1s sleep

while retry_count < max_retries:
event = find_temp_events(target, module_name, scan_id, event_name)
if event:
events.append(json.loads(event.event)["response"]["conditions_results"])
break
time.sleep(0.1)

retry_count += 1
# Exponential backoff to reduce CPU usage
sleep_time = min(0.1 * (1.5 ** (retry_count // 10)), 1.0)
time.sleep(sleep_time)
else:
# Timeout reached
log.warn(f"Timeout waiting for dependency: {event_name} for {target}")
events.append(None)

return events
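
As a side note on the fallback path above: because the sleep grows every ten retries and is capped at one second, the effective timeout is much longer than the 30 seconds the max_retries comment suggests. A quick check of the worst case (assuming the formula shown above):

# Worst-case wait of the fallback loop: retry_count runs from 1 to 300.
total = sum(min(0.1 * (1.5 ** (rc // 10)), 1.0) for rc in range(1, 301))
print(round(total))  # ~262 seconds, i.e. roughly 4.4 minutes per missing dependency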

def find_and_replace_dependent_values(self, sub_step, dependent_on_temp_event):
@@ -123,18 +150,26 @@ def process_conditions(
# Remove sensitive keys from headers before submitting to DB
event = remove_sensitive_header_keys(event)
if "save_to_temp_events_only" in event.get("response", ""):
event_name = event["response"]["save_to_temp_events_only"]

# Submit to database
submit_temp_logs_to_db(
{
"date": datetime.now(),
"target": target,
"module_name": module_name,
"scan_id": scan_id,
"event_name": event["response"]["save_to_temp_events_only"],
"event_name": event_name,
"port": event.get("ports", ""),
"event": event,
"data": response,
}
)

# Notify dependency resolver that a dependency is now available
dependency_resolver.notify_dependency_available(
target, module_name, scan_id, event_name, response
)
if event["response"]["conditions_results"] and "save_to_temp_events_only" not in event.get(
"response", ""
):
@@ -279,9 +314,37 @@ def run(
sub_step[attr_name.rstrip("s")] = int(value) if attr_name == "ports" else value

if "dependent_on_temp_event" in backup_response:
temp_event = self.get_dependent_results_from_database(
target, module_name, scan_id, backup_response["dependent_on_temp_event"]
# Try to get dependency results efficiently
temp_event = dependency_resolver.get_dependency_results_efficiently(
target,
module_name,
scan_id,
backup_response["dependent_on_temp_event"],
sub_step,
self,
(
sub_step,
module_name,
target,
scan_id,
options,
process_number,
module_thread_number,
total_module_thread_number,
request_number_counter,
total_number_of_requests,
),
)

# If dependencies are not available yet, the task is queued
# Return early to avoid blocking the thread
if temp_event is None:
log.verbose_event_info(
f"Task queued waiting for dependencies: {target} -> {module_name}"
)
return False

# Dependencies are available, continue with execution
sub_step = self.replace_dependent_values(sub_step, temp_event)

action = getattr(self.library(), backup_method)
118 changes: 88 additions & 30 deletions nettacker/core/module.py
@@ -7,6 +7,7 @@

from nettacker import logger
from nettacker.config import Config
from nettacker.core import queue_manager
from nettacker.core.messages import messages as _
from nettacker.core.template import TemplateLoader
from nettacker.core.utils.common import expand_module_steps, wait_for_threads_to_finish
@@ -118,29 +119,48 @@ def generate_loops(self):
self.module_content["payloads"] = expand_module_steps(self.module_content["payloads"])

def sort_loops(self):
steps = []
"""
Sort loops to optimize dependency resolution:
1. Independent steps first
2. Steps that generate dependencies (save_to_temp_events_only)
3. Steps that consume dependencies (dependent_on_temp_event)
"""
for index in range(len(self.module_content["payloads"])):
for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
if "dependent_on_temp_event" not in step[0]["response"]:
steps.append(step)
independent_steps = []
dependency_generators = []
dependency_consumers = []

for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
if (
"dependent_on_temp_event" in step[0]["response"]
and "save_to_temp_events_only" in step[0]["response"]
):
steps.append(step)
step_response = step[0]["response"] if step and len(step) > 0 else {}

for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
if (
"dependent_on_temp_event" in step[0]["response"]
and "save_to_temp_events_only" not in step[0]["response"]
):
steps.append(step)
self.module_content["payloads"][index]["steps"] = steps
has_dependency = "dependent_on_temp_event" in step_response
generates_dependency = "save_to_temp_events_only" in step_response

if not has_dependency and not generates_dependency:
Contributor: This is equally hard to read; maybe something like this would be better?

no_dep = []
dep_temp_only = []
dep_normal = []

for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
    resp = step[0]["response"]
    if "dependent_on_temp_event" not in resp:
        no_dep.append(step)
    elif "save_to_temp_events_only" in resp:
        dep_temp_only.append(step)
    else:
        dep_normal.append(step)

payload["steps"] = no_dep + dep_temp_only + dep_normal

Contributor: I think this change would be beneficial beyond this PR as well. Maybe you can make a new one with it, because the current one is O(n), but running it three times with three deepcopies is bad.

Contributor (author): Okay, will fix it. Is it okay if we add these changes in a new commit in this PR?

Contributor: You can make a separate one with this, because it will probably be a while before this one is tested properly, and this change is easier to verify and test.

Contributor (author): Done @pUrGe12, please review this PR.

independent_steps.append(step)
elif generates_dependency and not has_dependency:
dependency_generators.append(step)
elif generates_dependency and has_dependency:
dependency_generators.append(step) # Generator first
elif has_dependency and not generates_dependency:
dependency_consumers.append(step)
else:
independent_steps.append(step) # Fallback

# Combine in optimal order
sorted_steps = independent_steps + dependency_generators + dependency_consumers
self.module_content["payloads"][index]["steps"] = sorted_steps

log.verbose_info(
f"Sorted {len(sorted_steps)} steps: "
f"{len(independent_steps)} independent, "
f"{len(dependency_generators)} generators, "
f"{len(dependency_consumers)} consumers"
)

def start(self):
active_threads = []
used_shared_pool = False

# counting total number of requests
total_number_of_requests = 0
@@ -158,11 +178,16 @@ def start(self):
importlib.import_module(f"nettacker.core.lib.{library.lower()}"),
f"{library.capitalize()}Engine",
)()

for step in payload["steps"]:
for sub_step in step:
thread = Thread(
target=engine.run,
args=(
# Try to use shared thread pool if available, otherwise use local threads
if queue_manager.thread_pool and hasattr(
queue_manager.thread_pool, "submit_task"
):
# Submit to shared thread pool
queue_manager.thread_pool.submit_task(
engine.run,
sub_step,
self.module_name,
self.target,
@@ -173,9 +198,36 @@
self.total_module_thread_number,
request_number_counter,
total_number_of_requests,
),
)
thread.name = f"{self.target} -> {self.module_name} -> {sub_step}"
)
used_shared_pool = True
else:
# Use local thread (fallback to original behavior)
thread = Thread(
target=engine.run,
args=(
sub_step,
self.module_name,
self.target,
self.scan_id,
self.module_inputs,
self.process_number,
self.module_thread_number,
self.total_module_thread_number,
request_number_counter,
total_number_of_requests,
),
)
thread.name = f"{self.target} -> {self.module_name} -> {sub_step}"
thread.start()
active_threads.append(thread)

# Manage local thread pool size
wait_for_threads_to_finish(
active_threads,
maximum=self.module_inputs["thread_per_host"],
terminable=True,
)

request_number_counter += 1
log.verbose_event_info(
_("sending_module_request").format(
Expand All @@ -188,13 +240,19 @@ def start(self):
total_number_of_requests,
)
)
thread.start()
time.sleep(self.module_inputs["time_sleep_between_requests"])
active_threads.append(thread)
wait_for_threads_to_finish(
active_threads,
maximum=self.module_inputs["thread_per_host"],
terminable=True,
)

wait_for_threads_to_finish(active_threads, maximum=None, terminable=True)
# Wait for completion based on execution path
if used_shared_pool:
# Wait for shared thread pool tasks to complete
if queue_manager.thread_pool and hasattr(
queue_manager.thread_pool, "wait_for_completion"
):
# Wait with a reasonable timeout to prevent hanging
completed = queue_manager.thread_pool.wait_for_completion(timeout=300) # 5 minutes
if not completed:
log.warn(f"Module {self.module_name} tasks did not complete within timeout")

# Wait for any remaining local threads to finish
if active_threads:
wait_for_threads_to_finish(active_threads, maximum=None, terminable=True)
Comment on lines +245 to +258

Contributor: ⚠️ Potential issue | 🟠 Major

Waits for global pool completion, not this module's submissions.

wait_for_completion() tracks global submitted/completed counts. This can block on unrelated work or time out prematurely. Prefer per-batch handles: have submit_* return a ticket and add wait_until(ticket, timeout) that waits until completed >= ticket, or return per-task futures/events and wait on those here.
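
A minimal sketch of the per-batch ticket idea suggested above (hypothetical names; the PR's actual thread pool API in queue_manager may differ):

import threading
from concurrent.futures import ThreadPoolExecutor

class TicketedThreadPool:
    """Sketch only: submit_task returns a ticket; wait_until blocks on that ticket,
    so a module waits for its own submissions rather than global completion counts."""

    def __init__(self, max_workers=8):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._cv = threading.Condition()
        self._submitted = 0
        self._completed = 0

    def submit_task(self, fn, *args, **kwargs):
        with self._cv:
            self._submitted += 1
            ticket = self._submitted

        def wrapper():
            try:
                fn(*args, **kwargs)
            finally:
                with self._cv:
                    self._completed += 1
                    self._cv.notify_all()

        self._executor.submit(wrapper)
        return ticket  # caller waits on this instead of global counters

    def wait_until(self, ticket, timeout=None):
        # True once at least `ticket` tasks have completed; False on timeout.
        with self._cv:
            return self._cv.wait_for(lambda: self._completed >= ticket, timeout=timeout)

With this, Module.start() would remember the ticket returned by its last submit_task call and wait on that, instead of calling a global wait_for_completion().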