Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8d5f302
File named hostcheck.py is added to check whether target is buggy or…
Aarush289 Oct 24, 2025
aa994c5
Update app.py
Aarush289 Oct 24, 2025
ed7c81e
Update hostcheck.py , removed unnecessary logs
Aarush289 Oct 24, 2025
ccd870e
Updated hostcheck.py to use allow_single_label and return the lower-c…
Aarush289 Oct 24, 2025
b5da801
Docstring added
Aarush289 Oct 24, 2025
c1a9201
app.py updated to remove noise in exception handling
Aarush289 Oct 24, 2025
b66a598
Multi-threading issue is resolved
Aarush289 Oct 24, 2025
949c911
chore: test signed commit (SSH)
Aarush289 Oct 24, 2025
50374b9
chore: test signed commit (SSH) #2
Aarush289 Oct 24, 2025
2389890
chore: test signed commit (SSH) #3
Aarush289 Oct 24, 2025
859b850
Merge branch 'master' into feature/my-change
Aarush289 Oct 25, 2025
a83c17f
Removed unnecessary print statements
Aarush289 Oct 25, 2025
d852609
Indentation done
Aarush289 Oct 25, 2025
4de4cca
trim dot before checking the length
Aarush289 Oct 25, 2025
c472e71
Update hostcheck.py
Aarush289 Oct 25, 2025
fb43add
Logging exceptions for better debugging
Aarush289 Oct 25, 2025
8102395
Hostcheck.py OS-independent; add validate_before_scan
Aarush289 Oct 27, 2025
cd4e5ab
Hostchecker is made OS independent and validate_before_scan is adde…
Aarush289 Oct 27, 2025
c92c7f3
Update hostcheck.py to make it OS independent
Aarush289 Oct 27, 2025
8752881
Indentation done
Aarush289 Oct 27, 2025
bd762cd
removed the duplicate key
Aarush289 Oct 27, 2025
c23507e
unused parameter removed
Aarush289 Oct 27, 2025
7de608e
"Fix import order (ruff E402), isort formatting; run pre-commit"
Aarush289 Oct 27, 2025
87b773c
Per-pass timeout added
Aarush289 Oct 27, 2025
0ac3a96
Deadline removed
Aarush289 Oct 27, 2025
8565db6
Indentation done
Aarush289 Oct 27, 2025
ec97266
Suggested changes are done
Aarush289 Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ results.*
coverage.xml

venv
Public_sign
Public_sign.pub
cks_proxy
1 change: 1 addition & 0 deletions nettacker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class DefaultSettings(ConfigBase):
show_all_profiles = False
show_help_menu = False
show_version = False
validate_before_scan = False
skip_service_discovery = False
socks_proxy = None
targets = None
Expand Down
65 changes: 62 additions & 3 deletions nettacker/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from threading import Thread

import multiprocess

from concurrent.futures import ThreadPoolExecutor, as_completed
from nettacker import logger
from nettacker.config import Config, version_info
from nettacker.core.arg_parser import ArgParser
Expand All @@ -23,6 +23,7 @@
is_ipv6_range,
is_ipv6_cidr,
)
from nettacker.core.hostcheck import resolve_quick, is_ip_literal, valid_hostname
from nettacker.core.messages import messages as _
from nettacker.core.module import Module
from nettacker.core.socks_proxy import set_socks_proxy
Expand Down Expand Up @@ -142,7 +143,7 @@ def expand_targets(self, scan_id):
):
targets += generate_ip_range(target)
# domains probably
else:
else:
targets.append(target)
self.arguments.targets = targets
self.arguments.url_base_path = base_path
Expand Down Expand Up @@ -220,7 +221,6 @@ def run(self):
if self.arguments.scan_compare_id is not None:
create_compare_report(self.arguments, scan_id)
log.info("ScanID: {0} ".format(scan_id) + _("done"))

return exit_code

def start_scan(self, scan_id):
Expand Down Expand Up @@ -289,7 +289,66 @@ def scan_target(

return os.EX_OK


def filter_valid_targets(self, targets, timeout_per_target=2.0, max_threads=None, dedupe=True):
"""
Parallel validation of targets via resolve_quick(target, timeout_sec).
Returns a list of canonical targets (order preserved, invalids removed).
"""
# Ensure it's a concrete list (len, indexing OK)
try:
targets = list(targets)
except TypeError:
raise TypeError(f"`targets` must be iterable, got {type(targets).__name__}")

if not targets:
return []

if max_threads is None:
max_threads = min(len(targets), 10) # cap threads

# Preserve order
validated_target = [None] * len(targets) # Invalid targets will be replaced by "None"

def _task(idx, t):
ok, canon = resolve_quick(t, timeout_sec=timeout_per_target)
return idx, t, (canon if ok and canon else None)

with ThreadPoolExecutor(max_workers=max_threads) as ex:
futures = [ex.submit(_task, i, t) for i, t in enumerate(targets)]
for fut in as_completed(futures):
try:
idx, orig_target, canon = fut.result()
except (OSError, socket.gaierror) as exc:
log.error(f"Invalid target (resolver error): {exc!s}")
continue

if canon:
validated_target[idx] = canon
else:
log.info(f"Invalid target -> dropping: {orig_target}")

# Keep order, drop Nones
filtered = [c for c in validated_target if c is not None]

if dedupe:
seen, unique = set(), []
for _c in filtered:
if _c not in seen:
seen.add(_c)
unique.append(_c)
return unique
return filtered

def scan_target_group(self, targets, scan_id, process_number):

if(not self.arguments.socks_proxy and self.arguments.validate_before_scan):
targets = self.filter_valid_targets(
targets,
timeout_per_target=2.0,
max_threads=self.arguments.parallel_module_scan or None,
dedupe=True,
)
active_threads = []
log.verbose_event_info(_("single_process_started").format(process_number))
total_number_of_modules = len(targets) * len(self.arguments.selected_modules)
Expand Down
9 changes: 9 additions & 0 deletions nettacker/core/arg_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ def add_arguments(self):
dest="skip_service_discovery",
help=_("skip_service_discovery"),
)
method_options.add_argument(
"-C",
"--validate-before-scan",
action="store_true",
default=Config.settings.validate_before_scan,
dest="validate_before_scan",
help=_("validate_before_scan"),

)
method_options.add_argument(
"-t",
"--thread-per-host",
Expand Down
129 changes: 129 additions & 0 deletions nettacker/core/hostcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# nettacker/core/hostcheck.py
from __future__ import annotations
import re
import socket
import time
import concurrent.futures
from nettacker import logger
from nettacker.core.ip import (
get_ip_range,
generate_ip_range,
is_single_ipv4,
is_ipv4_range,
is_ipv4_cidr,
is_single_ipv6,
is_ipv6_range,
is_ipv6_cidr,
)
log = logger.get_logger()

_LABEL = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$")

_IPV4_VALID_RE = re.compile(
r'^(?:'
r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}'
r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)'
r'$'
)

def is_valid_ipv4(s: str) -> bool:
return bool(_IPV4_VALID_RE.match(s))

def is_ip_literal(name: str) -> bool:
"""Return True if name is a valid IPv4 or IPv6 address literal."""
if is_single_ipv4(name):
if is_valid_ipv4(name):
return True
else:
return False
else:
try:
socket.inet_pton(socket.AF_INET6, name)
return True
except OSError:
return False

def valid_hostname(
host: str,
allow_single_label: bool = True
) -> bool:
"""
Validate hostname syntax per RFC 1123.
Args:
host: Hostname to validate.
allow_single_label: If True, accept single-label names (e.g., "localhost").

Returns:
True if the hostname is syntactically valid.
"""
if host.endswith("."): # From RFC 1123 ,the number of characters can be 250 at max (without dots) and 253 with dots
host = host[:-1]
if len(host) > 253:
return False
parts = host.split(".")
if len(parts) < 2 and not allow_single_label:
return False
return all(_LABEL.match(p) for p in parts)


def _gai_once(name: str, use_ai_addrconfig: bool, port):
flags = getattr(socket, "AI_ADDRCONFIG", 0) if use_ai_addrconfig else 0
return socket.getaddrinfo(
name, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, flags
)

def _clean_host(s: str) -> str:
# remove surrounding quotes and whitespaces
s = s.strip().strip('"').strip("'")
s = s.strip() # again, after quote strip
# drop trailing commas that often sneak in from CSV-like inputs
if s.endswith(","):
s = s[:-1].rstrip()
# collapse accidental spaces inside
return s

def resolve_quick(
host: str,
timeout_sec: float = 2.0,
allow_single_label: bool = True
) -> tuple[bool, str | None]:
"""
Perform fast DNS resolution with timeout.
Args:
host: Hostname or IP literal to resolve.
timeout_sec: Maximum time to wait for resolution.
allow_single_label: If True, allow single-label hostnames (e.g., "intranet").

Returns:
(True, host_name) on success, (False, None) on failure/timeout.
"""
host = _clean_host(host)
if is_single_ipv4(host) or is_single_ipv6(host):
if is_ip_literal(host):
return True, host
return False, None

if host.endswith("."):
host = host[:-1]

if not valid_hostname(host, allow_single_label=allow_single_label):
return False, None

def _call(use_ai_addrconfig: bool):
return _gai_once(host, use_ai_addrconfig, None)

for use_ai in (True, False):
try:
# Run getaddrinfo in a thread so we can enforce timeout
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
fut = ex.submit(_call, use_ai)
fut.result(timeout=timeout_sec) # raises on timeout or error
return True, host.lower()
except concurrent.futures.TimeoutError:
continue
except (OSError, socket.gaierror):
# DNS resolution failed for this candidate, try next
continue
return False, None


1 change: 1 addition & 0 deletions nettacker/locale/en.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ API_options: API options
API_port: API port number
Method: Method
skip_service_discovery: skip service discovery before scan and enforce all modules to scan anyway
validate_before_scan: Pre-validate targets/connectivity before scanning; unreachable targets are skipped.
no_live_service_found: no any live service found to scan.
icmp_need_root_access: to use icmp_scan module or --ping-before-scan you need to run the script as root!
available_graph: "build a graph of all activities and information, you must use HTML output. available graphs: {0}"
Expand Down