Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion injector_common/injector_common/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def extract_targets(
)

elif selector_key == "manual":
targets = [t.strip() for t in content[TARGETS_KEY].split(",") if t.strip()]
targets = list(
dict.fromkeys(
[t.strip() for t in content[TARGETS_KEY].split(",") if t.strip()]
)
)

else:
raise ValueError("No targets provided for this injection")
Expand Down
9 changes: 7 additions & 2 deletions nmap/src/helpers/nmap_output_parser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from typing import Dict

from injector_common.targets import TargetExtractionResult


class NmapOutputParser:
def parse(data: Dict, result: str, asset_list: []) -> Dict:
def parse(data: Dict, result: str, target_results: TargetExtractionResult) -> Dict:
"""Parse nmap results and extract open ports."""
asset_list = list(target_results.ip_to_asset_id_map.values())
targets = target_results.targets

run = result["nmaprun"]
if not isinstance(run["host"], list):
run["host"] = [run["host"]]
Expand All @@ -27,7 +32,7 @@ def parse(data: Dict, result: str, asset_list: []) -> Dict:
port_result["host"] = host["address"]["@addr"]
else:
port_result["asset_id"] = None
port_result["host"] = asset_list[idx]
port_result["host"] = targets[idx]
ports_scans_results.append(port_result)

return {
Expand Down
9 changes: 4 additions & 5 deletions nmap/src/openaev_nmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,15 @@ def nmap_execution(self, start: float, data: Dict) -> Dict:
target_results = Targets.extract_targets(
selector_key, selector_property, data, self.helper
)
asset_list = list(target_results.ip_to_asset_id_map.values())
# Deduplicate targets
unique_targets = list(dict.fromkeys(target_results.targets))
targets = target_results.targets
# Handle empty targets as an error
if not unique_targets:
if not targets:
message = f"No target identified for the property {TargetProperty[selector_property.upper()].value}"
raise ValueError(message)

# Build Arguments to execute
nmap_args = NmapCommandBuilder.build_args(contract_id, unique_targets)
nmap_args = NmapCommandBuilder.build_args(contract_id, targets)

self.helper.injector_logger.info(
"Executing nmap with command: " + " ".join(nmap_args)
Expand Down Expand Up @@ -90,7 +89,7 @@ def nmap_execution(self, start: float, data: Dict) -> Dict:
jc = NmapProcess.js_execute(["jc", "--xml", "-p"], nmap_result)
result = json.loads(jc.stdout.decode("utf-8").strip())

return NmapOutputParser.parse(data, result, asset_list)
return NmapOutputParser.parse(data, result, target_results)

def process_message(self, data: Dict) -> None:
start = time.time()
Expand Down
Empty file added nmap/test/__init__.py
Empty file.
Empty file added nmap/test/helpers/__init__.py
Empty file.
112 changes: 112 additions & 0 deletions nmap/test/helpers/test_nmap_output_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from unittest import TestCase

from src.helpers.nmap_output_parser import NmapOutputParser

parse = NmapOutputParser()


class NmapOutputParserTest(TestCase):
def setUp(self):
self.data_assets = {
"injection": {"inject_content": {"target_selector": "assets"}}
}

self.data_no_assets = {
"injection": {"inject_content": {"target_selector": "other"}}
}

self.result_multiple_hosts = {
"nmaprun": {
"host": [
{
"address": {"@addr": "10.0.0.1"},
"ports": {
"port": [
{
"@portid": "22",
"state": {"@state": "open"},
"service": {"@name": "ssh"},
},
{
"@portid": "80",
"state": {"@state": "closed"},
"service": {"@name": "http"},
},
]
},
},
{
"address": {"@addr": "10.0.0.2"},
"ports": {
"port": [
{
"@portid": "443",
"state": {"@state": "open"},
"service": {"@name": "https"},
}
]
},
},
]
}
}

self.result_single_host = {
"nmaprun": {
"host": {
"address": {"@addr": "172.16.5.10"},
"ports": {
"port": [
{
"@portid": "21",
"state": {"@state": "open"},
"service": {"@name": "ftp"},
}
]
},
}
}
}

# -------------------------------
# Tests
# -------------------------------

def test_parse_target_assets(self):
"""Ensure target_selector='assets' uses asset_list and sets asset_id."""
data = {"injection": {"inject_content": {"target_selector": "assets"}}}

result = parse(data, self.result_single_host, ["asset-123"])

scan = result["outputs"]["scan_results"][0]

self.assertEqual(scan["asset_id"], "asset-123")
self.assertEqual(scan["host"], "172.16.5.10")
self.assertEqual(scan["port"], 21)
self.assertEqual(scan["service"], "ftp")

def test_parse_target_asset_groups(self):
"""Ensure target_selector='asset-groups' also uses asset_list."""
data = {"injection": {"inject_content": {"target_selector": "asset-groups"}}}

result = parse(data, self.result_single_host, ["group-asset-555"])

scan = result["outputs"]["scan_results"][0]

self.assertEqual(scan["asset_id"], "group-asset-555")
self.assertEqual(scan["host"], "172.16.5.10")
self.assertEqual(scan["port"], 21)
self.assertEqual(scan["service"], "ftp")

def test_parse_target_manual(self):
"""Ensure target_selector='manual' sets asset_id=None."""
data = {"injection": {"inject_content": {"target_selector": "manual"}}}

result = parse(data, self.result_single_host, ["ignored"])

scan = result["outputs"]["scan_results"][0]

self.assertIsNone(scan["asset_id"])
self.assertEqual(scan["host"], "172.16.5.10")
self.assertEqual(scan["port"], 21)
self.assertEqual(scan["service"], "ftp")
10 changes: 4 additions & 6 deletions nuclei/nuclei/openaev_nuclei.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,14 @@ def nuclei_execution(self, start: float, data: Dict) -> Dict:
selector_key, selector_property, data, self.helper
)
# Deduplicate targets
unique_targets = list(dict.fromkeys(target_results.targets))
targets = target_results.targets
# Handle empty targets as an error
if not unique_targets:
if not targets:
message = f"No target identified for the property {TargetProperty[selector_property.upper()].value}"
raise ValueError(message)
# Build Arguments to execute
nuclei_args = self.command_builder.build_args(
contract_id, content, unique_targets
)
input_data = "\n".join(unique_targets).encode("utf-8")
nuclei_args = self.command_builder.build_args(contract_id, content, targets)
input_data = "\n".join(targets).encode("utf-8")

self.helper.injector_logger.info(
"Executing nuclei with: " + " ".join(nuclei_args)
Expand Down