diff --git a/README.md b/README.md index 56e47871..b7e1f8c0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Ratio1 Edge Node + Welcome to the **Ratio1 Edge Node** repository, formerly known as the **Naeural Edge Protocol Edge Node**. As a pivotal component of the Ratio1 ecosystem, this Edge Node software empowers a decentralized, privacy-preserving, and secure edge computing network. By enabling a collaborative network of edge nodes, Ratio1 facilitates the secure sharing of resources and the seamless execution of computation tasks across diverse devices. Documentation sections: @@ -16,12 +17,12 @@ Documentation sections: ## Introduction -The Ratio1 Edge Node is a meta Operating System designed to operate on edge devices, providing them the essential functionality required to join and thrive within the Ratio1 network. Each Edge Node manages the device’s resources, executes computation tasks efficiently, and communicates securely with other nodes in the network. Leveraging the powerful Ratio1 core libraries (formely known as Naeural Edge Protocol libraries) `naeural_core` and `ratio1`— the Ratio1 Edge Node offers out-of-the-box usability starting in 2025. Users can deploy the Edge Node and SDK (`ratio1`) effortlessly without the need for intricate configurations, local subscriptions, tenants, user accounts, passwords, or broker setups. +The Ratio1 Edge Node is a meta Operating System designed to operate on edge devices, providing them the essential functionality required to join and thrive within the Ratio1 network. Each Edge Node manages the device’s resources, executes computation tasks efficiently, and communicates securely with other nodes in the network. Leveraging the powerful Ratio1 core libraries (formerly known as Naeural Edge Protocol libraries) `naeural_core` and the Ratio1 SDK (`ratio1_sdk`, published on PyPI as `ratio1`), the Ratio1 Edge Node offers out-of-the-box usability starting in 2025 without intricate configurations, local subscriptions, tenants, user accounts, passwords, or broker setups. ## Related Repositories - [ratio1/naeural_core](https://github.com/ratio1/naeural_core) provides the modular pipeline engine that powers data ingestion, processing, and serving inside this node. Extend or troubleshoot runtime behavior by mirroring the folder layout in `extensions/` against the upstream modules. -- [Ratio1/ratio1_sdk](https://github.com/Ratio1/ratio1_sdk) is the client toolkit for building and dispatching jobs to Ratio1 nodes. Its tutorials pair with the workflows in `plugins/business/tutorials/` and are the best place to validate end-to-end scenarios. +- [Ratio1/ratio1_sdk](https://github.com/Ratio1/ratio1_sdk) is the client toolkit for building and dispatching jobs to Ratio1 nodes (published on PyPI as `ratio1`). Its tutorials pair with the workflows in `plugins/business/tutorials/` and are the best place to validate end-to-end scenarios. When developing custom logic, install the three repositories in the same virtual environment (`pip install -e . ../naeural_core ../ratio1_sdk`) so interface changes remain consistent across the stack. @@ -33,28 +34,30 @@ When developing custom logic, install the three repositories in the same virtual Deploying a Ratio1 Edge Node within a development network is straightforward. 
Execute the following Docker command to launch the node making sure you mount a persistent volume to the container to preserve the node data between restarts: ```bash -docker run -d --rm --name r1node --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop +docker run -d --rm --name r1node --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet ``` - `-d`: Runs the container in the background. - `--rm`: Removes the container upon stopping. - `--name r1node`: Assigns the name `r1node` to the container. - `--pull=always`: Ensures the latest image version is always pulled. -- `ratio1/edge_node:develop`: Specifies the Docker image to run. +- `ratio1/edge_node:devnet`: Specifies the devnet image; use `:mainnet` or `:testnet` for those networks. - `-v r1vol:/edge_node/_local_cache/`: Mounts the `r1vol` volume to the `/edge_node/_local_cache/` directory within the container. +Architecture-specific variants (for example `:devnet-arm64`, `:devnet-tegra`, `:devnet-amd64-cpu`) will follow; pick the tag that matches your hardware once available. + This command initializes the Ratio1 Edge Node in development mode, automatically connecting it to the Ratio1 development network and preparing it to receive computation tasks while ensuring that all node data is stored in `r1vol`, preserving it between container restarts. If for some reason you encounter issues when running the Edge Node, you can try to run the container with the `--platform linux/amd64` flag to ensure that the container runs on the correct platform. ```bash -docker run -d --rm --name r1node --platform linux/amd64 --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop +docker run -d --rm --name r1node --platform linux/amd64 --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet ``` Also, if you have GPU(s) on your machine, you can enable GPU support by adding the `--gpus all` flag to the Docker command. This flag allows the Edge Node to utilize the GPU(s) for computation tasks. ```bash -docker run -d --rm --name r1node --gpus all --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop +docker run -d --rm --name r1node --gpus all --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet ``` This will ensure that your node will be able to utilize the GPU(s) for computation tasks and will accept training and inference jobs that require GPU acceleration. @@ -64,12 +67,12 @@ This will ensure that your node will be able to utilize the GPU(s) for computati If you want to run multiple Edge Nodes on the same machine, you can do so by specifying different names for each container but more importantly, you need to specify different volumes for each container to avoid conflicts between the nodes. You can do this by creating a new volume for each node and mounting it to the container as follows: ```bash -docker run -d --rm --name r1node1 --pull=always -v r1vol1:/edge_node/_local_cache/ ratio1/edge_node:develop -docker run -d --rm --name r1node2 --pull=always -v r1vol2:/edge_node/_local_cache/ ratio1/edge_node:develop +docker run -d --rm --name r1node1 --pull=always -v r1vol1:/edge_node/_local_cache/ ratio1/edge_node:devnet +docker run -d --rm --name r1node2 --pull=always -v r1vol2:/edge_node/_local_cache/ ratio1/edge_node:devnet ``` Now you can run multiple Edge Nodes on the same machine without any conflicts between them. ->NOTE: If you are running multiple nodes on the same machine it is recommended to use docker-compose to manage the nodes. 
You can find an example of how to run multiple nodes on the same machine using docker-compose in the [Running multiple nodes on the same machine](#running-multiple-nodes-on-the-same-machine) section. +>NOTE: If you are running multiple nodes on the same machine it is recommended to use docker-compose to manage the nodes. You can find a docker-compose example in the section below. ## Inspecting the Edge Node @@ -145,6 +148,8 @@ The [Ratio1 SDK](https://github.com/Ratio1/ratio1_sdk) is the recommended way to pip install -e ../ratio1_sdk ``` +If you prefer the published package, install from PyPI via `pip install ratio1`. + - Use the `nepctl` (formerly `r1ctl`) CLI that ships with the SDK to inspect the network, configure clients, and dispatch jobs. - Explore `ratio1_sdk/tutorials/` for end-to-end examples; most have matching runtime counterparts in `plugins/business/tutorials/` inside this repository. - SDK releases 2.6+ perform automatic dAuth configuration. After whitelisting your client, you can submit jobs without additional secrets. @@ -226,6 +231,7 @@ Lets suppose you have the following node data: "whitelist": [ "0xai_AthDPWc_k3BKJLLYTQMw--Rjhe3B6_7w76jlRpT6nDeX" ] + } } ``` @@ -250,6 +256,7 @@ docker exec r1node get_node_info "whitelist": [ "0xai_AthDPWc_k3BKJLLYTQMw--Rjhe3B6_7w76jlRpT6nDeX" ] + } } ``` @@ -286,7 +293,7 @@ If you want to run multiple nodes on the same machine the best option is to use ```yaml services: r1node1: - image: ratio1/edge_node:testnet + image: ratio1/edge_node:devnet container_name: r1node1 platform: linux/amd64 restart: always @@ -297,7 +304,7 @@ services: - "com.centurylinklabs.watchtower.stop-signal=SIGINT" r1node2: - image: ratio1/edge_node:testnet + image: ratio1/edge_node:devnet container_name: r1node2 platform: linux/amd64 restart: always @@ -350,7 +357,7 @@ docker-compose down Now, lets dissect the `docker-compose.yml` file: - we have a variable number of nodes - in our case 2 nodes - `r1node1` and `r1node2` as services (we commented out the third node for simplicity) - - each node is using the `ratio1/edge_node:testnet` image + - each node is using the `ratio1/edge_node:devnet` image (swap the tag for `:mainnet` or `:testnet` as needed; architecture-specific variants such as `-arm64`, `-tegra`, `-amd64-cpu` will follow) - each node has own unique volume mounted to it - we have a watchtower service that will check for new images every 1 minute and will update the nodes if a new image is available @@ -375,6 +382,7 @@ For inquiries regarding the funding and its impact on this project, please conta ## Citation + If you use the Ratio1 Edge Node in your research or projects, please cite it as follows: ```bibtex @@ -385,3 +393,36 @@ If you use the Ratio1 Edge Node in your research or projects, please cite it as howpublished = {\url{https://github.com/Ratio1/edge_node}}, } ``` + + +Additional publications and references: + +```bibtex +@inproceedings{Damian2025CSCS, + author = {Damian, Andrei Ionut and Bleotiu, Cristian and Grigoras, Marius and + Butusina, Petrica and De Franceschi, Alessandro and Toderian, Vitalii and + Tapus, Nicolae}, + title = {Ratio1 meta-{OS} -- decentralized {MLOps} and beyond}, + booktitle = {2025 25th International Conference on Control Systems and Computer Science (CSCS)}, + year = {2025}, + pages = {258--265}, + address = {Bucharest, Romania}, + month = {May 27--30}, + doi = {10.1109/CSCS66924.2025.00046}, + isbn = {979-8-3315-7343-0}, + issn = {2379-0482}, + publisher = {IEEE} +} + +@misc{Damian2025arXiv, + title = {Ratio1 -- AI 
meta-OS}, + author = {Damian, Andrei and Butusina, Petrica and De Franceschi, Alessandro and + Toderian, Vitalii and Grigoras, Marius and Bleotiu, Cristian}, + year = {2025}, + month = {September}, + eprint = {2509.12223}, + archivePrefix = {arXiv}, + primaryClass = {cs.OS}, + doi = {10.48550/arXiv.2509.12223} +} +``` diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index cfd9985e..665db729 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -2065,7 +2065,7 @@ def start_container(self): self.P(log_str) - nano_cpu_limit = self._cpu_limit * 1_000_000_000 + nano_cpu_limit = int(self._cpu_limit * 1_000_000_000) mem_reservation = f"{parse_memory_to_mb(self._mem_limit, 0.9)}m" run_kwargs = dict( diff --git a/extensions/business/container_apps/container_utils.py b/extensions/business/container_apps/container_utils.py index 0390546d..59d82e16 100644 --- a/extensions/business/container_apps/container_utils.py +++ b/extensions/business/container_apps/container_utils.py @@ -307,7 +307,7 @@ def _setup_resource_limits_and_ports(self): container_resources = self.cfg_container_resources if isinstance(container_resources, dict) and len(container_resources) > 0: - self._cpu_limit = int(container_resources.get("cpu", DEFAULT_CPU_LIMIT)) + self._cpu_limit = float(container_resources.get("cpu", DEFAULT_CPU_LIMIT)) self._gpu_limit = container_resources.get("gpu", DEFAULT_GPU_LIMIT) self._mem_limit = container_resources.get("memory", DEFAULT_MEM_LIMIT) @@ -417,7 +417,7 @@ def _setup_resource_limits_and_ports(self): # endif main_port_mapped else: # No container resources specified, use defaults - self._cpu_limit = DEFAULT_CPU_LIMIT + self._cpu_limit = float(DEFAULT_CPU_LIMIT) self._gpu_limit = DEFAULT_GPU_LIMIT self._mem_limit = DEFAULT_MEM_LIMIT diff --git a/extensions/business/cybersec/README.md b/extensions/business/cybersec/README.md index 1ec61994..1dd16bcb 100644 --- a/extensions/business/cybersec/README.md +++ b/extensions/business/cybersec/README.md @@ -3,4 +3,44 @@ ## RedMesh - folder: extensions/business/cybersec/red_mesh - description: A framework for distributed orchestrated penetration testing and vulnerability assessment. 
-- version: v1 (Alpha) as of 2025-09-30 \ No newline at end of file +- version: v1 (Alpha) as of 2025-09-30 + +### Features + +**Distributed Scanning** +- Port scanning distributed across heterogeneous network workers +- Distribution strategies: `SLICE` (divide ports across workers) or `MIRROR` (full redundancy) +- Port ordering: `SHUFFLE` (randomized for stealth) or `SEQUENTIAL` + +**Service Detection** +- Banner grabbing and protocol identification +- Detection modules for FTP, SSH, HTTP, and other common services + +**Web Vulnerability Testing** +- SQL injection detection +- Cross-site scripting (XSS) testing +- Directory traversal checks +- Security header analysis + +**Run Modes** +- `SINGLEPASS`: One-time scan with aggregated report +- `CONTINUOUS_MONITORING`: Repeated scans at configurable intervals for change detection + +**Stealth Capabilities** +- "Dune sand walking": Random delays between operations for IDS evasion +- Configurable `scan_min_delay` and `scan_max_delay` parameters + +**Distributed Architecture** +- Job coordination via CStore (distributed state) +- Report storage in R1FS (IPFS-based content-addressed storage) +- Network-wide job tracking and worker status monitoring + +### API Endpoints +- `POST /launch_test` - Start a new pentest job +- `GET /get_job_status` - Check job progress or retrieve results +- `GET /list_features` - List available scanning/testing features +- `GET /list_network_jobs` - List jobs across the network +- `GET /list_local_jobs` - List jobs on current node +- `GET /stop_and_delete_job` - Stop and remove a job +- `POST /stop_monitoring` - Stop continuous monitoring (SOFT/HARD) +- `GET /get_report` - Retrieve report by CID from R1FS \ No newline at end of file diff --git a/extensions/business/cybersec/red_mesh/constants.py b/extensions/business/cybersec/red_mesh/constants.py new file mode 100644 index 00000000..a3f0e401 --- /dev/null +++ b/extensions/business/cybersec/red_mesh/constants.py @@ -0,0 +1,99 @@ +""" +RedMesh constants and feature catalog definitions. 
+""" + +FEATURE_CATALOG = [ + { + "id": "service_info_common", + "label": "Service fingerprinting", + "description": "Collect banner and version data for common network services.", + "category": "service", + "methods": [ + "_service_info_80", + "_service_info_443", + "_service_info_8080", + "_service_info_21", + "_service_info_22", + "_service_info_23", + "_service_info_25", + "_service_info_53", + "_service_info_161", + "_service_info_445", + "_service_info_generic" + ] + }, + { + "id": "service_info_advanced", + "label": "TLS/SSL & database diagnostics", + "description": "Evaluate TLS configuration, database services, and industrial protocols.", + "category": "service", + "methods": [ + "_service_info_tls", + "_service_info_1433", + "_service_info_3306", + "_service_info_3389", + "_service_info_5432", + "_service_info_5900", + "_service_info_6379", + "_service_info_9200", + "_service_info_11211", + "_service_info_27017", + "_service_info_502" + ] + }, + { + "id": "web_test_common", + "label": "Common exposure scan", + "description": "Probe default admin panels, disclosed files, and common misconfigurations.", + "category": "web", + "methods": [ + "_web_test_common", + "_web_test_homepage", + "_web_test_flags", + "_web_test_graphql_introspection", + "_web_test_metadata_endpoints" + ] + }, + { + "id": "web_test_security_headers", + "label": "Security headers audit", + "description": "Check HSTS, CSP, X-Frame-Options, and other critical response headers.", + "category": "web", + "methods": [ + "_web_test_security_headers", + "_web_test_cors_misconfiguration", + "_web_test_open_redirect", + "_web_test_http_methods" + ] + }, + { + "id": "web_test_vulnerability", + "label": "Vulnerability probes", + "description": "Non-destructive probes for common web vulnerabilities.", + "category": "web", + "methods": [ + "_web_test_path_traversal", + "_web_test_xss", + "_web_test_sql_injection", + "_web_test_api_auth_bypass" + ] + } +] + +# Job status constants +JOB_STATUS_RUNNING = "RUNNING" +JOB_STATUS_SCHEDULED_FOR_STOP = "SCHEDULED_FOR_STOP" +JOB_STATUS_STOPPED = "STOPPED" +JOB_STATUS_FINALIZED = "FINALIZED" + +# Run mode constants +RUN_MODE_SINGLEPASS = "SINGLEPASS" +RUN_MODE_CONTINUOUS_MONITORING = "CONTINUOUS_MONITORING" + +# Distribution strategy constants +DISTRIBUTION_SLICE = "SLICE" +DISTRIBUTION_MIRROR = "MIRROR" + +# Port order constants +PORT_ORDER_SHUFFLE = "SHUFFLE" +PORT_ORDER_SEQUENTIAL = "SEQUENTIAL" \ No newline at end of file diff --git a/extensions/business/cybersec/red_mesh/pentester_api_01.py b/extensions/business/cybersec/red_mesh/pentester_api_01.py index e4dfc9f0..e4dd1931 100644 --- a/extensions/business/cybersec/red_mesh/pentester_api_01.py +++ b/extensions/business/cybersec/red_mesh/pentester_api_01.py @@ -7,26 +7,65 @@ - plugin-based scanning and testing services - real-time job monitoring and logging via API + +```pipeline-example.json +{ + "NAME": "redmesh_api", + "TYPE": "Void", + "PLUGINS": [ + { + "SIGNATURE": "PENTESTER_API_01", + "INSTANCES": [ + { + "INSTANCE_ID": "PENTESTER_API_01_DEFAULT", + "CHECK_JOBS_EACH": 5, + "NR_LOCAL_WORKERS": 4, + "WARMUP_DELAY": 30 + } + ] + } + ] +} +``` + """ +import random from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module +from .constants import FEATURE_CATALOG -__VER__ = '0.8.1' # updated version +__VER__ = '0.8.2' _CONFIG = { **BasePlugin.CONFIG, - + "TUNNEL_ENGINE_ENABLED": False, + 'PORT': None, - 
+ + "CHAINSTORE_PEERS": [], + "CHECK_JOBS_EACH" : 5, "NR_LOCAL_WORKERS" : 8, "WARMUP_DELAY" : 30, - - + + # Defines how ports are split across local workers. + "DISTRIBUTION_STRATEGY": "SLICE", # "SLICE" or "MIRROR" + "PORT_ORDER": "SHUFFLE", # "SHUFFLE" or "SEQUENTIAL" + "EXCLUDED_FEATURES": [], + + # Run mode: SINGLEPASS (default) or CONTINUOUS_MONITORING + "RUN_MODE": "SINGLEPASS", + "MONITOR_INTERVAL": 60, # seconds between passes in continuous mode + "MONITOR_JITTER": 5, # random jitter to avoid simultaneous CStore writes + + # Dune sand walking - random delays between operations to evade IDS detection + "SCAN_MIN_RND_DELAY": 0.0, # minimum delay in seconds (0 = disabled) + "SCAN_MAX_RND_DELAY": 0.0, # maximum delay in seconds (0 = disabled) + 'VALIDATION_RULES': { **BasePlugin.CONFIG['VALIDATION_RULES'], }, @@ -34,18 +73,35 @@ class PentesterApi01Plugin(BasePlugin): """ - RedMesh API - a pentesting meta-plugin for receiving pentesting targets and performing operations. - Supports asynchronous job execution and performs distributed red-team attacks based on - decentralized workers orchestrated using CStore. + RedMesh API plugin for orchestrating decentralized pentest jobs. + + The plugin listens for announced jobs in CStore, spawns local workers to + cover assigned port ranges, aggregates their reports, and updates the + distributed job state. It exposes FastAPI endpoints for launching tests, + inspecting progress, and stopping runs. - Supports semaphore-based pairing with Container App Runner plugins via - the SEMAPHORE configuration key. When configured, exposes API host/port - as environment variables to paired containers (e.g., RedMesh UI). + Attributes + ---------- + CONFIG : dict + Plugin configuration merged with `BasePlugin` defaults. + scan_jobs : dict + Active local jobs keyed by job_id. + completed_jobs_reports : dict + Aggregated reports for finished jobs keyed by job_id. + lst_completed_jobs : list + List of job_ids that completed locally (used for status responses). """ CONFIG = _CONFIG def on_init(self): + """ + Initialize plugin state and log available features. + + Returns + ------- + None + """ super(PentesterApi01Plugin, self).on_init() self.__features = self._get_all_features() # Track active and completed jobs by target @@ -80,11 +136,36 @@ def on_close(self): def P(self, s, *args, **kwargs): + """ + Prefixed logger for RedMesh messages. + + Parameters + ---------- + s : str + Message to log. + *args + Positional args forwarded to parent logger. + **kwargs + Keyword args forwarded to parent logger. + + Returns + ------- + Any + Result of parent logger. + """ s = "[REDMESH] " + s return super(PentesterApi01Plugin, self).P(s, *args, **kwargs) def __post_init(self): + """ + Perform warmup: reconcile existing jobs in CStore, migrate legacy keys, + and aggregate any incomplete reports. + + Returns + ------- + None + """ all_network_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) info = "" for job_key, job_spec in all_network_jobs.items(): @@ -100,6 +181,7 @@ def __post_init(self): sample_value = next(iter(raw_report.values()), None) needs_aggregation = isinstance(sample_value, dict) and "local_worker_id" in sample_value if needs_aggregation: + # Merge stranded partial worker outputs left from prior deployments. 
self.P(f"Found incomplete report for {normalized_key}, aggregating...", color='r') agg_report = self._get_aggregated_report(raw_report) our_worker["result"] = agg_report @@ -135,6 +217,19 @@ def __post_init(self): def _get_all_features(self, categs=False): + """ + Discover all service and web test methods available to workers. + + Parameters + ---------- + categs : bool, optional + If True, return a dict keyed by category; otherwise a flat list. + + Returns + ------- + dict | list + Mapping or list of method names prefixed with `_service_info_` / `_web_test_`. + """ features = {} if categs else [] PREFIXES = ["_service_info_", "_web_test_"] for prefix in PREFIXES: @@ -147,13 +242,26 @@ def _get_all_features(self, categs=False): def _normalize_job_record(self, job_key, job_spec, migrate=False): - """Return a normalized job record and optionally migrate legacy entries. + """ + Normalize a job record and optionally migrate legacy entries. Stage 1 of the multi-job coordination refactor lives here. The helper keeps legacy single-key entries compatible while allowing each job to live under - its own hash key. Stage 2 (not yet implemented) will introduce a dedicated - index for fast lookups and TTL-based cleanup so workers can garbage collect - stale jobs without clobbering peers. + its own hash key. + + Parameters + ---------- + job_key : str + Raw key stored in CStore. + job_spec : Any + Raw job specification (expected dict). + migrate : bool, optional + If True, rewrite legacy keys to normalized form. + + Returns + ------- + tuple[str | None, dict | None] + Normalized key and spec; `(None, None)` when invalid. """ if not isinstance(job_spec, dict): return None, None @@ -174,29 +282,99 @@ def _normalize_job_record(self, job_key, job_spec, migrate=False): def _ensure_worker_entry(self, job_id, job_spec): + """ + Ensure current worker has an entry in the distributed job spec. + + Parameters + ---------- + job_id : str + Identifier of the job. + job_spec : dict + Mutable job specification stored in CStore. + + Returns + ------- + dict + Worker entry for this edge node. + """ workers = job_spec.setdefault("workers", {}) worker_entry = workers.get(self.ee_addr) if worker_entry is None: - worker_entry = {"finished": False, "result": None} - workers[self.ee_addr] = worker_entry - self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_spec) + self.P("No worker entry found for this node in job spec job_id={}, workers={}".format( + job_id, + self.json_dumps(workers)), + color='r' + ) return worker_entry def _launch_job( - self, + self, job_id, target, - start_port, - end_port, + start_port, + end_port, network_worker_address, nr_local_workers=4, exceptions=None, + port_order=None, + excluded_features=None, + enabled_features=None, + scan_min_delay=0.0, + scan_max_delay=0.0, ): + """ + Launch local worker threads for a job by splitting the port range. + + Parameters + ---------- + job_id : str + Identifier of the job being launched. + target : str + Hostname or IP to scan. + start_port : int + Inclusive start of port range. + end_port : int + Inclusive end of port range. + network_worker_address : str + Address of the worker that announced the job. + nr_local_workers : int, optional + Number of worker threads to spawn. + exceptions : list[int], optional + Ports to exclude from scanning. + port_order : str, optional + Port scanning order: "SHUFFLE" or "SEQUENTIAL". + excluded_features : list[str], optional + List of feature names to exclude from scanning. 
+ enabled_features : list[str], optional + List of feature names to enable for scanning. + scan_min_delay: float, optional + Minimum random delay between scan operations. + scan_max_delay: float, optional + Maximum random delay between scan operations. + + Returns + ------- + dict + Mapping of local_worker_id to `PentestLocalWorker` instances. + + Raises + ------ + ValueError + When no ports are available or batches cannot be allocated. + """ + if excluded_features is None: + excluded_features = [] + if enabled_features is None: + enabled_features = [] local_jobs = {} ports = list(range(start_port, end_port + 1)) batches = [] - ports = sorted(ports) + if port_order == "SEQUENTIAL": + ports = sorted(ports) # redundant but explicit + else: + port_order = "SHUFFLE" + random.shuffle(ports) nr_ports = len(ports) if nr_ports == 0: raise ValueError("No ports available for local workers.") @@ -219,28 +397,31 @@ def _launch_job( self.scan_jobs[job_id] = {} for i, batch in enumerate(batches): try: - start_port = batch[0] - end_port = batch[-1] - self.P("Launching {} requested by {} for target {} - {} ports [{}-{}]".format( + self.P("Launching {} requested by {} for target {} - {} ports. Port order {}".format( job_id, network_worker_address, - target, len(batch), start_port, end_port + target, len(batch), port_order )) batch_job = PentestLocalWorker( owner=self, local_id_prefix=str(i + 1), - target=target, + target=target, job_id=job_id, initiator=network_worker_address, exceptions=exceptions, worker_target_ports=batch, + excluded_features=excluded_features, + enabled_features=enabled_features, + scan_min_delay=scan_min_delay, + scan_max_delay=scan_max_delay, ) batch_job.start() local_jobs[batch_job.local_worker_id] = batch_job except Exception as exc: self.P( - "Failed to launch batch local job for ports [{}-{}]: {}".format( - batch[0] if batch else "-", - batch[-1] if batch else "-", + "Failed to launch batch local job for ports [{}-{}]. Port order {}: {}".format( + min(batch) if batch else "-", + max(batch) if batch else "-", + port_order, exc ), color='r' @@ -252,8 +433,18 @@ def _launch_job( def _maybe_launch_jobs(self, nr_local_workers=None): """ - Launch new PentestJob threads for any announced pentest target. - Called at each process iteratisson. + Launch new PentestJob threads for announced pentest targets. + + Called periodically from `process`. + + Parameters + ---------- + nr_local_workers : int, optional + Override for number of local workers; defaults to config. 
+ + Returns + ------- + None """ if self.time() - self.__last_checked_jobs > self.cfg_check_jobs_each: self.__last_checked_jobs = self.time() @@ -264,27 +455,56 @@ def _maybe_launch_jobs(self, nr_local_workers=None): continue target = job_specs.get("target") job_id = job_specs.get("job_id", normalized_key) + port_order = job_specs.get("port_order", self.cfg_port_order) + excluded_features = job_specs.get("excluded_features", self.cfg_excluded_features) + enabled_features = job_specs.get("enabled_features", []) if job_id is None: continue worker_entry = self._ensure_worker_entry(job_id, job_specs) current_worker_finished = worker_entry.get("finished", False) if current_worker_finished: continue + + is_in_progress_target = job_id in self.scan_jobs + is_closed_target = job_id in self.completed_jobs_reports + + # Check if this is a continuous monitoring job where our worker was reset + # (launcher reset our finished flag for next pass) - clear local tracking + # Only applies to CONTINUOUS_MONITORING and only when job is not currently running + run_mode = job_specs.get("run_mode", "SINGLEPASS") + if run_mode == "CONTINUOUS_MONITORING" and is_closed_target and not is_in_progress_target: + # Our worker entry was reset by launcher for next pass - clear local state + self.P(f"Detected worker reset for job {job_id}, clearing local tracking for next pass") + self.completed_jobs_reports.pop(job_id, None) + if job_id in self.lst_completed_jobs: + self.lst_completed_jobs.remove(job_id) + is_closed_target = False + # If job not already running and not completed, start a new thread - closed_target = job_id in self.completed_jobs_reports - in_progress_target = job_id in self.scan_jobs - if not in_progress_target and not closed_target: + if not is_in_progress_target and not is_closed_target: launcher = job_specs.get("launcher") - self.P(f"Starting job {job_id}, target {target} from {launcher}", boxed=True) - start_port = job_specs.get("start_port") + launcher_alias = job_specs.get("launcher_alias") + is_current_node_launcher = self.ee_addr == launcher + self.P("Starting job {}, target {} from {}".format( + job_id, + target, + f"{launcher_alias} <{launcher}>" if is_current_node_launcher else "myself"), + boxed=True + ) + start_port = worker_entry.get("start_port") if start_port is None: self.P("No start port specified, defaulting to 1.") start_port = 1 - end_port = job_specs.get("end_port") + end_port = worker_entry.get("end_port") if end_port is None: self.P("No end port specified, defaulting to 65535.") end_port = 65535 exceptions = job_specs.get("exceptions", []) + # Ensure exceptions is always a list (handle legacy string format) + if not isinstance(exceptions, list): + exceptions = [] + scan_min_delay = job_specs.get("scan_min_delay", self.cfg_scan_min_rnd_delay) + scan_max_delay = job_specs.get("scan_max_delay", self.cfg_scan_max_rnd_delay) workers_requested = nr_local_workers if nr_local_workers is not None else self.cfg_nr_local_workers self.P("Using {} local workers for job {}".format(workers_requested, job_id)) try: @@ -295,7 +515,12 @@ def _maybe_launch_jobs(self, nr_local_workers=None): end_port=end_port, network_worker_address=launcher, nr_local_workers=workers_requested, - exceptions=exceptions + exceptions=exceptions, + port_order=port_order, + excluded_features=excluded_features, + enabled_features=enabled_features, + scan_min_delay=scan_min_delay, + scan_max_delay=scan_max_delay, ) except ValueError as exc: self.P(f"Skipping job {job_id}: {exc}", color='r') @@ -311,6 +536,19 @@ def 
_maybe_launch_jobs(self, nr_local_workers=None): def _get_aggregated_report(self, local_jobs): + """ + Aggregate results from multiple local workers. + + Parameters + ---------- + local_jobs : dict + Mapping of worker id to result dicts. + + Returns + ------- + dict + Aggregated report with merged open ports, service info, etc. + """ dct_aggregated_report = {} type_or_func, field = None, None try: @@ -335,10 +573,9 @@ def _get_aggregated_report(self, local_jobs): except TypeError: dct_aggregated_report[field] = list(merged) elif isinstance(dct_aggregated_report[field], dict): - dct_aggregated_report[field] = { - **dct_aggregated_report[field], - **local_job_status[field] - } + dct_aggregated_report[field] = self.merge_objects_deep( + dct_aggregated_report[field], + local_job_status[field]) else: _existing = dct_aggregated_report[field] _new = local_job_status[field] @@ -357,19 +594,56 @@ def _get_aggregated_report(self, local_jobs): )) return dct_aggregated_report + # todo: move to helper + def merge_objects_deep(self, obj_a, obj_b): + """ + Deeply merge two objects (dicts, lists, sets). + + Parameters + ---------- + obj_a : Any + First object. + obj_b : Any + Second object. + + Returns + ------- + Any + Merged object. + """ + if isinstance(obj_a, dict) and isinstance(obj_b, dict): + merged = dict(obj_a) + for key, value_b in obj_b.items(): + if key in merged: + merged[key] = self.merge_objects_deep(merged[key], value_b) + else: + merged[key] = value_b + return merged + elif isinstance(obj_a, list) and isinstance(obj_b, list): + return list(set(obj_a).union(set(obj_b))) + elif isinstance(obj_a, set) and isinstance(obj_b, set): + return obj_a.union(obj_b) + else: + return obj_b # Prefer obj_b in case of conflict + def _close_job(self, job_id, canceled=False): """ - This only closes the LOCAL job not the whole network job - must be refactored! - - TODO: change the logic as follows - - separate worker status from job status in different hset - - redmesh instance (network) cfg_instance_id - - job-id hset - - each network worker will post status (for its local workers) - - all network workers will monitor all network workers - - last network worker that finishes posts - + Close a local job, aggregate reports, and persist in CStore. + + Reports are saved to R1FS (IPFS) and only the CID is stored in CStore + to avoid bloating the distributed state. + + Parameters + ---------- + job_id : str + Identifier for the job to close. + canceled : bool, optional + Whether job was canceled before completion. 
+ + Returns + ------- + None """ local_workers = self.scan_jobs.pop(job_id, None) if local_workers: @@ -387,9 +661,37 @@ def _close_job(self, job_id, canceled=False): closing = "Forced" if canceled else "Post finish" worker_entry = job_specs.setdefault("workers", {}).setdefault(self.ee_addr, {}) worker_entry["finished"] = True - worker_entry["result"] = report worker_entry["canceled"] = canceled - job_specs["workers"][self.ee_addr] = worker_entry + + # Save full report to R1FS and store only CID in CStore + if report: + try: + report_cid = self.r1fs.add_json(report, show_logs=False) + if report_cid: + worker_entry["report_cid"] = report_cid + worker_entry["result"] = None # No blob in CStore + self.P(f"Report saved to R1FS with CID: {report_cid}") + else: + # Fallback: store report directly if R1FS fails + self.P("R1FS add_json returned None, storing report directly in CStore", color='y') + worker_entry["report_cid"] = None + worker_entry["result"] = report + except Exception as e: + # Fallback: store report directly if R1FS fails + self.P(f"Failed to save report to R1FS: {e}. Storing directly in CStore", color='r') + worker_entry["report_cid"] = None + worker_entry["result"] = report + else: + self.P(f"No report data to save for job {job_id}", color='y') + worker_entry["report_cid"] = None + worker_entry["result"] = report + + # Re-read job_specs to avoid overwriting concurrent updates (e.g., pass_history) + fresh_job_specs = self.chainstore_hget(hkey=self.cfg_instance_id, key=job_id) + if fresh_job_specs and isinstance(fresh_job_specs, dict): + fresh_job_specs["workers"][self.ee_addr] = worker_entry + job_specs = fresh_job_specs + self.P("{} closing job_id {}:\n{}".format( closing, job_id, @@ -400,6 +702,13 @@ def _close_job(self, job_id, canceled=False): def _maybe_close_jobs(self): + """ + Inspect running jobs and close those whose workers have finished. + + Returns + ------- + None + """ for job_id, local_workers in list(self.scan_jobs.items()): all_workers_done = True any_canceled_worker = False @@ -412,16 +721,20 @@ def _maybe_close_jobs(self): if not job.thread.is_alive() or job.state.get("done"): if job_id not in self.completed_jobs_reports: self.completed_jobs_reports[job_id] = {} + # Check if this worker was already reported + already_reported = local_worker_id in self.completed_jobs_reports[job_id] # Prepare final report for this job local_worker_report = job.get_status() any_canceled_worker = any_canceled_worker or local_worker_report.get("canceled", False) # Save completed report self.completed_jobs_reports[job_id][local_worker_id] = local_worker_report reports[local_worker_id] = local_worker_report - self.P("Worker {} has finished job_id {} for target {}:\n{}".format( - local_worker_id, job_id, local_worker_report['target'], - self.json_dumps(local_worker_report, indent=2) - )) + # Only log once when worker first finishes + if not already_reported: + self.P("Worker {} has finished job_id {} for target {}:\n{}".format( + local_worker_id, job_id, local_worker_report['target'], + self.json_dumps(local_worker_report, indent=2) + )) else: all_workers_done = False initiator = job.initiator @@ -442,12 +755,148 @@ def _maybe_close_jobs(self): return + def _maybe_finalize_pass(self): + """ + Launcher finalizes completed passes and orchestrates continuous monitoring. + + For all jobs, this method: + 1. Detects when all workers have finished the current pass + 2. Records pass completion in pass_history + + For CONTINUOUS_MONITORING jobs, additionally: + 3. 
Schedules the next pass after monitor_interval + 4. Resets all workers when it's time to start the next pass + + Only the launcher node executes this logic. + + Returns + ------- + None + """ + all_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) + + for job_key, job_specs in all_jobs.items(): + normalized_key, job_specs = self._normalize_job_record(job_key, job_specs) + if normalized_key is None: + continue + + # Only launcher manages pass finalization + is_launcher = job_specs.get("launcher") == self.ee_addr + if not is_launcher: + continue + + workers = job_specs.get("workers", {}) + if not workers: + continue + + run_mode = job_specs.get("run_mode", "SINGLEPASS") + job_status = job_specs.get("job_status", "RUNNING") + all_finished = all(w.get("finished") for w in workers.values()) + next_pass_at = job_specs.get("next_pass_at") + job_pass = job_specs.get("job_pass", 1) + job_id = job_specs.get("job_id") + pass_history = job_specs.setdefault("pass_history", []) + + # Skip jobs that are already finalized or stopped + if job_status in ("FINALIZED", "STOPPED"): + continue + + if all_finished and next_pass_at is None: + # ═══════════════════════════════════════════════════ + # STATE: All peers completed current pass + # ═══════════════════════════════════════════════════ + pass_history.append({ + "pass_nr": job_pass, + "completed_at": self.time(), + "reports": {addr: w.get("report_cid") for addr, w in workers.items()} + }) + + # Handle SINGLEPASS - set FINALIZED and exit (no scheduling) + if run_mode == "SINGLEPASS": + job_specs["job_status"] = "FINALIZED" + job_specs["date_updated"] = self.time() + job_specs["date_finalized"] = self.time() + self.P(f"[SINGLEPASS] Job {job_id} complete. Status set to FINALIZED.") + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) + continue + + # CONTINUOUS_MONITORING logic below + + # Check if soft stop was scheduled + if job_status == "SCHEDULED_FOR_STOP": + job_specs["job_status"] = "STOPPED" + job_specs["date_updated"] = self.time() + job_specs["date_finalized"] = self.time() + self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Status set to STOPPED (soft stop was scheduled)") + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) + continue + # end if + + # Schedule next pass + interval = job_specs.get("monitor_interval", self.cfg_monitor_interval) + jitter = random.uniform(0, self.cfg_monitor_jitter) + job_specs["next_pass_at"] = self.time() + interval + jitter + job_specs["date_updated"] = self.time() + + self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. 
Next pass in {interval}s (+{jitter:.1f}s jitter)") + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) + + # Clear from completed_jobs_reports to allow relaunch + self.completed_jobs_reports.pop(job_id, None) + if job_id in self.lst_completed_jobs: + self.lst_completed_jobs.remove(job_id) + + elif run_mode == "CONTINUOUS_MONITORING" and all_finished and next_pass_at and self.time() >= next_pass_at: + # ═══════════════════════════════════════════════════ + # STATE: Interval elapsed, start next pass + # ═══════════════════════════════════════════════════ + job_specs["job_pass"] = job_pass + 1 + job_specs["next_pass_at"] = None + + for addr in workers: + workers[addr]["finished"] = False + workers[addr]["result"] = None + workers[addr]["report_cid"] = None + # end for each worker reset + + self.P(f"[CONTINUOUS] Starting pass {job_pass + 1} for job {job_id}", boxed=True) + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) + + # Clear local tracking to allow relaunch + self.completed_jobs_reports.pop(job_id, None) + if job_id in self.lst_completed_jobs: + self.lst_completed_jobs.remove(job_id) + #end for each job + return + + def _get_all_network_jobs(self): + """ + Retrieve all jobs tracked in CStore for this instance. + + Returns + ------- + dict + Raw mapping from job keys to specs. + """ all_workers_and_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) return all_workers_and_jobs def _get_job_from_cstore(self, job_id: str): + """ + Fetch a normalized job spec from CStore by job_id. + + Parameters + ---------- + job_id : str + Identifier to search for. + + Returns + ------- + dict | None + Normalized job spec or None if not found. + """ all_workers_and_jobs = self._get_all_network_jobs() found = None for job_key, job_specs in all_workers_and_jobs.items(): @@ -459,6 +908,19 @@ def _get_job_from_cstore(self, job_id: str): def _get_job_status(self, job_id : str): + """ + Build a status or report payload for a job. + + Parameters + ---------- + job_id : str + Identifier of the job to inspect. + + Returns + ------- + dict + Status payload indicating running/completed/network_tracked/not_found. + """ target = None local_workers = self.scan_jobs.get(job_id) jobs_network_state = self._get_job_from_cstore(job_id) @@ -512,35 +974,222 @@ def _get_job_status(self, job_id : str): @BasePlugin.endpoint def list_features(self): + """ + List available service and web test features. + + Returns + ------- + dict + Mapping of categories to lists of feature names. + """ result = {"features": self._get_all_features(categs=True)} return result @BasePlugin.endpoint + def get_feature_catalog(self): + """ + Return the feature catalog with grouped features, labels, and descriptions. + + The catalog provides human-readable groupings of features for UI display, + along with the actual method names for each group. + + Returns + ------- + dict + Feature catalog with categories and all available methods. 
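+
+    Notes
+    -----
+    Entries under `catalog` come straight from `constants.FEATURE_CATALOG`, e.g.
+    `{"id": "web_test_security_headers", "label": "Security headers audit",
+    "category": "web", "methods": ["_web_test_security_headers", ...]}`.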
+ """ + all_methods = self._get_all_features() + return { + "catalog": FEATURE_CATALOG, + "all_methods": all_methods, + } + + + @BasePlugin.endpoint(method="post") def launch_test( - self, - target: str = "", - start_port: int = 1, end_port: int = 65535, - exceptions: str = "64297" + self, + target: str = "", + start_port: int = 1, end_port: int = 65535, + exceptions: str = "64297", #todo format -> list + distribution_strategy: str = "", + port_order: str = "", + excluded_features: list[str] = None, + run_mode: str = "", + monitor_interval: int = 0, + scan_min_delay: float = 0.0, + scan_max_delay: float = 0.0, + task_name: str = "", + task_description: str = "", + selected_peers: list[str] = None, ): """ - Endpoint to start a pentest on the specified target. - Announce job to network via CStore and return current jobs. + Start a pentest on the specified target. + + Announces the job to the network via CStore; actual execution is handled + asynchronously by worker threads. + + Parameters + ---------- + target : str, optional + Hostname or IP to scan. + start_port : int, optional + Inclusive start port, default 1. + end_port : int, optional + Inclusive end port, default 65535. + exceptions : str, optional + Comma/space separated list of ports to skip. + distribution_strategy: str, optional + "MIRROR" to have all workers scan full range; "SLICE" to split range. + port_order: str, optional + Defines port scanning order at worker-thread level: + "SHUFFLE" to randomize port order; "SEQUENTIAL" for ordered scan. + excluded_features: list[str], optional + List of feature names to exclude from scanning. + run_mode: str, optional + "SINGLEPASS" (default) for one-time scan; "CONTINUOUS_MONITORING" for + repeated scans at monitor_interval. + monitor_interval: int, optional + Seconds between passes in CONTINUOUS_MONITORING mode (0 = use config). + scan_min_delay: float, optional + Minimum random delay between scan operations (Dune sand walking). + scan_max_delay: float, optional + Maximum random delay between scan operations (Dune sand walking). + task_name: str, optional + Human-readable name for the task. + task_description: str, optional + Human-readable description for the task. + selected_peers: list[str], optional + List of peer addresses to run the test on. If not provided or empty, + all configured chainstore_peers will be used. Each address must exist + in the chainstore_peers configuration. + + Returns + ------- + dict + Job specification, current worker id, and other active jobs. + + Raises + ------ + ValueError + If no target is provided or if selected_peers contains invalid addresses. """ # INFO: This method only announces the job to the network. It does not # execute the job itself - that part is handled by PentestJob # executed after periodical check from plugin process. 
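+    # Illustrative request body (all values are examples only):
+    #   POST /launch_test
+    #   {"target": "192.0.2.10", "start_port": 1, "end_port": 1024,
+    #    "exceptions": "22, 443", "distribution_strategy": "SLICE",
+    #    "port_order": "SHUFFLE", "run_mode": "CONTINUOUS_MONITORING",
+    #    "monitor_interval": 300, "selected_peers": []}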
+ if excluded_features is None: + excluded_features = self.cfg_excluded_features or [] if not target: raise ValueError("No target specified.") start_port = int(start_port) end_port = int(end_port) + if start_port > end_port: + raise ValueError("start_port must be less than end_port.") + if len(exceptions) > 0: exceptions = [ int(x) for x in self.re.findall(r'\d+', exceptions) if x.isdigit() - ] + ] + else: + exceptions = [] + + # Validate excluded_features against known features and calculate enabled_features for audit + all_features = self.__features + if excluded_features: + invalid = [f for f in excluded_features if f not in all_features] + if invalid: + self.P(f"Warning: Unknown features in excluded_features (ignored): {self.json_dumps(invalid)}") + excluded_features = [f for f in excluded_features if f in all_features] + enabled_features = [f for f in all_features if f not in excluded_features] + + self.P(f"Excluded features: {self.json_dumps(excluded_features)}") + self.P(f"Enabled features: {self.json_dumps(enabled_features)}") + + distribution_strategy = str(distribution_strategy).upper() + + if not distribution_strategy or distribution_strategy not in ["MIRROR", "SLICE"]: + distribution_strategy = self.cfg_distribution_strategy + + port_order = str(port_order).upper() + if not port_order or port_order not in ["SHUFFLE", "SEQUENTIAL"]: + port_order = self.cfg_port_order + + # Validate run_mode and monitor_interval + run_mode = str(run_mode).upper() + if not run_mode or run_mode not in ["SINGLEPASS", "CONTINUOUS_MONITORING"]: + run_mode = self.cfg_run_mode + if monitor_interval <= 0: + monitor_interval = self.cfg_monitor_interval + + # Validate scan delays (Dune sand walking) + if scan_min_delay <= 0: + scan_min_delay = self.cfg_scan_min_rnd_delay + if scan_max_delay <= 0: + scan_max_delay = self.cfg_scan_max_rnd_delay + # Ensure min <= max + if scan_min_delay > scan_max_delay: + scan_min_delay, scan_max_delay = scan_max_delay, scan_min_delay + + # Validate and determine which peers to use + chainstore_peers = self.cfg_chainstore_peers + if not chainstore_peers: + raise ValueError("No workers found in chainstore peers configuration.") + + # Validate selected_peers against chainstore_peers + if selected_peers and len(selected_peers) > 0: + invalid_peers = [p for p in selected_peers if p not in chainstore_peers] + if invalid_peers: + raise ValueError( + f"Invalid peer addresses not found in chainstore_peers: {invalid_peers}. 
" + f"Available peers: {chainstore_peers}" + ) + active_peers = selected_peers + else: + active_peers = chainstore_peers + + num_workers = len(active_peers) + if num_workers == 0: + raise ValueError("No workers available for job execution.") + + workers = {} + if distribution_strategy == "MIRROR": + for address in active_peers: + workers[address] = { + "start_port": start_port, + "end_port": end_port, + "finished": False, + "result": None + } + # else if selected strategy is "SLICE" + else: + + total_ports = end_port - start_port + 1 + + base_ports_count = total_ports // num_workers + rem_ports_count = total_ports % num_workers + + current_start = start_port + for i, address in enumerate(active_peers): + if i < rem_ports_count: + size = base_ports_count + 1 + else: + size = base_ports_count + current_end = current_start + size - 1 + + workers[address] = { + "start_port": current_start, + "end_port": current_end, + "finished": False, + "result": None + } + + current_start = current_end + 1 + # end for chainstore_peers + # end if + job_id = self.uuid(8) self.P(f"Launching {job_id=} {target=} with {exceptions=}") self.P(f"Announcing pentest to workers (instance_id {self.cfg_instance_id})...") @@ -551,13 +1200,32 @@ def launch_test( "start_port" : start_port, "end_port" : end_port, "launcher": self.ee_addr, - "created_at": self.time(), - "workers" : { - self.ee_addr: { - "finished": False, - "result": None - } - }, + "launcher_alias": self.ee_id, + "date_created": self.time(), + "date_updated": self.time(), + "date_finalized": None, + "workers" : workers, + "distribution_strategy": distribution_strategy, + "port_order": port_order, + "excluded_features": excluded_features, + "enabled_features": enabled_features, + # Job lifecycle: RUNNING | SCHEDULED_FOR_STOP | STOPPED | FINALIZED + "job_status": "RUNNING", + # Continuous monitoring fields + "run_mode": run_mode, + "monitor_interval": monitor_interval, + "job_pass": 1, + "next_pass_at": None, + "pass_history": [], + # Dune sand walking + "scan_min_delay": scan_min_delay, + "scan_max_delay": scan_max_delay, + # Human-readable task info + # TODO: rename to job_ + "task_name": task_name, + "task_description": task_description, + # Peer selection (defaults to all chainstore_peers if not specified) + "selected_peers": active_peers, } self.chainstore_hset( hkey=self.cfg_instance_id, @@ -584,18 +1252,68 @@ def launch_test( @BasePlugin.endpoint def get_job_status(self, job_id: str): """ - Endpoint to retrieve the status or final report of a pentest job for the given target. - - TODO: Data must be extracted from CStore + Retrieve status or final report of a pentest job. + + Parameters + ---------- + job_id : str + Identifier of the job. + + Returns + ------- + dict + Status payload as returned by `_get_job_status`. """ # If job has completed, return its report return self._get_job_status(job_id) + @BasePlugin.endpoint + def get_job_data(self, job_id: str): + """ + Retrieve the complete job data from CStore. + + Unlike `get_job_status` which returns local worker progress, + this endpoint returns the full job specification including: + - All network workers and their completion status + - Job lifecycle state (RUNNING/SCHEDULED_FOR_STOP/STOPPED/FINALIZED) + - Launcher info and timestamps + - Distribution strategy and configuration + - Pass history for continuous monitoring jobs + + Parameters + ---------- + job_id : str + Identifier of the job. + + Returns + ------- + dict + Complete job data or error if not found. 
+ """ + job_specs = self._get_job_from_cstore(job_id) + if job_specs: + return { + "job_id": job_id, + "found": True, + "job": job_specs, + } + return { + "job_id": job_id, + "found": False, + "message": "Job not found in network store.", + } + + @BasePlugin.endpoint def list_network_jobs(self): """ - Endpoint to list all network jobs. + List all network jobs stored in CStore. + + Returns + ------- + dict + Normalized job specs keyed by job_id. """ raw_network_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) normalized_jobs = {} @@ -608,6 +1326,14 @@ def list_network_jobs(self): @BasePlugin.endpoint def list_local_jobs(self): + """ + List jobs currently running on this worker. + + Returns + ------- + dict + Mapping job_id to status payload. + """ jobs = { job_id: self._get_job_status(job_id) for job_id, local_workers in self.scan_jobs.items() @@ -618,7 +1344,17 @@ def list_local_jobs(self): @BasePlugin.endpoint def stop_and_delete_job(self, job_id : str): """ - Endpoint to stop and delete a pentest job. + Stop and delete a pentest job. + + Parameters + ---------- + job_id : str + Identifier of the job to stop. + + Returns + ------- + dict + Status message and job_id. """ # Stop the job if it's running local_workers = self.scan_jobs.get(job_id) @@ -643,10 +1379,92 @@ def stop_and_delete_job(self, job_id : str): return {"status": "success", "job_id": job_id} + @BasePlugin.endpoint + def get_report(self, cid: str): + """ + Retrieve a full report from R1FS by CID. + + Parameters + ---------- + cid : str + Content identifier of the report stored in R1FS. + + Returns + ------- + dict + The full report data or error message. + """ + if not cid: + return {"error": "No CID provided"} + try: + report = self.r1fs.get_json(cid) + if report is None: + return {"error": "Report not found", "cid": cid} + return {"cid": cid, "report": report} + except Exception as e: + self.P(f"Failed to retrieve report from R1FS: {e}", color='r') + return {"error": str(e), "cid": cid} + + + @BasePlugin.endpoint(method="post") + def stop_monitoring(self, job_id: str, stop_type: str = "SOFT"): + """ + Stop continuous monitoring for a job. + + Parameters + ---------- + job_id : str + Identifier of the job to stop monitoring. + stop_type : str, optional + "SOFT" (default): Let current pass complete, then stop. + Sets job_status="SCHEDULED_FOR_STOP". + "HARD": Stop immediately. Sets job_status="STOPPED". + + Returns + ------- + dict + Status including job_id and passes completed. 
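+
+    Notes
+    -----
+    Only applies to jobs launched with run_mode="CONTINUOUS_MONITORING";
+    for any other job an error payload is returned.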
+ """ + raw_job_specs = self.chainstore_hget(hkey=self.cfg_instance_id, key=job_id) + if not raw_job_specs: + return {"error": "Job not found", "job_id": job_id} + + _, job_specs = self._normalize_job_record(job_id, raw_job_specs) + if job_specs.get("run_mode") != "CONTINUOUS_MONITORING": + return {"error": "Job is not in CONTINUOUS_MONITORING mode", "job_id": job_id} + + stop_type = str(stop_type).upper() + passes_completed = job_specs.get("job_pass", 1) + + if stop_type == "HARD": + job_specs["job_status"] = "STOPPED" + job_specs["date_updated"] = self.time() + job_specs["date_finalized"] = self.time() + self.P(f"[CONTINUOUS] Hard stop for job {job_id} after {passes_completed} passes") + else: + # SOFT stop - let current pass complete + job_specs["job_status"] = "SCHEDULED_FOR_STOP" + job_specs["date_updated"] = self.time() + self.P(f"[CONTINUOUS] Soft stop scheduled for job {job_id} (will stop after current pass)") + + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs) + + return { + "job_status": job_specs["job_status"], + "stop_type": stop_type, + "job_id": job_id, + "passes_completed": passes_completed, + "pass_history": job_specs.get("pass_history", []), + } + + def process(self): """ - Periodically invoked to manage job threads. - Launches new jobs and checks for completed ones. + Periodic task handler: launch new jobs and close completed ones. + + Returns + ------- + None """ super(PentesterApi01Plugin, self).process() @@ -655,9 +1473,11 @@ def process(self): return elif not self.__warmup_done: self.__post_init() - #endif + #endif # Launch any new jobs self._maybe_launch_jobs() # Check active jobs for completion self._maybe_close_jobs() + # Finalize completed passes and handle continuous monitoring (launcher only) + self._maybe_finalize_pass() return diff --git a/extensions/business/cybersec/red_mesh/redmesh_utils.py b/extensions/business/cybersec/red_mesh/redmesh_utils.py index 940258e2..d1d93865 100644 --- a/extensions/business/cybersec/red_mesh/redmesh_utils.py +++ b/extensions/business/cybersec/red_mesh/redmesh_utils.py @@ -1,10 +1,12 @@ import uuid +import random import threading import socket import json import ftplib import requests import traceback +import time from copy import deepcopy @@ -27,33 +29,81 @@ class PentestLocalWorker( _WebTestsMixin ): """ - PentestJob handles the execution of a pentest scanning job for a given target. - It performs port scanning, service banner gathering, and basic web vulnerability tests. - - Parameters + Execute a pentest workflow against a target on a dedicated thread. + + The worker scans ports, gathers service banners, and performs lightweight web + security probes. It maintains local state and exposes status for aggregation. + + Attributes ---------- target : str - The network address (IP or hostname) to scan. - logger : callable, optional - Function for logging messages (e.g., plugin.P); if None, prints to stdout. - - - TODO: - - target ports must be configurable per worker from PENTESTER_API and each worker must receive a slice + Hostname or IP being scanned. + job_id : str + Identifier tying the worker to a network job. + initiator : str + Address that announced the job. + local_worker_id : str + Unique identifier per worker instance. + state : dict + Mutable status including ports scanned, open ports, and findings. 
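+
+  Notes
+  -----
+  Instances are normally created and started by
+  `PentesterApi01Plugin._launch_job`; call `start()` to run the scan on a
+  background thread and `get_status()` to poll progress.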
""" def __init__( - self, - owner, - target, + self, + owner, + target, job_id : str, - initiator : str, + initiator : str, local_id_prefix : str, - worker_target_ports=COMMON_PORTS, + worker_target_ports=None, exceptions=None, + excluded_features=None, + enabled_features=None, + scan_min_delay: float = 0.0, + scan_max_delay: float = 0.0, ): + """ + Initialize a pentest worker with target ports and exclusions. + + Parameters + ---------- + owner : object + Parent object providing logger `P`. + target : str + Hostname or IP to scan. + job_id : str + Identifier of the job. + initiator : str + Address that announced the job. + local_id_prefix : str + Prefix used to derive a human-friendly worker id. + worker_target_ports : list[int], optional + Ports assigned to this worker; defaults to common ports. + exceptions : list[int], optional + Ports to exclude from scanning. + excluded_features: list[str], optional + List of feature method names to exclude. + enabled_features: list[str], optional + List of feature method names to enable (overrides exclusions). + scan_min_delay : float, optional + Minimum random delay (seconds) between operations (Dune sand walking). + scan_max_delay : float, optional + Maximum random delay (seconds) between operations (Dune sand walking). + + Raises + ------ + ValueError + If no ports remain after applying exceptions. + """ + if worker_target_ports is None: + worker_target_ports = COMMON_PORTS + if excluded_features is None: + excluded_features = [] + if enabled_features is None: + enabled_features = [] if exceptions is None: exceptions = [] + self.target = target self.job_id = job_id self.initiator = initiator @@ -61,7 +111,10 @@ def __init__( local_id_prefix, str(uuid.uuid4())[:4] ) self.owner = owner + self.scan_min_delay = scan_min_delay + self.scan_max_delay = scan_max_delay + self.P(f"Initializing pentest worker {self.local_worker_id} for target {self.target}...") # port handling if exceptions: self.P("Given exceptions: {}".format(exceptions)) @@ -77,6 +130,7 @@ def __init__( worker_target_ports = [p for p in worker_target_ports if p not in exceptions] if not worker_target_ports: raise ValueError("No ports available for worker after applying exceptions.") + self.initial_ports = list(worker_target_ports) # end port handling @@ -97,7 +151,10 @@ def __init__( "done": False, "canceled": False, } - self.__features = self._get_all_features() + self.__all_features = self._get_all_features() + + self.__excluded_features = excluded_features + self.__enabled_features = enabled_features self.P("Initialized worker {} on {} ports [{}-{}]...".format( self.local_worker_id, len(worker_target_ports), @@ -107,6 +164,19 @@ def __init__( return def _get_all_features(self, categs=False): + """ + Discover available probe methods on this worker. + + Parameters + ---------- + categs : bool, optional + If True, return dict by category; otherwise flat list. + + Returns + ------- + dict | list + Service and web test method names. + """ features = {} if categs else [] PREFIXES = ["_service_info_", "_web_test_"] for prefix in PREFIXES: @@ -119,6 +189,14 @@ def _get_all_features(self, categs=False): @staticmethod def get_worker_specific_result_fields(): + """ + Define fields that require aggregation functions across workers. + + Returns + ------- + dict + Mapping of field name to aggregation callable/type. 
+ """ return { "start_port" : min, "end_port" : max, @@ -132,9 +210,28 @@ def get_worker_specific_result_fields(): def get_status(self, for_aggregations=False): + """ + Produce a status snapshot for this worker. + + Parameters + ---------- + for_aggregations : bool, optional + If True, omit volatile fields to simplify merges. + + Returns + ------- + dict + Worker status including progress and findings. + """ completed_tests = self.state.get("completed_tests", []) - max_features = len(self.__features) + 1 # +1 from port scanning - progress = f"{(len(completed_tests) / max_features) * 100 if self.__features else 0:.1f}%" + open_ports = self.state.get("open_ports", []) + if open_ports: + # Full work: port scan + all enabled features + 2 completion markers + max_features = len(self.__enabled_features) + 3 + else: + # No open ports: just port scan + 2 completion markers + max_features = 3 + progress = f"{(len(completed_tests) / max_features) * 100:.1f}%" dct_status = { # same data for all workers below @@ -167,6 +264,21 @@ def get_status(self, for_aggregations=False): def P(self, s, **kwargs): + """ + Log a message with worker context prefix. + + Parameters + ---------- + s : str + Message to emit. + **kwargs + Additional logging keyword arguments. + + Returns + ------- + Any + Result of owner logger. + """ s = f"[{self.local_worker_id}:{self.target}] {s}" self.owner.P(s, **kwargs) return @@ -175,6 +287,10 @@ def P(self, s, **kwargs): def start(self): """ Start the pentest job in a new thread. + + Returns + ------- + None """ # Event to signal early stopping self.stop_event = threading.Event() @@ -187,6 +303,10 @@ def start(self): def stop(self): """ Signal the job to stop early. + + Returns + ------- + None """ self.P(f"Stop requested for job {self.job_id} on worker {self.local_worker_id}") self.stop_event.set() @@ -194,13 +314,43 @@ def stop(self): def _check_stopped(self): + """ + Determine whether the worker should cease execution. + + Returns + ------- + bool + True if done or stop event set. + """ return self.state["done"] or self.stop_event.is_set() + def _interruptible_sleep(self): + """ + Sleep for a random interval (Dune sand walking). + + Returns + ------- + bool + True if stop was requested (should exit), False otherwise. + """ + if self.scan_max_delay <= 0: + return False # Delays disabled + delay = random.uniform(self.scan_min_delay, self.scan_max_delay) + time.sleep(delay) + # TODO: while elapsed < delay with sleep(0.1) could be used for more granular interruptible sleep + # Check if stop was requested during sleep + return self.stop_event.is_set() + + def execute_job(self): """ Run the full pentesting workflow: port scanning, service info gathering, and web vulnerability tests, until the job is complete or stopped. + + Returns + ------- + None """ try: self.P(f"Starting pentest job.") @@ -234,6 +384,17 @@ def execute_job(self): def _scan_ports_step(self, batch_size=None, batch_nr=1): """ Scan a batch of ports from the remaining list to identify open ports. + + Parameters + ---------- + batch_size : int, optional + Number of ports per batch; scans all remaining when None. + batch_nr : int, optional + Batch index (used for logging). + + Returns + ------- + None """ REGISTER_PROGRESS_EACH = 500 @@ -254,6 +415,7 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): self.P(f"Scanning {nr_ports} ports in batch {batch_nr}.") show_progress = False if len(ports_batch) > 1000: + # Avoid noisy progress logs on tiny batches. 
show_progress = True for i, port in enumerate(ports_batch): if self.stop_event.is_set(): @@ -270,8 +432,9 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): finally: sock.close() # endtry - self.state["ports_scanned"].append(port) - self.state["ports_to_scan"].remove(port) + self.state["ports_scanned"].append(port) + self.state["ports_to_scan"].remove(port) + if ((i + 1) % REGISTER_PROGRESS_EACH) == 0: scan_ports_step_progress = (i + 1) / nr_ports * 100 str_progress = f"{scan_ports_step_progress:.0f}%" @@ -281,6 +444,12 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): if show_progress: self.P(f"Port scanning progress on {target}: {str_progress}") + # Dune sand walking - random delay after each port scan + if self._interruptible_sleep(): + # TODO: LOGGING "returning early from loop 5/300 iteration" + return # Stop was requested during sleep + #end for each port + left_ports = self.state["ports_to_scan"] if not left_ports: self.P(f"[{target}] Port scanning completed. {len(self.state['open_ports'])} open ports.") @@ -293,6 +462,11 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): def _gather_service_info(self): """ Gather banner or basic information from each newly open port. + + Returns + ------- + list + Aggregated string findings per method (may be empty). """ open_ports = self.state["open_ports"] if len(open_ports) == 0: @@ -300,32 +474,44 @@ def _gather_service_info(self): return self.P(f"Gathering service info for {len(open_ports)} open ports.") target = self.target - service_info_methods = [method for method in dir(self) if method.startswith("_service_info_")] + service_info_methods = [m for m in self.__enabled_features if m.startswith("_service_info_")] aggregated_info = [] for method in service_info_methods: func = getattr(self, method) method_info = [] for port in open_ports: if self.stop_event.is_set(): - continue + return info = func(target, port) if port not in self.state["service_info"]: self.state["service_info"][port] = {} self.state["service_info"][port][method] = info if info is not None: method_info.append(f"{method}: {port}: {info}") + + # Dune sand walking - random delay before each service probe + if self._interruptible_sleep(): + return # Stop was requested during sleep + #end for each port of current method + if method_info: aggregated_info.extend(method_info) self.P( f"Method {method} findings:\n{json.dumps(method_info, indent=2)}" ) self.state["completed_tests"].append(method) + # end for each method return aggregated_info def _run_web_tests(self): """ Perform basic web vulnerability tests if a web service is open. + + Returns + ------- + list + Collected findings per test method (may be empty). 
""" open_ports = self.state["open_ports"] if len(open_ports) == 0: @@ -342,18 +528,22 @@ def _run_web_tests(self): self.state["web_tested"] = True return result = [] - web_tests_methods = [method for method in dir(self) if method.startswith("_web_test_")] + web_tests_methods = [m for m in self.__enabled_features if m.startswith("_web_test_")] for method in web_tests_methods: func = getattr(self, method) for port in ports_to_test: if self.stop_event.is_set(): - return + return iter_result = func(target, port) if iter_result: result.append(f"{method}:{port} {iter_result}") if port not in self.state["web_tests_info"]: self.state["web_tests_info"][port] = {} self.state["web_tests_info"][port][method] = iter_result + + # Dune sand walking - random delay before each web test + if self._interruptible_sleep(): + return # Stop was requested during sleep # end for each port of current method self.state["completed_tests"].append(method) # register completed method for port # end for each method diff --git a/extensions/business/cybersec/red_mesh/service_mixin.py b/extensions/business/cybersec/red_mesh/service_mixin.py index 7e74dfbf..6f225e23 100644 --- a/extensions/business/cybersec/red_mesh/service_mixin.py +++ b/extensions/business/cybersec/red_mesh/service_mixin.py @@ -11,15 +11,27 @@ class _ServiceInfoMixin: Network service banner probes feeding RedMesh reports. Each helper focuses on a specific protocol and maps findings to - OWASP vulnerability families such as A06:2021 (Security - Misconfiguration) or A09:2021 (Security Logging and Monitoring). - The mixin is intentionally light-weight so that PentestLocalWorker - threads can run without external dependencies while still surfacing - high-signal security clues. + OWASP vulnerability families. The mixin is intentionally light-weight so + that `PentestLocalWorker` threads can run without heavy dependencies while + still surfacing high-signal clues. """ def _service_info_80(self, target, port): - """Collect HTTP banner and server metadata for common web ports.""" + """ + Collect HTTP banner and server metadata for common web ports. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner summary or error message. + """ info = None try: scheme = "https" if port in (443, 8443) else "http" @@ -36,7 +48,21 @@ def _service_info_80(self, target, port): def _service_info_8080(self, target, port): - """Probe alternate HTTP port 8080 for verbose banners.""" + """ + Probe alternate HTTP port 8080 for verbose banners. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -58,7 +84,21 @@ def _service_info_8080(self, target, port): def _service_info_443(self, target, port): - """Collect HTTPS response banner data for TLS services.""" + """ + Collect HTTPS response banner data for TLS services. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner summary or error message. + """ info = None try: url = f"https://{target}" @@ -74,7 +114,21 @@ def _service_info_443(self, target, port): def _service_info_tls(self, target, port): - """Inspect TLS handshake details and certificate lifetime.""" + """ + Inspect TLS handshake details and certificate lifetime. 
+ + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + TLS version/cipher summary or error message. + """ info = None try: context = ssl.create_default_context() @@ -104,7 +158,21 @@ def _service_info_tls(self, target, port): def _service_info_21(self, target, port): - """Identify FTP banners and anonymous login exposure.""" + """ + Identify FTP banners and anonymous login exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + FTP banner info or vulnerability message. + """ info = None try: ftp = ftplib.FTP(timeout=3) @@ -123,7 +191,21 @@ def _service_info_21(self, target, port): return info def _service_info_22(self, target, port): - """Retrieve the SSH banner to fingerprint implementations.""" + """ + Retrieve the SSH banner to fingerprint implementations. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SSH banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -138,7 +220,21 @@ def _service_info_22(self, target, port): return info def _service_info_25(self, target, port): - """Capture SMTP banner data for mail infrastructure mapping.""" + """ + Capture SMTP banner data for mail infrastructure mapping. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SMTP banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -153,7 +249,21 @@ def _service_info_25(self, target, port): return info def _service_info_3306(self, target, port): - """Perform a lightweight MySQL handshake to expose server version.""" + """ + Perform a lightweight MySQL handshake to expose server version. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MySQL version info or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -172,7 +282,21 @@ def _service_info_3306(self, target, port): return info def _service_info_3389(self, target, port): - """Verify reachability of RDP services without full negotiation.""" + """ + Verify reachability of RDP services without full negotiation. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + RDP reachability summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -186,7 +310,21 @@ def _service_info_3389(self, target, port): return info def _service_info_6379(self, target, port): - """Test Redis exposure by issuing a PING command.""" + """ + Test Redis exposure by issuing a PING command. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Redis response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -208,7 +346,21 @@ def _service_info_6379(self, target, port): def _service_info_23(self, target, port): - """Fetch Telnet negotiation banner (OWASP A05: insecure protocols).""" + """ + Fetch Telnet negotiation banner. + + Parameters + ---------- + target : str + Hostname or IP address. 
+ port : int + Port being probed. + + Returns + ------- + str | None + Telnet banner or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -227,7 +379,21 @@ def _service_info_23(self, target, port): def _service_info_445(self, target, port): - """Probe SMB services for negotiation responses (OWASP A06).""" + """ + Probe SMB services for negotiation responses. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SMB response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -248,7 +414,21 @@ def _service_info_445(self, target, port): def _service_info_5900(self, target, port): - """Read VNC handshake string to assess remote desktop exposure.""" + """ + Read VNC handshake string to assess remote desktop exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + VNC banner summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -267,7 +447,21 @@ def _service_info_5900(self, target, port): def _service_info_161(self, target, port): - """Attempt SNMP community string disclosure using 'public'.""" + """ + Attempt SNMP community string disclosure using 'public'. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SNMP response summary or error message. + """ info = None sock = None try: @@ -299,7 +493,21 @@ def _service_info_161(self, target, port): def _service_info_53(self, target, port): - """Query CHAOS TXT version.bind to detect DNS version disclosure.""" + """ + Query CHAOS TXT version.bind to detect DNS version disclosure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + DNS disclosure summary or error message. + """ info = None sock = None try: @@ -362,7 +570,21 @@ def _service_info_53(self, target, port): def _service_info_1433(self, target, port): - """Send a TDS prelogin probe to expose SQL Server version data.""" + """ + Send a TDS prelogin probe to expose SQL Server version data. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MSSQL response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -387,7 +609,21 @@ def _service_info_1433(self, target, port): def _service_info_5432(self, target, port): - """Probe PostgreSQL for weak authentication methods.""" + """ + Probe PostgreSQL for weak authentication methods. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + PostgreSQL response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -411,7 +647,21 @@ def _service_info_5432(self, target, port): def _service_info_11211(self, target, port): - """Issue Memcached stats command to detect unauthenticated access.""" + """ + Issue Memcached stats command to detect unauthenticated access. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Memcached response summary or error message. 
+ """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -431,7 +681,21 @@ def _service_info_11211(self, target, port): def _service_info_9200(self, target, port): - """Detect Elasticsearch/OpenSearch nodes leaking cluster metadata.""" + """ + Detect Elasticsearch/OpenSearch nodes leaking cluster metadata. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Elasticsearch exposure summary or error message. + """ info = None try: scheme = "http" @@ -450,7 +714,21 @@ def _service_info_9200(self, target, port): def _service_info_502(self, target, port): - """Send Modbus device identification request to detect exposed PLCs.""" + """ + Send Modbus device identification request to detect exposed PLCs. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Modbus exposure summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -473,7 +751,21 @@ def _service_info_502(self, target, port): def _service_info_27017(self, target, port): - """Attempt MongoDB isMaster handshake to detect unauthenticated access.""" + """ + Attempt MongoDB isMaster handshake to detect unauthenticated access. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MongoDB exposure summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -508,7 +800,21 @@ def _service_info_27017(self, target, port): def _service_info_generic(self, target, port): - """Attempt a generic TCP banner grab for uncovered ports.""" + """ + Attempt a generic TCP banner grab for uncovered ports. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Generic banner text or error message. + """ info = None try: # Generic service: attempt to connect and read a short banner if any diff --git a/extensions/business/cybersec/red_mesh/web_mixin.py b/extensions/business/cybersec/red_mesh/web_mixin.py index d42a9db6..0eb170b2 100644 --- a/extensions/business/cybersec/red_mesh/web_mixin.py +++ b/extensions/business/cybersec/red_mesh/web_mixin.py @@ -2,10 +2,29 @@ from urllib.parse import quote class _WebTestsMixin: - """HTTP-centric probes that emulate manual red-team playbooks.""" + """ + HTTP-centric probes that emulate manual red-team playbooks. + + Methods perform lightweight checks for common web vulnerabilities across + discovered web services. + """ def _web_test_common(self, target, port): - """Look for exposed common endpoints and weak access controls.""" + """ + Look for exposed common endpoints and weak access controls. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings from endpoint checks. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -35,7 +54,21 @@ def _web_test_common(self, target, port): def _web_test_homepage(self, target, port): - """Scan landing pages for clear-text secrets or database dumps.""" + """ + Scan landing pages for clear-text secrets or database dumps. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. 
+ + Returns + ------- + str + Joined findings from homepage inspection. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -67,7 +100,21 @@ def _web_test_homepage(self, target, port): def _web_test_security_headers(self, target, port): - """Flag missing HTTP security headers (OWASP A05/A06).""" + """ + Flag missing HTTP security headers. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about security headers presence. + """ findings = [] try: scheme = "https" if port in (443, 8443) else "http" @@ -100,7 +147,21 @@ def _web_test_security_headers(self, target, port): def _web_test_flags(self, target, port): - """Check cookies for Secure/HttpOnly/SameSite and directory listing.""" + """ + Check cookies for Secure/HttpOnly/SameSite and directory listing. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on cookie flags and directory listing. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -140,7 +201,21 @@ def _web_test_flags(self, target, port): def _web_test_xss(self, target, port): - """Probe reflected XSS by injecting a harmless script tag.""" + """ + Probe reflected XSS by injecting a harmless script tag. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to reflected XSS. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -165,7 +240,21 @@ def _web_test_xss(self, target, port): def _web_test_path_traversal(self, target, port): - """Attempt basic path traversal payload against the target.""" + """ + Attempt basic path traversal payload against the target. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about traversal attempts. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -189,7 +278,21 @@ def _web_test_path_traversal(self, target, port): def _web_test_sql_injection(self, target, port): - """Send boolean SQLi payload and look for database error leakage.""" + """ + Send boolean SQLi payload and look for database error leakage. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to SQL injection. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -216,7 +319,21 @@ def _web_test_sql_injection(self, target, port): def _web_test_cors_misconfiguration(self, target, port): - """Detect overly permissive CORS policies (OWASP A01/A05).""" + """ + Detect overly permissive CORS policies. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to CORS policy. 
+ """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -256,7 +373,21 @@ def _web_test_cors_misconfiguration(self, target, port): def _web_test_open_redirect(self, target, port): - """Check common redirect parameters for open redirect abuse.""" + """ + Check common redirect parameters for open redirect abuse. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about open redirects. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -291,7 +422,21 @@ def _web_test_open_redirect(self, target, port): def _web_test_http_methods(self, target, port): - """Surface risky HTTP verbs enabled on the root resource.""" + """ + Surface risky HTTP verbs enabled on the root resource. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to allowed HTTP methods. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -320,7 +465,21 @@ def _web_test_http_methods(self, target, port): def _web_test_graphql_introspection(self, target, port): - """Check if GraphQL introspection is exposed in production endpoints.""" + """ + Check if GraphQL introspection is exposed in production endpoints. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on GraphQL introspection exposure. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -346,7 +505,21 @@ def _web_test_graphql_introspection(self, target, port): def _web_test_metadata_endpoints(self, target, port): - """Probe cloud metadata paths to detect SSRF-style exposure.""" + """ + Probe cloud metadata paths to detect SSRF-style exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on metadata endpoint exposure. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -375,7 +548,21 @@ def _web_test_metadata_endpoints(self, target, port): def _web_test_api_auth_bypass(self, target, port): - """Detect APIs that succeed despite invalid Authorization headers.""" + """ + Detect APIs that succeed despite invalid Authorization headers. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to auth bypass behavior. 
+ """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index e5df1eef..577a17c6 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -144,6 +144,8 @@ class DEFAULT_CONTAINER_RESOURCES: JOB_TYPE_RESOURCE_SPECS = { # Generic Apps + 53: {DEEPLOY_RESOURCES.CPU: 0.25, DEEPLOY_RESOURCES.MEMORY: '512m', DEEPLOY_RESOURCES.STORAGE: '2g'}, # micro + 54: {DEEPLOY_RESOURCES.CPU: 0.5, DEEPLOY_RESOURCES.MEMORY: '1g', DEEPLOY_RESOURCES.STORAGE: '4g'}, # lite 1: {DEEPLOY_RESOURCES.CPU: 1, DEEPLOY_RESOURCES.MEMORY: '2g', DEEPLOY_RESOURCES.STORAGE: '8g'}, # entry 2: {DEEPLOY_RESOURCES.CPU: 2, DEEPLOY_RESOURCES.MEMORY: '4g', DEEPLOY_RESOURCES.STORAGE: '16g'}, # low1 3: {DEEPLOY_RESOURCES.CPU: 2, DEEPLOY_RESOURCES.MEMORY: '8g', DEEPLOY_RESOURCES.STORAGE: '32g'}, # low2 diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index 6b57828a..62f95331 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -29,7 +29,8 @@ 'PORT': None, 'ASSETS' : 'nothing', # TODO: this should not be required in future - + 'REQUEST_TIMEOUT': 300, + 'DEEPLOY_VERBOSE' : 10, 'SUPRESS_LOGS_AFTER_INTERVAL' : 300, @@ -446,7 +447,7 @@ def create_pipeline( For CONTAINER_APP_RUNNER: - IMAGE : str (required) - CONTAINER_RESOURCES : dict (required) - - cpu : int + - cpu : int | float - memory : str (e.g., "4096m", "4g") - CR, PORT, ENV, VOLUMES, TUNNEL_ENGINE_ENABLED, etc. For native plugins: diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 00865929..52a87d81 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -728,6 +728,13 @@ def _validate_plugin_instance_for_signature(self, signature: str, plugin_instanc f"{DEEPLOY_ERRORS.REQUEST6}. Plugin instance{index_str} with signature '{signature}': 'CONTAINER_RESOURCES.cpu' is required." ) + try: + self._parse_cpu_value(resources.get(DEEPLOY_RESOURCES.CPU), default=None) + except ValueError: + raise ValueError( + f"{DEEPLOY_ERRORS.REQUEST6}. Plugin instance{index_str} with signature '{signature}': 'CONTAINER_RESOURCES.cpu' must be a number." + ) + if DEEPLOY_RESOURCES.MEMORY not in resources: raise ValueError( f"{DEEPLOY_ERRORS.REQUEST6}. Plugin instance{index_str} with signature '{signature}': 'CONTAINER_RESOURCES.memory' is required." @@ -1087,6 +1094,17 @@ def deeploy_get_auth_result(self, inputs): } return result + def _parse_cpu_value(self, value, default=None): + """ + Parse CPU value as float, allowing numeric strings. + """ + if value is None: + return default + try: + return float(value) + except (TypeError, ValueError): + raise ValueError(f"{DEEPLOY_ERRORS.REQUEST6}. 'CONTAINER_RESOURCES.cpu' must be a number.") + def _aggregate_container_resources(self, inputs): """ Aggregate container resources across all CONTAINER_APP_RUNNER plugin instances. 
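The `_parse_cpu_value` helper introduced above accepts ints, floats, and numeric strings so that fractional CPU requests (such as the new micro 0.25 and lite 0.5 tiers in `JOB_TYPE_RESOURCE_SPECS`) validate cleanly before aggregation. A minimal standalone sketch of that parsing rule, with a plain error message standing in for the `DEEPLOY_ERRORS.REQUEST6` prefix used by the plugin:

```python
def parse_cpu_value(value, default=None):
    # None means "not provided": fall back to the caller-supplied default.
    if value is None:
        return default
    try:
        # Accept ints, floats, and numeric strings such as "0.25".
        return float(value)
    except (TypeError, ValueError):
        # The plugin raises this with the DEEPLOY_ERRORS.REQUEST6 prefix.
        raise ValueError("'CONTAINER_RESOURCES.cpu' must be a number.")


# Fractional requests from the micro (0.25) and lite (0.5) tiers normalize to float.
assert parse_cpu_value(0.25) == 0.25
assert parse_cpu_value("0.5") == 0.5
assert parse_cpu_value(None, default=1) == 1
```

This is an illustrative sketch only; the actual validation lives in `_validate_plugin_instance_for_signature` and `_aggregate_container_resources` below.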
@@ -1110,11 +1128,17 @@ def _aggregate_container_resources(self, inputs): self.Pd("Using legacy format (app_params) for resource aggregation") app_params = inputs.get(DEEPLOY_KEYS.APP_PARAMS, {}) legacy_resources = app_params.get(DEEPLOY_RESOURCES.CONTAINER_RESOURCES, {}) + if isinstance(legacy_resources, dict): + legacy_resources = self.deepcopy(legacy_resources) + if DEEPLOY_RESOURCES.CPU in legacy_resources: + legacy_resources[DEEPLOY_RESOURCES.CPU] = self._parse_cpu_value( + legacy_resources.get(DEEPLOY_RESOURCES.CPU) + ) self.Pd(f"Legacy resources: {legacy_resources}") return legacy_resources self.Pd(f"Processing {len(plugins_array)} plugin instances from plugins array") - total_cpu = 0 + total_cpu = 0.0 total_memory_mb = 0 # Iterate through plugins array (simplified format - each object is an instance) @@ -1125,7 +1149,7 @@ def _aggregate_container_resources(self, inputs): # Only aggregate for CONTAINER_APP_RUNNER and WORKER_APP_RUNNER plugins if signature in CONTAINERIZED_APPS_SIGNATURES: resources = plugin_instance.get(DEEPLOY_RESOURCES.CONTAINER_RESOURCES, {}) - cpu = resources.get(DEEPLOY_RESOURCES.CPU, 0) + cpu = self._parse_cpu_value(resources.get(DEEPLOY_RESOURCES.CPU, 0)) memory = resources.get(DEEPLOY_RESOURCES.MEMORY, "0m") self.Pd(f" Container resources: cpu={cpu}, memory={memory}") diff --git a/extensions/business/deeploy/deeploy_target_nodes_mixin.py b/extensions/business/deeploy/deeploy_target_nodes_mixin.py index 2a4c3a80..60dd4535 100644 --- a/extensions/business/deeploy/deeploy_target_nodes_mixin.py +++ b/extensions/business/deeploy/deeploy_target_nodes_mixin.py @@ -164,7 +164,10 @@ def __find_suitable_nodes_for_container_app(self, nodes_with_resources, containe self.Pd(f"Starting __find_suitable_nodes_for_container_app with {len(nodes_with_resources)} candidate nodes") suitable_nodes = {} - required_cpu = container_requested_resources.get(DEEPLOY_RESOURCES.CPU, DEFAULT_CONTAINER_RESOURCES.CPU) + required_cpu = self._parse_cpu_value( + container_requested_resources.get(DEEPLOY_RESOURCES.CPU, DEFAULT_CONTAINER_RESOURCES.CPU), + default=DEFAULT_CONTAINER_RESOURCES.CPU, + ) required_mem = container_requested_resources.get(DEEPLOY_RESOURCES.MEMORY, DEFAULT_CONTAINER_RESOURCES.MEMORY) required_mem_bytes = self._parse_memory(required_mem) @@ -225,10 +228,13 @@ def __find_suitable_nodes_for_container_app(self, nodes_with_resources, containe continue self.Pd(f"Node {addr} has {self.json_dumps(used_container_resources)} used container resources.") # Sum up resources used by node. 
- used_cpu = 0 + used_cpu = 0.0 used_memory = 0 for res in used_container_resources: - cpu = int(res.get(DEEPLOY_RESOURCES.CPU, DEFAULT_CONTAINER_RESOURCES.CPU)) + cpu = self._parse_cpu_value( + res.get(DEEPLOY_RESOURCES.CPU, DEFAULT_CONTAINER_RESOURCES.CPU), + default=DEFAULT_CONTAINER_RESOURCES.CPU, + ) memory = res.get(DEEPLOY_RESOURCES.MEMORY, DEFAULT_CONTAINER_RESOURCES.MEMORY) used_cpu += cpu used_memory += self._parse_memory(memory) diff --git a/extensions/business/tunnels/tunnels_manager.py b/extensions/business/tunnels/tunnels_manager.py index 9af4e986..1a199d75 100644 --- a/extensions/business/tunnels/tunnels_manager.py +++ b/extensions/business/tunnels/tunnels_manager.py @@ -171,6 +171,8 @@ def new_tunnel(self, alias: str, cloudflare_account_id: str, cloudflare_zone_id: } dns_record = self.requests.post(url, headers=headers, json=data).json() + public_name = None + dns_record_public = None if tunnel_type == "tcp": # For TCP tunnels, we also need to create a CNAME for the public URL public_name = new_id.removeprefix(f"{self.cfg_tcp_prefix}-") @@ -189,8 +191,8 @@ def new_tunnel(self, alias: str, cloudflare_account_id: str, cloudflare_zone_id: "tunnel_token": tunnel_info['result']['token'], "dns_record_id": dns_record['result']['id'], "dns_name": f"{new_id}.{cloudflare_domain}", - "dns_record_public_id": dns_record_public['result']['id'] if tunnel_type == "tcp" else None, - "dns_public_name": f"{public_name}.{cloudflare_domain}", + "dns_record_public_id": dns_record_public['result']['id'] if dns_record_public else None, + "dns_public_name": f"{public_name}.{cloudflare_domain}" if public_name else None, "custom_hostnames": [], "type": tunnel_type, "creator": "ratio1" diff --git a/ver.py b/ver.py index eb5e59f5..2092ebef 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.9.970' +__VER__ = '2.9.980'
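As a closing note on the worker-side changes: `_interruptible_sleep` currently sleeps the full random "sand walking" delay before checking the stop event, and its TODO suggests a more granular loop. One possible refinement, assuming a `threading.Event` stop flag and the same `scan_min_delay`/`scan_max_delay` bounds (illustrative only, not part of this diff), is to let `Event.wait()` do the timed blocking so the worker wakes as soon as a stop is requested:

```python
import random
import threading


def interruptible_sleep(stop_event: threading.Event,
                        min_delay: float,
                        max_delay: float) -> bool:
    """Sleep a random 'sand walking' delay; wake early if a stop is requested.

    Returns True when the caller should exit its scan loop.
    """
    if max_delay <= 0:
        # Delays disabled: only report whether a stop was already requested.
        return stop_event.is_set()
    delay = random.uniform(min_delay, max_delay)
    # Event.wait() returns True as soon as the event is set, so the worker
    # reacts to a stop request without waiting out the full delay.
    return stop_event.wait(timeout=delay)
```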