diff --git a/README.md b/README.md index 56e47871..b7e1f8c0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Ratio1 Edge Node + Welcome to the **Ratio1 Edge Node** repository, formerly known as the **Naeural Edge Protocol Edge Node**. As a pivotal component of the Ratio1 ecosystem, this Edge Node software empowers a decentralized, privacy-preserving, and secure edge computing network. By enabling a collaborative network of edge nodes, Ratio1 facilitates the secure sharing of resources and the seamless execution of computation tasks across diverse devices. Documentation sections: @@ -16,12 +17,12 @@ Documentation sections: ## Introduction -The Ratio1 Edge Node is a meta Operating System designed to operate on edge devices, providing them the essential functionality required to join and thrive within the Ratio1 network. Each Edge Node manages the device’s resources, executes computation tasks efficiently, and communicates securely with other nodes in the network. Leveraging the powerful Ratio1 core libraries (formely known as Naeural Edge Protocol libraries) `naeural_core` and `ratio1`— the Ratio1 Edge Node offers out-of-the-box usability starting in 2025. Users can deploy the Edge Node and SDK (`ratio1`) effortlessly without the need for intricate configurations, local subscriptions, tenants, user accounts, passwords, or broker setups. +The Ratio1 Edge Node is a meta Operating System designed to operate on edge devices, providing them the essential functionality required to join and thrive within the Ratio1 network. Each Edge Node manages the device’s resources, executes computation tasks efficiently, and communicates securely with other nodes in the network. Leveraging the powerful Ratio1 core libraries (formerly known as Naeural Edge Protocol libraries) `naeural_core` and the Ratio1 SDK (`ratio1_sdk`, published on PyPI as `ratio1`), the Ratio1 Edge Node offers out-of-the-box usability starting in 2025 without intricate configurations, local subscriptions, tenants, user accounts, passwords, or broker setups. ## Related Repositories - [ratio1/naeural_core](https://github.com/ratio1/naeural_core) provides the modular pipeline engine that powers data ingestion, processing, and serving inside this node. Extend or troubleshoot runtime behavior by mirroring the folder layout in `extensions/` against the upstream modules. -- [Ratio1/ratio1_sdk](https://github.com/Ratio1/ratio1_sdk) is the client toolkit for building and dispatching jobs to Ratio1 nodes. Its tutorials pair with the workflows in `plugins/business/tutorials/` and are the best place to validate end-to-end scenarios. +- [Ratio1/ratio1_sdk](https://github.com/Ratio1/ratio1_sdk) is the client toolkit for building and dispatching jobs to Ratio1 nodes (published on PyPI as `ratio1`). Its tutorials pair with the workflows in `plugins/business/tutorials/` and are the best place to validate end-to-end scenarios. When developing custom logic, install the three repositories in the same virtual environment (`pip install -e . ../naeural_core ../ratio1_sdk`) so interface changes remain consistent across the stack. @@ -33,28 +34,30 @@ When developing custom logic, install the three repositories in the same virtual Deploying a Ratio1 Edge Node within a development network is straightforward. 
Execute the following Docker command to launch the node, making sure you mount a persistent volume to the container to preserve the node data between restarts:

```bash
-docker run -d --rm --name r1node --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop
+docker run -d --rm --name r1node --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet
```

- `-d`: Runs the container in the background.
- `--rm`: Removes the container upon stopping.
- `--name r1node`: Assigns the name `r1node` to the container.
- `--pull=always`: Ensures the latest image version is always pulled.
-- `ratio1/edge_node:develop`: Specifies the Docker image to run.
+- `ratio1/edge_node:devnet`: Specifies the devnet image; use `:mainnet` or `:testnet` for those networks.
- `-v r1vol:/edge_node/_local_cache/`: Mounts the `r1vol` volume to the `/edge_node/_local_cache/` directory within the container.

+Architecture-specific variants (for example `:devnet-arm64`, `:devnet-tegra`, `:devnet-amd64-cpu`) will follow; pick the tag that matches your hardware once available.
+
This command initializes the Ratio1 Edge Node in development mode, automatically connecting it to the Ratio1 development network and preparing it to receive computation tasks while ensuring that all node data is stored in `r1vol`, preserving it between container restarts.

If you encounter issues when running the Edge Node, try running the container with the `--platform linux/amd64` flag to ensure that the container runs on the correct platform.

```bash
-docker run -d --rm --name r1node --platform linux/amd64 --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop
+docker run -d --rm --name r1node --platform linux/amd64 --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet
```

Also, if you have GPU(s) on your machine, you can enable GPU support by adding the `--gpus all` flag to the Docker command. This flag allows the Edge Node to utilize the GPU(s) for computation tasks.

```bash
-docker run -d --rm --name r1node --gpus all --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:develop
+docker run -d --rm --name r1node --gpus all --pull=always -v r1vol:/edge_node/_local_cache/ ratio1/edge_node:devnet
```

This will ensure that your node will be able to utilize the GPU(s) for computation tasks and will accept training and inference jobs that require GPU acceleration.

@@ -64,12 +67,12 @@ This will ensure that your node will be able to utilize the GPU(s) for computati

If you want to run multiple Edge Nodes on the same machine, you can do so by specifying a different name for each container and, more importantly, a different volume for each container to avoid conflicts between the nodes. You can do this by creating a new volume for each node and mounting it to the container as follows:

```bash
-docker run -d --rm --name r1node1 --pull=always -v r1vol1:/edge_node/_local_cache/ ratio1/edge_node:develop
-docker run -d --rm --name r1node2 --pull=always -v r1vol2:/edge_node/_local_cache/ ratio1/edge_node:develop
+docker run -d --rm --name r1node1 --pull=always -v r1vol1:/edge_node/_local_cache/ ratio1/edge_node:devnet
+docker run -d --rm --name r1node2 --pull=always -v r1vol2:/edge_node/_local_cache/ ratio1/edge_node:devnet
```

Now you can run multiple Edge Nodes on the same machine without any conflicts between them.

->NOTE: If you are running multiple nodes on the same machine it is recommended to use docker-compose to manage the nodes. 
You can find an example of how to run multiple nodes on the same machine using docker-compose in the [Running multiple nodes on the same machine](#running-multiple-nodes-on-the-same-machine) section. +>NOTE: If you are running multiple nodes on the same machine it is recommended to use docker-compose to manage the nodes. You can find a docker-compose example in the section below. ## Inspecting the Edge Node @@ -145,6 +148,8 @@ The [Ratio1 SDK](https://github.com/Ratio1/ratio1_sdk) is the recommended way to pip install -e ../ratio1_sdk ``` +If you prefer the published package, install from PyPI via `pip install ratio1`. + - Use the `nepctl` (formerly `r1ctl`) CLI that ships with the SDK to inspect the network, configure clients, and dispatch jobs. - Explore `ratio1_sdk/tutorials/` for end-to-end examples; most have matching runtime counterparts in `plugins/business/tutorials/` inside this repository. - SDK releases 2.6+ perform automatic dAuth configuration. After whitelisting your client, you can submit jobs without additional secrets. @@ -226,6 +231,7 @@ Lets suppose you have the following node data: "whitelist": [ "0xai_AthDPWc_k3BKJLLYTQMw--Rjhe3B6_7w76jlRpT6nDeX" ] + } } ``` @@ -250,6 +256,7 @@ docker exec r1node get_node_info "whitelist": [ "0xai_AthDPWc_k3BKJLLYTQMw--Rjhe3B6_7w76jlRpT6nDeX" ] + } } ``` @@ -286,7 +293,7 @@ If you want to run multiple nodes on the same machine the best option is to use ```yaml services: r1node1: - image: ratio1/edge_node:testnet + image: ratio1/edge_node:devnet container_name: r1node1 platform: linux/amd64 restart: always @@ -297,7 +304,7 @@ services: - "com.centurylinklabs.watchtower.stop-signal=SIGINT" r1node2: - image: ratio1/edge_node:testnet + image: ratio1/edge_node:devnet container_name: r1node2 platform: linux/amd64 restart: always @@ -350,7 +357,7 @@ docker-compose down Now, lets dissect the `docker-compose.yml` file: - we have a variable number of nodes - in our case 2 nodes - `r1node1` and `r1node2` as services (we commented out the third node for simplicity) - - each node is using the `ratio1/edge_node:testnet` image + - each node is using the `ratio1/edge_node:devnet` image (swap the tag for `:mainnet` or `:testnet` as needed; architecture-specific variants such as `-arm64`, `-tegra`, `-amd64-cpu` will follow) - each node has own unique volume mounted to it - we have a watchtower service that will check for new images every 1 minute and will update the nodes if a new image is available @@ -375,6 +382,7 @@ For inquiries regarding the funding and its impact on this project, please conta ## Citation + If you use the Ratio1 Edge Node in your research or projects, please cite it as follows: ```bibtex @@ -385,3 +393,36 @@ If you use the Ratio1 Edge Node in your research or projects, please cite it as howpublished = {\url{https://github.com/Ratio1/edge_node}}, } ``` + + +Additional publications and references: + +```bibtex +@inproceedings{Damian2025CSCS, + author = {Damian, Andrei Ionut and Bleotiu, Cristian and Grigoras, Marius and + Butusina, Petrica and De Franceschi, Alessandro and Toderian, Vitalii and + Tapus, Nicolae}, + title = {Ratio1 meta-{OS} -- decentralized {MLOps} and beyond}, + booktitle = {2025 25th International Conference on Control Systems and Computer Science (CSCS)}, + year = {2025}, + pages = {258--265}, + address = {Bucharest, Romania}, + month = {May 27--30}, + doi = {10.1109/CSCS66924.2025.00046}, + isbn = {979-8-3315-7343-0}, + issn = {2379-0482}, + publisher = {IEEE} +} + +@misc{Damian2025arXiv, + title = {Ratio1 -- AI 
meta-OS},
+  author = {Damian, Andrei and Butusina, Petrica and De Franceschi, Alessandro and
+            Toderian, Vitalii and Grigoras, Marius and Bleotiu, Cristian},
+  year = {2025},
+  month = {September},
+  eprint = {2509.12223},
+  archivePrefix = {arXiv},
+  primaryClass = {cs.OS},
+  doi = {10.48550/arXiv.2509.12223}
+}
+```
diff --git a/extensions/business/cybersec/red_mesh/PROMPT.MD b/extensions/business/cybersec/red_mesh/PROMPT.MD
new file mode 100644
index 00000000..8b90944d
--- /dev/null
+++ b/extensions/business/cybersec/red_mesh/PROMPT.MD
@@ -0,0 +1,17 @@
+We have the following new features that must be implemented in the RedMesh red-teaming pentesting framework API:
+- better monitoring of worker status and job progress
+- the possibility to select certain types of tests instead of running all tests (we already have the option to exclude certain ports)
+- per-worker port distribution strategies: "slice", where each Ratio1 worker that runs RedMesh receives a slice of the port range, or "mirror", where all workers get the same port range
+- for each local worker (thread configured via the `nr_local_workers` parameter of the `_launch_job` method) we want a parameter called PORT_ORDER (sequential or shuffle) that defines whether we iterate through ports sequentially or in shuffled order. In `on_init`, order `ports_to_scan` according to `port_order`.
+- run mode: singlepass (one-time scan) vs continuous monitoring (scanning continuously). Don't forget to keep the stats about each run.
+- for continuous jobs, make it possible to configure the scheduling
+- add parameters for configuring pacing/jitter (“Dune sand walking”) to slow down/space out scans by adding random pauses
+
+I want you to describe the architecture as minimalistically as possible. Don't add unnecessary things. It should follow the Keep It Simple, Stupid principle.
+It should be simple and functional.
+
+
+Review TODO_A.md and TODO_B.md and create a final TODO_C.md that proposes an action plan with clear architectural steps to implement these features (no code, as TODO_B.md currently contains incorrect code).
+Please focus on the architecture, the features, and planning them.
+
+Use the maximum amount of ultrathink. Take all the time you need. It's much better if you do too much research and thinking than not enough.
\ No newline at end of file
diff --git a/extensions/business/cybersec/red_mesh/TODO_A.md b/extensions/business/cybersec/red_mesh/TODO_A.md
new file mode 100644
index 00000000..8a9814ca
--- /dev/null
+++ b/extensions/business/cybersec/red_mesh/TODO_A.md
@@ -0,0 +1,61 @@
+# RedMesh feature plan (PentesterApi01 / PentestLocalWorker)
+
+## Current state (quick map)
+- API plugin `pentester_api_01.py` orchestrates jobs announced via CStore, splits target port ranges evenly across local workers, and aggregates reports when all workers finish.
+- `PentestLocalWorker` auto-runs port scan → service probes (all `_service_info_*`) → web probes (all `_web_test_*`), with optional port exclusions but no per-test selection or pacing controls.
+- Job spec today: `job_id`, `target`, `start_port`, `end_port`, `exceptions`, `launcher`, `workers{peer->{finished,result}}`. No concept of run mode (single/continuous), jitter, or distribution strategy choices.
+
+## Required feature tracks
+- Service/web test selection
+  - Extend job spec with `include_tests` / `exclude_tests` (separate lists for `service_info` vs `web_tests`) validated against `_get_all_features()`. Default: run all.
+ - Add API params to `launch_test` (and validation rules) to accept comma/space lists; normalize to method names. Reject unknown tests with a helpful error. + - `PentestLocalWorker` should accept the allowed set and filter the discovered `_service_info_*` / `_web_test_*` before execution. Persist allowed/blocked lists in report metadata for auditability. + - Reporting: add per-worker fields `tests_run`, `tests_skipped`, and propagate to aggregated report. + - Tests: add unit coverage for include-only, exclude-only, and conflicting rules (exclude wins). + +- Worker port-range distribution modes + - New job flag `distribution_mode`: `slice` (default, breadth-first coverage) vs `mirror` (every worker gets same range) vs optional `staggered` (all workers same range but randomized start offset/stride to reduce duplication). + - If `mirror`/`staggered`, mark worker reports with `coverage_strategy` and ensure aggregation dedupes `open_ports` and merges service/web results deterministically. + - Wire flag into `_launch_job` splitter; keep guardrails when requested workers > ports. In `staggered`, randomize per-worker port order and introduce optional `max_retries_per_port` to bound duplicate effort. + - Config surface: plugin default (e.g., `CFG_PORT_DISTRIBUTION_MODE`), default `slice`. Default worker count = available CPU cores (plugin runs as sole job); allow override but cap at cores. + +- Run mode: singlepass vs continuous monitoring + - Add `run_mode` in job spec (`singlepass` default, `continuous` for chained jobs). Continuous: after `_close_job`, schedule a successor job with inherited params, new `job_id`, incremented `iteration` counter, and backoff delay. + - Persist lineage fields (`parent_job_id`, `iteration`, `last_report_at`, `next_launch_at`) to aid observability and cleanup. Add TTL/`max_iterations` or `stop_after` datetime to prevent infinite loops. + - API responses should surface next scheduled run time; allow `stop_and_delete_job` to cancel the chain (mark lineage as canceled in cstore). + - Consider a `run_interval_sec` knob; default to a conservative interval to avoid rate-limiting targets. + - Add optional daily runtime windows (UTC hour-based `window_start`, `window_end`, 0–23) with at least one-hour disallowed window; continuous mode should pause/resume respecting the window. Default can be full-day minus a 1-hour break for safety. + +- Steps temporization (“Dune sand walking”) + - Job-level pacing config: `min_steps`, `max_steps`, `min_wait_sec`, `max_wait_sec`. Optional for `singlepass`; enforced non-zero for `continuous` (fallback defaults if not provided). + - Implement in `PentestLocalWorker` port scanning loop and optionally between service/web methods: after a random number of actions, sleep random wait while honoring `stop_event`. + - Record pacing stats in worker status (`jitter_applied`, `total_sleep_sec`) for transparency. Make values configurable via plugin defaults and `launch_test` params. + +## Additional logical enhancements (fit with API/framework) +- Target/port randomization: shuffle port order per worker (configurable) to reduce IDS signatures and distribute load. +- Safe-rate controller: per-target max RPS and concurrent sockets; auto-throttle on repeated timeouts/connection resets to mimic human scanning. +- Fingerprinting & preflight: optional light-touch pre-scan (ICMP/TCP SYN-lite) to bail early on dead hosts; enrich reports with ASN/cloud provider to better interpret noise/blocks. 
+- Credential hygiene: allow injecting bearer/API keys for authenticated tests via secrets manager pointer (never store raw secrets in cstore; expect vaulted reference). +- Health/timeout guardrails: per-stage max duration; force-close jobs that exceed SLAs and flag in report to avoid runaway continuous chains. +- Observability: append `audit_log` entries (timestamps, action, module) to worker status; expose via `get_job_status` for forensic traceability. +- Extensibility hooks: plugin registry for `_service_info_*` / `_web_test_*` from `extensions/.../plugins` so users can add probes without core edits; validate names against allowlist. + +## Stealth & red-team best practices (public Ratio1 edge nodes) +- Pacing and jitter: default to non-burst scans with randomized inter-request sleeps; stagger workers across time windows to evade traffic spikes. +- Traffic shaping: rotate User-Agent/Host headers, optionally bind to egress pools or proxies per cloud region to avoid IP reputation clustering. +- Noise reduction: avoid full 1–65535 sweeps by default; prefer common/high-value ports + heuristics from previous runs; honor exclusions strictly. +- Detection-aware retries: back off or skip ports when seeing WAF/IDS fingerprints (e.g., TCP RST storms, HTTP 429/403 patterns). +- Cover traffic & blending: mix benign HEAD/OPTIONS with probes; throttle to stay below typical NIDS thresholds; optionally insert dormant intervals to simulate human behavior. +- Logging hygiene: ensure reports strip sensitive headers/body fragments; store only minimal artifacts needed for findings. +- Authorization compliance: enforce explicit allowlists/attestation per target before running (config flag) to prevent misuse of public nodes. + +## Testing & rollout +- Add unit tests covering new job spec validation, distribution modes, pacing counters, continuous chaining lifecycle, and aggregation dedupe paths. +- Provide a dry-run/simulation mode to exercise scheduling without sending network traffic for CI. +- Update documentation/README and FastAPI schema to reflect new params and defaults. + +## Open questions for the product/ops team +- What is the acceptable default pacing for continuous mode (sleep floor/ceiling, max daily test hours) given UTC windows and the mandated 1-hour daily break? +- Confirm default distribution stays `slice` and whether any cap below `cpu_count` is desired (thermal/network guardrails). +- Do we need per-target authorization tokens or signed scopes to launch tests from public edge nodes, and are certain probes (SQLi/path traversal/auth bypass) forbidden for specific tenants/environments (e.g., production vs staging, regulated sectors)? +- How should chained jobs be retained (TTL) and how much historical reporting is required for compliance? diff --git a/extensions/business/cybersec/red_mesh/TODO_B.md b/extensions/business/cybersec/red_mesh/TODO_B.md new file mode 100644 index 00000000..51feacb6 --- /dev/null +++ b/extensions/business/cybersec/red_mesh/TODO_B.md @@ -0,0 +1,1677 @@ +# RedMesh v2.0 Implementation Plan +## Advanced Red Teaming & Continuous Monitoring Features + +**Document Version:** 1.0 +**Date:** 2025-12-05 +**Target RedMesh Version:** 2.0.0 + +--- + +## Executive Summary + +This document outlines the implementation plan for RedMesh v2.0, transforming it from a point-in-time distributed penetration testing framework into a sophisticated continuous security monitoring platform with advanced stealth capabilities. 
The plan incorporates industry best practices from 2025 red teaming methodologies, emphasizing operational security (OPSEC), temporal evasion, and distributed coordination. + +### Key Objectives + +1. Enable continuous monitoring with automated job chaining +2. Implement advanced temporal evasion ("Dune sand walking") +3. Provide flexible port range distribution strategies +4. Support granular test selection and exclusion +5. Enhance stealth through traffic pattern randomization +6. Maintain compliance with responsible disclosure and ethical testing standards + +--- + +## Table of Contents + +1. [Core Feature Requirements](#core-feature-requirements) +2. [Architecture Changes](#architecture-changes) +3. [Detailed Feature Specifications](#detailed-feature-specifications) +4. [Best Practices Integration](#best-practices-integration) +5. [Additional Proposed Features](#additional-proposed-features) +6. [Implementation Roadmap](#implementation-roadmap) +7. [Testing Strategy](#testing-strategy) +8. [Security & Ethical Considerations](#security--ethical-considerations) +9. [References](#references) + +--- + +## Core Feature Requirements + +### 1. Service Test Selection & Exclusion + +**Current State:** +All `_service_info_*` and `_web_test_*` methods execute automatically for every open port. + +**Required Changes:** + +#### 1.1 Test Selection API +- Add `included_tests` parameter to job configuration (list of test method names) +- Add `excluded_tests` parameter to job configuration (list of test method names) +- Exclusions take precedence over inclusions +- If neither specified, run all tests (backward compatible) + +#### 1.2 Test Categories +Organize tests into logical categories for easier selection: + +**Service Info Tests:** +- `service_info_http` (ports 80, 8080) +- `service_info_https` (ports 443, 8443) +- `service_info_tls` (TLS handshake analysis) +- `service_info_ftp` (port 21) +- `service_info_ssh` (port 22) +- `service_info_smtp` (port 25) +- `service_info_dns` (port 53) +- `service_info_databases` (3306, 5432, 1433, 27017) +- `service_info_cache` (6379, 11211) +- `service_info_search` (9200) +- `service_info_legacy` (23, 445, 5900, 161, 502) +- `service_info_generic` (catch-all) + +**Web Tests:** +- `web_test_recon` (common endpoints, homepage) +- `web_test_headers` (security headers, CORS, cookies) +- `web_test_injection` (SQL, XSS, path traversal) +- `web_test_api` (auth bypass, GraphQL, metadata) +- `web_test_redirect` (open redirect) +- `web_test_methods` (HTTP methods) + +#### 1.3 Implementation Details + +**File:** `redmesh_utils.py` + +```python +def __init__(self, ..., included_tests=None, excluded_tests=None): + # Store test filters + self.included_tests = set(included_tests) if included_tests else None + self.excluded_tests = set(excluded_tests) if excluded_tests else set() + +def _should_run_test(self, test_method_name): + """Determine if a test should be executed based on inclusion/exclusion rules.""" + if test_method_name in self.excluded_tests: + return False + if self.included_tests is None: + return True + return test_method_name in self.included_tests + +def _gather_service_info(self): + # Filter methods based on _should_run_test() + service_info_methods = [ + method for method in dir(self) + if method.startswith("_service_info_") and self._should_run_test(method) + ] + # ... 
rest of implementation + +def _run_web_tests(self): + # Filter methods based on _should_run_test() + web_tests_methods = [ + method for method in dir(self) + if method.startswith("_web_test_") and self._should_run_test(method) + ] + # ... rest of implementation +``` + +**File:** `pentester_api_01.py` + +```python +@BasePlugin.endpoint +def launch_test( + self, + target: str = "", + start_port: int = 1, + end_port: int = 65535, + exceptions: str = "64297", + included_tests: str = "", # NEW: comma-separated test names + excluded_tests: str = "", # NEW: comma-separated test names +): + # Parse test filters + included = [t.strip() for t in included_tests.split(",") if t.strip()] if included_tests else None + excluded = [t.strip() for t in excluded_tests.split(",") if t.strip()] if excluded_tests else [] + + job_specs = { + # ... existing fields + "included_tests": included, + "excluded_tests": excluded, + } +``` + +--- + +### 2. Worker Port Range Distribution Modes + +**Current State:** +Port range is sliced and distributed among workers (each gets a unique subset). + +**Required Changes:** + +#### 2.1 Distribution Modes + +Add `port_distribution_mode` parameter with two options: + +1. **`SLICE` (default, current behavior):** + - Divide port range among workers + - Each worker scans unique subset + - Faster completion, no redundancy + +2. **`FULL` (new):** + - All workers scan the entire port range + - Independent scanning for validation + - Redundancy and cross-verification + - Useful for reliability testing and stealth (distributed sources) + +#### 2.2 Implementation Details + +**File:** `pentester_api_01.py` + +```python +def _launch_job( + self, + job_id, + target, + start_port, + end_port, + network_worker_address, + nr_local_workers=4, + exceptions=None, + port_distribution_mode="SLICE", # NEW parameter +): + ports = list(range(start_port, end_port + 1)) + ports = [p for p in ports if p not in (exceptions or [])] + + if port_distribution_mode.upper() == "SLICE": + # Current implementation - slice ports + batches = self._slice_ports(ports, nr_local_workers) + elif port_distribution_mode.upper() == "FULL": + # New implementation - all workers get full range + batches = [ports] * nr_local_workers + else: + raise ValueError(f"Invalid port_distribution_mode: {port_distribution_mode}") + + # Launch workers with their respective port batches + # ... rest of implementation + +def _slice_ports(self, ports, nr_workers): + """Slice ports into batches (current logic extracted).""" + batches = [] + nr_ports = len(ports) + nr_workers = max(1, min(nr_workers, nr_ports)) + base_chunk, remainder = divmod(nr_ports, nr_workers) + start = 0 + for i in range(nr_workers): + chunk = base_chunk + (1 if i < remainder else 0) + end = start + chunk + batch = ports[start:end] + if batch: + batches.append(batch) + start = end + return batches +``` + +**API Endpoint Update:** + +```python +@BasePlugin.endpoint +def launch_test( + self, + target: str = "", + start_port: int = 1, + end_port: int = 65535, + exceptions: str = "64297", + port_distribution_mode: str = "SLICE", # NEW parameter +): + job_specs = { + # ... existing fields + "port_distribution_mode": port_distribution_mode, + } +``` + +--- + +### 3. Single-Pass vs Continuous Monitoring + +**Current State:** +Jobs run once and complete (single-pass only). + +**Required Changes:** + +#### 3.1 Operation Modes + +Add `operation_mode` parameter with two options: + +1. 
**`SINGLEPASS` (default, current behavior):** + - Job runs once to completion + - Results stored, job marked as done + - No automatic restart + +2. **`CONTINUOUS` (new):** + - Job runs indefinitely in a loop + - After completion, automatically chain new job + - Each iteration generates intermediate report + - Continues until manually stopped + - Ideal for ongoing security monitoring + +#### 3.2 Continuous Mode Behavior + +**Job Lifecycle:** +``` +[Init] → [Scan] → [Service Info] → [Web Tests] → [Report] → [Wait] → [Scan] → ... + ↓ + Cumulative tracking +``` + +**Key Features:** +- Each iteration is a complete scan cycle +- Intermediate reports stored with iteration number +- Cumulative change detection (new open ports, new services) +- Configurable inter-iteration delay +- Graceful shutdown on stop signal + +#### 3.3 Implementation Details + +**File:** `redmesh_utils.py` + +```python +class PentestLocalWorker: + def __init__( + self, + ..., + operation_mode="SINGLEPASS", # NEW + continuous_delay_min=300, # NEW: min seconds between iterations + continuous_delay_max=600, # NEW: max seconds between iterations + ): + self.operation_mode = operation_mode.upper() + self.continuous_delay_min = continuous_delay_min + self.continuous_delay_max = continuous_delay_max + self.iteration_count = 0 + self.continuous_results = [] # Store results from each iteration + + def execute_job(self): + """Enhanced to support continuous mode.""" + try: + while True: + self.iteration_count += 1 + self.P(f"Starting iteration {self.iteration_count} " + + f"(mode: {self.operation_mode})") + + # Reset state for new iteration + self._reset_iteration_state() + + # Run standard workflow + if not self._check_stopped(): + self._scan_ports_step() + if not self._check_stopped(): + self._gather_service_info() + self.state["completed_tests"].append("service_info_completed") + if not self._check_stopped(): + self._run_web_tests() + self.state["completed_tests"].append("web_tests_completed") + + # Store iteration results + iteration_result = self.get_status() + iteration_result["iteration"] = self.iteration_count + iteration_result["timestamp"] = self.owner.time() + self.continuous_results.append(iteration_result) + + self.P(f"Iteration {self.iteration_count} completed. 
" + + f"Ports open: {self.state['open_ports']}") + + # Check if continuous mode + if self.operation_mode == "SINGLEPASS": + self.state['done'] = True + break + + # Continuous mode: check for stop signal before next iteration + if self.stop_event.is_set(): + self.P("Continuous mode stopped by user request.") + self.state['done'] = True + self.state['canceled'] = True + break + + # Wait before next iteration with random delay + delay = self._get_random_delay( + self.continuous_delay_min, + self.continuous_delay_max + ) + self.P(f"Waiting {delay}s before iteration {self.iteration_count + 1}...") + self.stop_event.wait(timeout=delay) + + if self.stop_event.is_set(): + self.P("Continuous mode stopped during inter-iteration delay.") + self.state['done'] = True + self.state['canceled'] = True + break + + self.P(f"Job completed after {self.iteration_count} iteration(s).") + + except Exception as e: + self.P(f"Exception in job execution: {e}:\n{traceback.format_exc()}", + color='r') + self.state['done'] = True + + def _reset_iteration_state(self): + """Reset state for new iteration while preserving continuous tracking.""" + self.state["ports_to_scan"] = list(self.initial_ports) + self.state["open_ports"] = [] + self.state["ports_scanned"] = [] + self.state["service_info"] = {} + self.state["web_tested"] = False + self.state["web_tests_info"] = {} + self.state["completed_tests"] = [] + + def _get_random_delay(self, min_val, max_val): + """Generate random delay for stealth.""" + import random + return random.uniform(min_val, max_val) + + def get_status(self, for_aggregations=False): + """Enhanced to include continuous mode metrics.""" + status = { + # ... existing fields + "operation_mode": self.operation_mode, + } + if self.operation_mode == "CONTINUOUS": + status["iteration_count"] = self.iteration_count + status["total_iterations"] = len(self.continuous_results) + return status +``` + +**API Changes:** + +```python +@BasePlugin.endpoint +def launch_test( + self, + # ... existing parameters + operation_mode: str = "SINGLEPASS", # NEW + continuous_delay_min: int = 300, # NEW: 5 minutes + continuous_delay_max: int = 600, # NEW: 10 minutes +): + job_specs = { + # ... existing fields + "operation_mode": operation_mode, + "continuous_delay_min": continuous_delay_min, + "continuous_delay_max": continuous_delay_max, + } +``` + +#### 3.4 Continuous Mode Reporting + +**Enhanced Report Structure:** + +```json +{ + "job_id": "abc123", + "operation_mode": "CONTINUOUS", + "iteration_count": 15, + "current_iteration": { + "iteration": 15, + "timestamp": 1733423456.78, + "open_ports": [22, 80, 443], + "new_since_last": [443], + "closed_since_last": [8080] + }, + "historical_iterations": [ + {"iteration": 1, "timestamp": 1733400000.00, ...}, + {"iteration": 2, "timestamp": 1733400650.12, ...}, + ... + ], + "trends": { + "port_stability_score": 0.87, + "services_changed": 3, + "new_vulnerabilities_found": 1 + } +} +``` + +--- + +### 4. Step Temporization - "Dune Sand Walking" + +**Concept:** +Based on the novel "Dune" where characters walk with irregular patterns to avoid detection by sandworms. Applied to penetration testing: introduce random delays after random intervals to avoid pattern-based IDS detection. + +**Current State:** +Operations execute as fast as possible with only socket timeout delays. 
+ +**Required Changes:** + +#### 4.1 Temporal Randomization Strategy + +**Port-Level Delays:** +- After scanning random(min_steps, max_steps) ports, wait random(min_wait, max_wait) seconds +- Applies to: port scanning, service info gathering, web tests +- Creates unpredictable traffic patterns +- Evades time-based signature detection + +**Parameters:** +- `min_steps`: Minimum ports to process before delay (e.g., 5) +- `max_steps`: Maximum ports to process before delay (e.g., 20) +- `min_wait`: Minimum delay in seconds (e.g., 1.0) +- `max_wait`: Maximum delay in seconds (e.g., 5.0) + +**Behavior:** +- Optional for `SINGLEPASS` mode +- Mandatory for `CONTINUOUS` mode (to avoid detection over time) + +#### 4.2 Implementation Details + +**File:** `redmesh_utils.py` + +```python +import random + +class PentestLocalWorker: + def __init__( + self, + ..., + enable_sand_walking=False, # NEW + min_steps=5, # NEW + max_steps=20, # NEW + min_wait=1.0, # NEW (seconds) + max_wait=5.0, # NEW (seconds) + ): + self.enable_sand_walking = enable_sand_walking + self.min_steps = min_steps + self.max_steps = max_steps + self.min_wait = min_wait + self.max_wait = max_wait + + # Calculate next delay trigger + self._steps_until_delay = self._calculate_next_delay_trigger() + self._current_step_count = 0 + + def _calculate_next_delay_trigger(self): + """Calculate random number of steps before next delay.""" + if not self.enable_sand_walking: + return float('inf') # Never delay + return random.randint(self.min_steps, self.max_steps) + + def _maybe_sand_walk_delay(self, operation_name="operation"): + """Check if delay needed and execute if so.""" + if not self.enable_sand_walking: + return + + self._current_step_count += 1 + + if self._current_step_count >= self._steps_until_delay: + # Time to delay + delay = random.uniform(self.min_wait, self.max_wait) + self.P(f"[Sand Walking] Delaying {delay:.2f}s after " + + f"{self._current_step_count} steps during {operation_name}") + + # Use stop_event.wait() for interruptible sleep + self.stop_event.wait(timeout=delay) + + # Reset counter and calculate next trigger + self._current_step_count = 0 + self._steps_until_delay = self._calculate_next_delay_trigger() + + def _scan_ports_step(self, batch_size=None, batch_nr=1): + """Enhanced port scanning with sand walking delays.""" + # ... existing setup code + + for i, port in enumerate(ports_batch): + if self.stop_event.is_set(): + return + + # Port scanning logic + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(0.3) + try: + result = sock.connect_ex((target, port)) + if result == 0: + self.state["open_ports"].append(port) + self.P(f"Port {port} is open on {target}.") + except Exception as e: + self.P(f"Exception scanning port {port} on {target}: {e}") + finally: + sock.close() + + self.state["ports_scanned"].append(port) + self.state["ports_to_scan"].remove(port) + + # NEW: Sand walking delay check + self._maybe_sand_walk_delay(operation_name="port_scan") + + # ... existing progress tracking code + + def _gather_service_info(self): + """Enhanced service info with sand walking delays.""" + # ... existing setup code + + for method in service_info_methods: + func = getattr(self, method) + for port in open_ports: + if self.stop_event.is_set(): + continue + + info = func(target, port) + # ... 
store info + + # NEW: Sand walking delay check + self._maybe_sand_walk_delay(operation_name="service_info") + + self.state["completed_tests"].append(method) + + def _run_web_tests(self): + """Enhanced web tests with sand walking delays.""" + # ... existing setup code + + for method in web_tests_methods: + func = getattr(self, method) + for port in ports_to_test: + if self.stop_event.is_set(): + return + + iter_result = func(target, port) + # ... store results + + # NEW: Sand walking delay check + self._maybe_sand_walk_delay(operation_name="web_tests") + + self.state["completed_tests"].append(method) +``` + +**API Changes:** + +```python +@BasePlugin.endpoint +def launch_test( + self, + # ... existing parameters + enable_sand_walking: bool = False, # NEW + min_steps: int = 5, # NEW + max_steps: int = 20, # NEW + min_wait: float = 1.0, # NEW + max_wait: float = 5.0, # NEW +): + # Auto-enable sand walking for continuous mode + if operation_mode.upper() == "CONTINUOUS": + enable_sand_walking = True + self.P("Sand walking automatically enabled for continuous mode") + + job_specs = { + # ... existing fields + "enable_sand_walking": enable_sand_walking, + "min_steps": min_steps, + "max_steps": max_steps, + "min_wait": min_wait, + "max_wait": max_wait, + } +``` + +#### 4.3 Visualization of Sand Walking Pattern + +``` +Time → +Port Scans: [1][2][3][4][5][6][7]-----[8][9][10][11][12][13][14][15]--[16]... + ←7 ports→ ←random delay→ ←8 ports→ ←random delay→ ←1 port→... + +vs Traditional: +Port Scans: [1][2][3][4][5][6][7][8][9][10][11][12][13][14][15][16]... + ←no delays, constant rate, easily detectable pattern→ +``` + +--- + +## Best Practices Integration + +### 5. Timing Templates (Inspired by Nmap) + +**Rationale:** +Nmap's timing templates (T0-T5) provide proven stealth-to-speed tradeoffs. Integrate similar presets. + +**Implementation:** + +```python +TIMING_TEMPLATES = { + "T0_PARANOID": { + "socket_timeout": 5.0, + "enable_sand_walking": True, + "min_steps": 1, + "max_steps": 1, # Delay after every port + "min_wait": 5.0, + "max_wait": 10.0, + "continuous_delay_min": 3600, # 1 hour between iterations + "continuous_delay_max": 7200, # 2 hours + }, + "T1_SNEAKY": { + "socket_timeout": 3.0, + "enable_sand_walking": True, + "min_steps": 3, + "max_steps": 10, + "min_wait": 2.0, + "max_wait": 5.0, + "continuous_delay_min": 1800, # 30 minutes + "continuous_delay_max": 3600, # 1 hour + }, + "T2_POLITE": { + "socket_timeout": 2.0, + "enable_sand_walking": True, + "min_steps": 10, + "max_steps": 30, + "min_wait": 0.5, + "max_wait": 2.0, + "continuous_delay_min": 600, # 10 minutes + "continuous_delay_max": 1200, # 20 minutes + }, + "T3_NORMAL": { + "socket_timeout": 1.0, + "enable_sand_walking": False, + "continuous_delay_min": 300, # 5 minutes + "continuous_delay_max": 600, # 10 minutes + }, + "T4_AGGRESSIVE": { + "socket_timeout": 0.5, + "enable_sand_walking": False, + "continuous_delay_min": 60, # 1 minute + "continuous_delay_max": 180, # 3 minutes + }, + "T5_INSANE": { + "socket_timeout": 0.3, + "enable_sand_walking": False, + "continuous_delay_min": 30, # 30 seconds + "continuous_delay_max": 60, # 1 minute + }, +} +``` + +**API Integration:** + +```python +@BasePlugin.endpoint +def launch_test( + self, + # ... existing parameters + timing_template: str = "T3_NORMAL", # NEW +): + # Apply template defaults + template = TIMING_TEMPLATES.get(timing_template.upper()) + if template: + # Override defaults with template values + # Allow explicit parameters to override template + pass +``` + +### 6. 
Jitter and Traffic Pattern Randomization + +**Rationale:** +Consistent timing creates detectable signatures. Add random jitter to all timing parameters. + +**Implementation:** + +```python +def _apply_jitter(self, base_value, jitter_percent=10): + """Add random jitter to timing value.""" + jitter = base_value * (jitter_percent / 100.0) + return random.uniform( + base_value - jitter, + base_value + jitter + ) + +# Example usage in socket operations +def _scan_ports_step(self, ...): + # Apply jitter to timeout + timeout = self._apply_jitter(self.socket_timeout) + sock.settimeout(timeout) +``` + +### 7. Rate Limiting and Throttling + +**Rationale:** +Prevent overwhelming targets and triggering rate-based defenses. + +**Parameters:** +- `max_requests_per_second`: Global rate limit (default: unlimited) +- `max_requests_per_second_per_port`: Per-port limit (default: unlimited) + +**Implementation:** + +```python +import time +from collections import defaultdict + +class PentestLocalWorker: + def __init__(self, ..., max_rps=None, max_rps_per_port=None): + self.max_rps = max_rps + self.max_rps_per_port = max_rps_per_port + self.last_request_time = 0 + self.port_request_times = defaultdict(list) + + def _throttle_global(self): + """Enforce global rate limit.""" + if self.max_rps is None: + return + min_interval = 1.0 / self.max_rps + elapsed = time.time() - self.last_request_time + if elapsed < min_interval: + time.sleep(min_interval - elapsed) + self.last_request_time = time.time() + + def _throttle_per_port(self, port): + """Enforce per-port rate limit.""" + if self.max_rps_per_port is None: + return + min_interval = 1.0 / self.max_rps_per_port + times = self.port_request_times[port] + if times: + elapsed = time.time() - times[-1] + if elapsed < min_interval: + time.sleep(min_interval - elapsed) + self.port_request_times[port].append(time.time()) +``` + +### 8. User-Agent and Request Header Randomization + +**Rationale:** +Avoid fingerprinting of web requests. Randomize User-Agent and other headers. + +**Implementation:** + +```python +USER_AGENTS = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", + # Add more diverse user agents +] + +def _get_random_headers(self): + """Generate randomized HTTP headers.""" + return { + "User-Agent": random.choice(USER_AGENTS), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": random.choice(["en-US,en;q=0.9", "en-GB,en;q=0.8"]), + "Accept-Encoding": "gzip, deflate, br", + "DNT": "1", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + } + +# Use in all requests +requests.get(url, headers=self._get_random_headers(), ...) +``` + +--- + +## Additional Proposed Features + +### 9. 
Enhanced Reporting and Metrics + +**9.1 Differential Reporting** + +For continuous mode, track changes between iterations: + +```python +def _compute_diff(self, prev_result, current_result): + """Compute differences between iterations.""" + return { + "new_open_ports": set(current_result["open_ports"]) - set(prev_result["open_ports"]), + "closed_ports": set(prev_result["open_ports"]) - set(current_result["open_ports"]), + "new_vulnerabilities": self._diff_vulnerabilities(prev_result, current_result), + "changed_services": self._diff_services(prev_result, current_result), + } +``` + +**9.2 Alerting Thresholds** + +Notify when significant changes detected: + +```python +alert_config = { + "alert_on_new_port": True, + "alert_on_new_vulnerability": True, + "alert_threshold_ports": 3, # Alert if 3+ new ports open +} +``` + +### 10. Scan Fingerprint Obfuscation + +**10.1 Port Scan Order Randomization** + +```python +def _scan_ports_step(self, ...): + # Randomize port order to avoid sequential patterns + ports_batch = list(ports_batch) + random.shuffle(ports_batch) + + for port in ports_batch: + # ... scan +``` + +**10.2 Decoy Traffic** + +Inspired by Nmap's `-D` option, generate decoy requests from randomized source patterns: + +```python +def _generate_decoy_request(self, target, port): + """Send occasional decoy requests to obfuscate true scan pattern.""" + # Implementation would require low-level socket manipulation + # This is an advanced feature for future consideration + pass +``` + +### 11. Distributed Worker Coordination Enhancements + +**11.1 Worker Health Monitoring** + +Track worker status in continuous mode: + +```python +def _update_worker_health(self): + """Report worker health metrics to CStore.""" + health = { + "worker_id": self.ee_addr, + "status": "active", + "iteration": self.iteration_count, + "last_heartbeat": self.time(), + "ports_scanned_total": len(self.state["ports_scanned"]), + } + self.chainstore_hset( + hkey=f"{self.cfg_instance_id}:health", + key=self.ee_addr, + value=health + ) +``` + +**11.2 Load Balancing Awareness** + +For FULL port distribution mode with multiple network workers, track which workers are actively scanning to balance load: + +```python +def _get_active_workers(self, job_id): + """Get list of workers actively working on this job.""" + job_specs = self._get_job_from_cstore(job_id) + active = [ + worker_addr + for worker_addr, worker_data in job_specs.get("workers", {}).items() + if not worker_data.get("finished") + ] + return active +``` + +### 12. Error Handling and Resilience + +**12.1 Network Failure Recovery** + +Handle transient network errors gracefully: + +```python +def _scan_with_retry(self, target, port, max_retries=3): + """Scan port with exponential backoff retry.""" + for attempt in range(max_retries): + try: + # ... 
scanning logic + return result + except (socket.timeout, socket.error) as e: + if attempt < max_retries - 1: + backoff = (2 ** attempt) * random.uniform(0.5, 1.5) + self.P(f"Retry {attempt+1}/{max_retries} for {target}:{port} " + + f"after {backoff:.2f}s") + time.sleep(backoff) + else: + self.P(f"Failed to scan {target}:{port} after {max_retries} attempts", + color='r') + return None +``` + +**12.2 Graceful Degradation** + +Continue operation even if some tests fail: + +```python +def _run_test_safely(self, test_func, target, port): + """Execute test with exception handling.""" + try: + return test_func(target, port) + except Exception as e: + self.P(f"Test {test_func.__name__} failed for {target}:{port}: {e}", + color='y') + return f"ERROR: {str(e)}" +``` + +### 13. Compliance and Ethical Testing Features + +**13.1 Authorized Target Validation** + +```python +def _validate_target_authorization(self, target): + """Verify target is authorized for testing.""" + # Check against whitelist + authorized_networks = self.cfg_authorized_networks or [] + # Implement CIDR matching, domain validation, etc. + # Return True only if explicitly authorized + pass + +@BasePlugin.endpoint +def launch_test(self, target, ...): + if not self._validate_target_authorization(target): + raise ValueError(f"Target {target} not in authorized list. " + + "Add to AUTHORIZED_NETWORKS config.") +``` + +**13.2 Safe Mode** + +Disable aggressive tests: + +```python +SAFE_MODE_EXCLUDED_TESTS = [ + "_web_test_sql_injection", # Might trigger WAF + "_web_test_path_traversal", # Might access sensitive files + # Add other potentially disruptive tests +] + +def launch_test(self, ..., safe_mode=False): + if safe_mode: + excluded_tests = excluded_tests or [] + excluded_tests.extend(SAFE_MODE_EXCLUDED_TESTS) +``` + +### 14. Protocol-Specific Enhancements + +**14.1 TLS/SSL Advanced Analysis** + +Enhanced certificate validation and cipher suite analysis: + +```python +def _service_info_tls_advanced(self, target, port): + """Deep TLS analysis including cipher suites, certificate chain, etc.""" + # Implement full certificate chain validation + # Test for weak ciphers (RC4, 3DES, etc.) + # Check for certificate transparency logs + # Validate OCSP/CRL + pass +``` + +**14.2 Service-Specific Vulnerability Checks** + +Expand service-specific tests: + +```python +def _service_info_docker_api(self, target, port): + """Detect exposed Docker API.""" + # Check port 2375, 2376 + pass + +def _service_info_kubernetes_api(self, target, port): + """Detect Kubernetes API server.""" + # Check port 6443, 8080 + pass +``` + +### 15. Performance Monitoring and Optimization + +**15.1 Scan Performance Metrics** + +```python +def get_performance_metrics(self): + """Return performance statistics.""" + return { + "total_ports_scanned": len(self.state["ports_scanned"]), + "scan_rate_pps": self._calculate_scan_rate(), + "average_port_time": self._calculate_avg_port_time(), + "test_execution_times": self._get_test_timings(), + } +``` + +**15.2 Adaptive Timeout** + +Adjust socket timeout based on target responsiveness: + +```python +def _adaptive_timeout(self, target): + """Calculate optimal timeout based on RTT.""" + # Ping target to measure RTT + # Set timeout to RTT * safety_factor + pass +``` + +### 16. 
Integration and Extensibility + +**16.1 Plugin Architecture for Custom Tests** + +Allow external test modules: + +```python +def _load_custom_tests(self, test_directory): + """Load custom test modules from directory.""" + # Dynamically import Python modules + # Register custom _service_info_* and _web_test_* methods + pass +``` + +**16.2 Webhook Notifications** + +Alert external systems on findings: + +```python +def _send_webhook(self, event_type, data): + """Send webhook notification.""" + if self.cfg_webhook_url: + requests.post(self.cfg_webhook_url, json={ + "event": event_type, + "timestamp": self.time(), + "data": data, + }) +``` + +### 17. Reporting Formats + +**17.1 Export Formats** + +Support multiple output formats: + +```python +@BasePlugin.endpoint +def export_report(self, job_id, format="json"): + """Export report in various formats.""" + report = self._get_job_status(job_id) + + if format == "json": + return report + elif format == "html": + return self._generate_html_report(report) + elif format == "csv": + return self._generate_csv_report(report) + elif format == "markdown": + return self._generate_markdown_report(report) + elif format == "pdf": + return self._generate_pdf_report(report) +``` + +**17.2 OWASP Mapping** + +Tag findings with OWASP Top 10 categories: + +```python +OWASP_MAPPINGS = { + "_web_test_sql_injection": "A03:2021-Injection", + "_web_test_xss": "A03:2021-Injection", + "_web_test_security_headers": "A05:2021-Security Misconfiguration", + # ... complete mappings +} + +def _enrich_with_owasp(self, finding): + """Add OWASP category to finding.""" + test_method = finding.get("test_method") + finding["owasp_category"] = OWASP_MAPPINGS.get(test_method) + return finding +``` + +--- + +## Architecture Changes + +### File Structure Updates + +``` +extensions/business/cybersec/red_mesh/ +├── pentester_api_01.py [MODIFIED] - API endpoints with new parameters +├── redmesh_utils.py [MODIFIED] - Enhanced PentestLocalWorker +├── service_mixin.py [MODIFIED] - Enhanced service tests +├── web_mixin.py [MODIFIED] - Enhanced web tests +├── timing_templates.py [NEW] - Timing template definitions +├── stealth_utils.py [NEW] - Stealth and evasion utilities +├── continuous_monitor.py [NEW] - Continuous monitoring logic +├── report_generator.py [NEW] - Enhanced reporting +├── test_redmesh.py [MODIFIED] - Updated tests +├── test_new_features.py [NEW] - Tests for new features +└── TODO_B.md [THIS FILE] +``` + +### Configuration Changes + +**Plugin Config Updates:** + +```python +_CONFIG = { + **BasePlugin.CONFIG, + + 'PORT': None, + 'CHECK_JOBS_EACH': 5, + 'NR_LOCAL_WORKERS': 8, + 'WARMUP_DELAY': 30, + + # NEW: Timing and stealth + 'DEFAULT_TIMING_TEMPLATE': 'T3_NORMAL', + 'ENABLE_SAND_WALKING_BY_DEFAULT': False, + + # NEW: Continuous monitoring + 'CONTINUOUS_MODE_ENABLED': True, + 'DEFAULT_OPERATION_MODE': 'SINGLEPASS', + + # NEW: Security and compliance + 'AUTHORIZED_NETWORKS': [], # CIDR blocks authorized for testing + 'SAFE_MODE_DEFAULT': True, + 'WEBHOOK_URL': None, + + # NEW: Performance + 'MAX_REQUESTS_PER_SECOND': None, + 'ENABLE_ADAPTIVE_TIMEOUT': False, + + 'VALIDATION_RULES': { + **BasePlugin.CONFIG['VALIDATION_RULES'], + }, +} +``` + +--- + +## Implementation Roadmap + +### Phase 1: Core Features (Weeks 1-3) + +**Week 1: Service Test Selection** +- [ ] Implement `_should_run_test()` filtering logic +- [ ] Add `included_tests` and `excluded_tests` parameters +- [ ] Update `_gather_service_info()` and `_run_web_tests()` +- [ ] Add API endpoint parameters +- [ ] Write unit 
tests +- [ ] Update documentation + +**Week 2: Port Distribution Modes** +- [ ] Implement `_slice_ports()` (extract existing logic) +- [ ] Implement FULL distribution mode +- [ ] Add `port_distribution_mode` parameter +- [ ] Update `_launch_job()` logic +- [ ] Test with multiple workers +- [ ] Validate independent scanning behavior + +**Week 3: Continuous Monitoring Foundation** +- [ ] Refactor `execute_job()` for iteration loop +- [ ] Implement `_reset_iteration_state()` +- [ ] Add `operation_mode` parameter +- [ ] Implement inter-iteration delays +- [ ] Add graceful shutdown handling +- [ ] Test continuous mode execution + +### Phase 2: Stealth & Temporization (Weeks 4-5) + +**Week 4: Sand Walking Implementation** +- [ ] Implement `_calculate_next_delay_trigger()` +- [ ] Implement `_maybe_sand_walk_delay()` +- [ ] Integrate delays into port scanning +- [ ] Integrate delays into service info gathering +- [ ] Integrate delays into web tests +- [ ] Add sand walking parameters to API +- [ ] Test delay patterns and randomization + +**Week 5: Timing Templates** +- [ ] Define timing template constants +- [ ] Implement template application logic +- [ ] Add jitter functionality +- [ ] Integrate with sand walking +- [ ] Test all timing templates (T0-T5) +- [ ] Validate stealth vs. performance tradeoffs + +### Phase 3: Advanced Features (Weeks 6-8) + +**Week 6: Enhanced Reporting** +- [ ] Implement differential reporting for continuous mode +- [ ] Add iteration tracking and history +- [ ] Implement trend analysis +- [ ] Create report export formats (HTML, CSV, MD) +- [ ] Add OWASP mappings to findings +- [ ] Test reporting with continuous jobs + +**Week 7: Stealth Enhancements** +- [ ] Implement User-Agent randomization +- [ ] Add HTTP header randomization +- [ ] Implement port scan order randomization +- [ ] Add rate limiting/throttling +- [ ] Implement retry logic with backoff +- [ ] Test evasion effectiveness + +**Week 8: Compliance & Safety** +- [ ] Implement authorized target validation +- [ ] Add safe mode exclusions +- [ ] Implement webhook notifications +- [ ] Add worker health monitoring +- [ ] Enhance error handling +- [ ] Write compliance documentation + +### Phase 4: Testing & Optimization (Weeks 9-10) + +**Week 9: Comprehensive Testing** +- [ ] Write integration tests for all new features +- [ ] Test continuous mode for extended runs (24+ hours) +- [ ] Load testing with multiple workers +- [ ] Stealth testing against IDS systems +- [ ] Validate FULL port distribution mode +- [ ] Cross-platform testing (Linux, Windows, cloud) + +**Week 10: Performance Optimization** +- [ ] Profile code for bottlenecks +- [ ] Optimize port scanning loops +- [ ] Optimize memory usage for continuous mode +- [ ] Implement adaptive timeout +- [ ] Add performance metrics collection +- [ ] Benchmark against baseline + +### Phase 5: Documentation & Release (Week 11-12) + +**Week 11: Documentation** +- [ ] Write API documentation for all new endpoints +- [ ] Create usage examples and tutorials +- [ ] Write best practices guide +- [ ] Document timing templates and use cases +- [ ] Create troubleshooting guide +- [ ] Update README and changelog + +**Week 12: Release Preparation** +- [ ] Code review and refactoring +- [ ] Security audit of new features +- [ ] Finalize configuration defaults +- [ ] Prepare migration guide from v1.x +- [ ] Create release notes +- [ ] Deploy to staging for final validation + +--- + +## Testing Strategy + +### Unit Tests + +**Test Coverage Requirements:** +- Minimum 85% code coverage 
for new features +- 100% coverage for critical security functions + +**Key Test Cases:** + +```python +class TestServiceSelection(unittest.TestCase): + def test_include_only_web_tests(self): + # Test included_tests parameter + pass + + def test_exclude_specific_service(self): + # Test excluded_tests parameter + pass + + def test_exclusion_precedence(self): + # Verify exclusions override inclusions + pass + +class TestPortDistribution(unittest.TestCase): + def test_slice_mode_no_overlap(self): + # Verify sliced ports don't overlap + pass + + def test_full_mode_all_workers_same_range(self): + # Verify FULL mode gives same ports to all + pass + +class TestContinuousMode(unittest.TestCase): + def test_iteration_loop(self): + # Test multiple iterations execute + pass + + def test_state_reset_between_iterations(self): + # Verify state resets properly + pass + + def test_graceful_shutdown(self): + # Test stop_event handling + pass + +class TestSandWalking(unittest.TestCase): + def test_random_delay_triggers(self): + # Verify delays occur randomly + pass + + def test_delay_ranges(self): + # Verify delays within min/max bounds + pass + + def test_pattern_unpredictability(self): + # Statistical test for randomness + pass +``` + +### Integration Tests + +**End-to-End Scenarios:** + +1. **Single-pass with service exclusion:** + - Launch job excluding database tests + - Verify only selected tests execute + - Validate report completeness + +2. **Continuous monitoring with sand walking:** + - Launch continuous job with T1_SNEAKY template + - Run for 1 hour (multiple iterations) + - Verify timing patterns are irregular + - Check for proper iteration reports + +3. **Distributed FULL mode:** + - Launch job with 3 network workers + - Use FULL port distribution + - Verify all workers scan same ports + - Check for independent findings + +4. **Stealth evasion:** + - Target system with IDS/IPS + - Use T0_PARANOID template + - Monitor for alerts (should be minimal) + - Validate successful completion + +### Performance Benchmarks + +**Baseline Metrics:** + +| Metric | Target | Measurement Method | +|--------|--------|-------------------| +| Port scan rate (T3) | 100-500 ports/sec | Time 1000 port scan | +| Port scan rate (T0) | 0.1-1 ports/sec | Time 100 port scan | +| Memory per worker | < 50MB | Monitor RSS during scan | +| Continuous mode uptime | > 7 days | Long-running test | +| Report generation time | < 5 seconds | Time get_job_status() | + +### Security Testing + +**Validation Requirements:** + +1. **Stealth verification:** + - Deploy against honeypot systems + - Monitor logs for detection signatures + - Measure time-to-detection for each template + +2. **Authorized target enforcement:** + - Attempt to scan unauthorized IP + - Verify request is blocked + - Check error message is clear + +3. **Safe mode validation:** + - Launch with safe_mode=True + - Verify aggressive tests are skipped + - Validate report notes safe mode + +--- + +## Security & Ethical Considerations + +### Responsible Disclosure + +**RedMesh must be used ethically and legally:** + +1. **Authorization Required:** + - Only scan systems you own or have written permission to test + - Maintain documentation of authorization + - Configure AUTHORIZED_NETWORKS to prevent accidental misuse + +2. 
**Public Edge Node Usage:** + - RedMesh uses public Ratio1.ai Edge Nodes as scan sources + - Targets may see requests from multiple public IP addresses + - Include contact information in User-Agent or HTTP headers + - Respect robots.txt and security.txt directives + +3. **Rate Limiting Respect:** + - Honor target rate limits and backoff requests + - Use appropriate timing templates for production systems + - Avoid denial of service conditions + +### Compliance Considerations + +**Regulatory Alignment:** + +- **GDPR:** Ensure scan data doesn't capture personal information +- **PCI DSS 4.0:** Continuous testing aligns with requirement 11.3.2 +- **SOC 2:** Document authorization and approval processes +- **HIPAA:** Avoid scanning healthcare systems without specific approval + +### Network Ethics + +**Best Practices:** + +1. **Minimize Impact:** + - Use T2_POLITE or slower for production systems + - Schedule scans during maintenance windows + - Monitor target system health + +2. **Transparent Operation:** + - Set identifiable User-Agent strings + - Provide abuse contact information + - Document scan purpose in logs + +3. **Data Handling:** + - Encrypt reports containing vulnerability data + - Implement data retention policies + - Secure CStore data with access controls + +### Incident Response + +**If RedMesh Triggers Alerts:** + +1. **Immediate Actions:** + - Stop job immediately + - Contact target system owner + - Provide scan details and purpose + - Offer assistance with log analysis + +2. **Documentation:** + - Log all scan activities + - Maintain authorization records + - Document incident and resolution + +--- + +## References + +### Research & Best Practices + +**Red Teaming & Stealth:** +- [Red Teaming methodologies focused on op-sec and stealth](https://security.stackexchange.com/questions/270430/red-teaming-methodologies-focused-on-op-sec-and-stealth) +- [Red Teaming in 2025: The Bleeding Edge](https://www.cycognito.com/learn/red-teaming/) +- [Red Teaming Tools 2025](https://www.cycognito.com/learn/red-teaming/red-teaming-tools.php) +- [Advanced Red Team Tactics](https://undercodetesting.com/advanced-red-team-tactics-exploiting-vulnerable-drivers-for-evasion/) +- [Red Team Reconnaissance Techniques](https://www.linode.com/docs/guides/red-team-reconnaissance-techniques/) + +**Continuous Testing:** +- [Beyond Point-in-Time: The ROI Case for Continuous Pentesting](https://thehackernews.com/expert-insights/2025/12/beyond-point-in-time-roi-case-for.html) +- [Continuous Penetration Testing 2025](https://deepstrike.io/blog/continuous-penetration-testing) +- [Point-in-time vs. 
continuous penetration testing](https://www.bugcrowd.com/blog/point-in-time-vs-continuous-penetration-testing-a-comparison-guide/) + +**Timing & Evasion:** +- [Nmap Timing Templates](https://nmap.org/book/performance-timing-templates.html) +- [Mastering Nmap Part 5: Timing & Performance Optimization](https://medium.com/@appsecvenue/mastering-nmap-part-5-in-2025-timing-performance-optimization-a2b98f187e0c) +- [Nmap Scan with Timing Parameters](https://www.hackingarticles.in/nmap-scan-with-timing-parameters/) +- [Advanced NMAP Scanning](https://securedebug.com/advanced-nmap-scanning-techiques-network-scan/) + +**Distributed Scanning:** +- [Distributed port-scan attack in cloud environment](https://ieeexplore.ieee.org/document/6622595/) +- [Enhancing Network Visibility with Advanced Port Scanning](https://pmc.ncbi.nlm.nih.gov/articles/PMC10490701/) +- [Hiding in the AI Traffic: Abusing MCP for Red Teaming](https://arxiv.org/html/2511.15998) + +**Penetration Testing Trends:** +- [Penetration Testing Trends 2025](https://www.getastra.com/blog/security-audit/penetration-testing-trends/) +- [Evolution of Penetration Testing Methodologies](https://www.uprootsecurity.com/blog/pentest-methodologies) +- [Pentesting Statistics 2025](https://zerothreat.ai/blog/emerging-penetration-testing-statistics) + +### OWASP Resources + +- **OWASP Top 10 2021:** https://owasp.org/Top10/ +- **OWASP Testing Guide v4:** https://owasp.org/www-project-web-security-testing-guide/ +- **OWASP API Security Top 10:** https://owasp.org/www-project-api-security/ + +### Standards & Compliance + +- **PCI DSS v4.0:** https://www.pcisecuritystandards.org/ +- **NIST Cybersecurity Framework:** https://www.nist.gov/cyberframework +- **ISO 27001:** https://www.iso.org/isoiec-27001-information-security.html + +--- + +## Appendix A: Configuration Examples + +### Example 1: Stealth Reconnaissance + +```python +# Ultra-stealthy scan for sensitive production system +launch_test( + target="production.example.com", + start_port=1, + end_port=1024, + timing_template="T0_PARANOID", + operation_mode="SINGLEPASS", + port_distribution_mode="FULL", # Multiple sources for obfuscation + excluded_tests="_web_test_sql_injection,_web_test_path_traversal", + safe_mode=True +) +``` + +### Example 2: Continuous Monitoring + +```python +# 24/7 security monitoring +launch_test( + target="staging.example.com", + start_port=1, + end_port=65535, + timing_template="T2_POLITE", + operation_mode="CONTINUOUS", + continuous_delay_min=3600, # 1 hour between scans + continuous_delay_max=7200, # 2 hours + port_distribution_mode="SLICE", +) +``` + +### Example 3: Quick Infrastructure Audit + +```python +# Fast, comprehensive scan +launch_test( + target="dev.example.com", + start_port=1, + end_port=10000, + timing_template="T4_AGGRESSIVE", + operation_mode="SINGLEPASS", + included_tests="_service_info_http,_service_info_https,_web_test_recon,_web_test_headers", +) +``` + +### Example 4: Database Security Focus + +```python +# Database-only security check +launch_test( + target="db.example.com", + start_port=1, + end_port=65535, + timing_template="T3_NORMAL", + included_tests="_service_info_3306,_service_info_5432,_service_info_1433,_service_info_27017", + excluded_tests="_web_test_*", # Skip web tests +) +``` + +--- + +## Appendix B: Migration Guide from v1.x to v2.0 + +### Breaking Changes + +1. **None:** v2.0 is backward compatible with v1.x API calls +2. All new parameters have sensible defaults +3. 
Existing jobs continue to work unchanged + +### New Capabilities + +**For existing users, to adopt new features:** + +1. **Add continuous monitoring:** + ```python + # Old (v1.x) + launch_test(target="example.com", start_port=1, end_port=1000) + + # New (v2.0) + launch_test( + target="example.com", + start_port=1, + end_port=1000, + operation_mode="CONTINUOUS" # NEW + ) + ``` + +2. **Improve stealth:** + ```python + # Add timing template + launch_test(..., timing_template="T1_SNEAKY") + ``` + +3. **Selective testing:** + ```python + # Exclude aggressive tests + launch_test(..., excluded_tests="_web_test_sql_injection") + ``` + +### Recommended Upgrades + +**Priority 1: Add timing templates** +- Review current scan speeds +- Select appropriate template (T2_POLITE recommended for production) + +**Priority 2: Enable continuous monitoring for critical assets** +- Start with 6-hour intervals +- Monitor for new vulnerabilities + +**Priority 3: Implement service selection** +- Reduce scan time by 30-50% +- Focus on relevant attack surface + +--- + +## Appendix C: Troubleshooting Guide + +### Common Issues + +**Issue: Continuous mode not chaining jobs** +- **Cause:** Job marked as done incorrectly +- **Solution:** Check `operation_mode` parameter, verify not in SINGLEPASS + +**Issue: Sand walking delays too long** +- **Cause:** Incorrect timing template or parameters +- **Solution:** Use faster template (T3, T4) or reduce max_wait + +**Issue: All tests skipped** +- **Cause:** Over-restrictive `excluded_tests` or empty `included_tests` +- **Solution:** Review test selection parameters + +**Issue: Worker not reporting results** +- **Cause:** Network issue or worker crashed +- **Solution:** Check worker health endpoint, review logs + +**Issue: Detection by IDS despite stealth settings** +- **Cause:** Target has advanced behavioral detection +- **Solution:** Use T0_PARANOID, increase delays, reduce parallelism + +--- + +## Appendix D: Performance Tuning Matrix + +| Use Case | Template | Port Dist | Workers | Expected Speed | +|----------|----------|-----------|---------|----------------| +| Production stealth | T0 | FULL | 2-3 | Hours per 1000 ports | +| Production safe | T2 | SLICE | 4-8 | Minutes per 1000 ports | +| Staging full | T3 | SLICE | 8-16 | Seconds per 1000 ports | +| Dev quick | T4 | SLICE | 16-32 | Sub-second per 1000 ports | +| Emergency audit | T5 | SLICE | 32+ | Fastest possible | + +--- + +## Conclusion + +This implementation plan transforms RedMesh from a capable distributed penetration testing framework into a world-class continuous security monitoring platform. The proposed features align with industry best practices for 2025, emphasizing: + +1. **Stealth & Evasion:** Advanced temporal randomization prevents detection +2. **Flexibility:** Granular control over tests, timing, and distribution +3. **Continuous Assurance:** Shift from point-in-time to always-on monitoring +4. **Responsible Operation:** Built-in safeguards and ethical considerations +5. **Scalability:** Leverages Ratio1.ai Edge Network for distributed coordination + +By implementing these features systematically over 12 weeks, RedMesh v2.0 will provide organizations with cutting-edge red teaming capabilities while maintaining the ethical and legal standards required for responsible security testing. 
+
+---
+
+**Document Status:** DRAFT v1.0
+**Next Review:** Upon approval to proceed with Phase 1
+**Approvers:** Technical Lead, Security Team, Product Owner
+
diff --git a/extensions/business/cybersec/red_mesh/TODO_E_fixed.md b/extensions/business/cybersec/red_mesh/TODO_E_fixed.md
new file mode 100644
index 00000000..f2b17a65
--- /dev/null
+++ b/extensions/business/cybersec/red_mesh/TODO_E_fixed.md
@@ -0,0 +1,60 @@
+# RedMesh v2.0 – Minimal, Correct Implementation Plan
+Status: ready to implement. Scope is only the features from PROMPT.MD; no extras.
+
+## Principles
+- Preserve existing orchestrator/worker/CStore lifecycle (discover → launch → aggregate → publish).
+- Keep it simple: minimal new fields, no lineage trees, no extra audit metadata.
+- Non-blocking scheduling: never sleep the orchestrator.
+- Defaults: single-pass, sliced ports, shuffled order unless configured otherwise.
+
+## Job spec (CStore) additions
+- `included_tests: list[str] | None` – whitelist; None means all. Validate against discovered features (same discovery path as workers); reject unknowns.
+- `excluded_tests: list[str]` – blacklist; wins over included. Same validation.
+- `port_distribution: "SLICE" | "MIRROR"` – SLICE default.
+- `port_order: "SEQUENTIAL" | "SHUFFLE"` – default SHUFFLE for stealth; SEQUENTIAL allowed (no compatibility constraint).
+- `run_mode: "SINGLEPASS" | "CONTINUOUS"` – SINGLEPASS default.
+- `schedule: {"interval": int} | None` – required for CONTINUOUS; interval > 0 else reject; reject malformed/non-dict payloads.
+- `pacing: {"pause_interval": int, "pause_duration": float} | None` – pause after ~N actions for ~D seconds with defined jitter; validate pause_interval > 0 and pause_duration >= 0; reject malformed/non-dict payloads.
+- `scheduled_at: float | None` – future launch time for non-blocking continuous runs (set only on successors).
+- `continuous_stopped: bool` – stop flag for continuous chains; default False; never reset in successors.
+- `max_iterations: int | None` – optional safety ceiling; default None (no cap). If provided, must be > 0; job-level value overrides any global default if such config is introduced.
+
+## Architecture by feature
+- Monitoring: add a simple `current_stage` string in worker state (INITIALIZED/SCANNING/PROBING/TESTING/PAUSED/COMPLETED) plus clearer logs; surface it in `get_job_status` per worker entry (e.g., status[job_id][worker_id]["current_stage"]); no extra nested telemetry.
+- Test selection: filter `_service_info_*` and `_web_test_*` via include/exclude lists with validation against discovered features.
+- Port distribution: in `_launch_job`, choose SLICE (existing chunking) or MIRROR (all workers get full range); aggregation already deduplicates.
+- Port order: per job, apply SEQUENTIAL (sorted) or SHUFFLE (random.shuffle once before scan) to each worker’s ports.
+- Run modes & scheduling: job chaining. When a CONTINUOUS job finishes, orchestrator **re-reads the latest job spec from CStore** to honor concurrent stop requests, then writes a successor job with a new `job_id`, incremented iteration, and `scheduled_at = now + interval`. `_maybe_launch_jobs` skips jobs until `scheduled_at` is reached. No sleeps in the orchestrator.
+- Stop continuous: `stop_and_delete_job(job_id, stop_continuous=True)` sets `continuous_stopped=True` so `_close_job` skips creating successors. Do not reset this flag in successors.
+- Pacing (“Dune sand walking”): worker-level `_maybe_pause` that uses `stop_event.wait(duration)` with defined jitter (interval jitter 0.8–1.2x, duration jitter 0.5–1.5x); invoked in port scan, service probes, and web tests; preserves/restores `current_stage`. Each worker gets its own port list copy when shuffling MIRROR batches to avoid shared state; for MIRROR+SEQUENTIAL reuse the shared range to avoid unnecessary memory. + +## Implementation steps +1) Data/validation (pentester_api_01.py) + - Extend `_CONFIG` defaults for new fields (port_order default SHUFFLE). + - Update `launch_test` signature to accept new params; validate enums, schedule interval > 0 (required for CONTINUOUS), pacing bounds, known test names; reject malformed dicts (missing keys/wrong types) for schedule/pacing; reject or explicitly ignore extra keys (document behavior). + - Include new fields in job spec; set `scheduled_at=None`, `continuous_stopped=False`, optional `max_iterations` (must be > 0 if set; job-level overrides any global default if present). + - `_normalize_job_record` to backfill defaults for missing fields; for malformed specs, reject with a clear error (no legacy compatibility/implicit coercion). +2) Worker wiring (redmesh_utils.py) + - Accept and store include/exclude, port_order, pacing in `PentestLocalWorker`. + - Add `current_stage` to state. + - Implement `_should_run(test_name)` and use it in service/web loops. + - Apply port order before scanning; inject `_maybe_pause` with jitter and stage preservation. +3) Port distribution (pentester_api_01.py) + - In `_launch_job`, branch on `port_distribution`: existing batch split for SLICE; identical full range per worker for MIRROR. For MIRROR+SEQUENTIAL reuse the same range object; for MIRROR+SHUFFLE, create per-worker list copies (or document the memory cost) so shuffling does not share state. +4) Continuous mode (pentester_api_01.py) + - In `_close_job`, after aggregation and updating worker entry, **re-read** the job spec from CStore to honor concurrent stop flags; if spec is missing, log and skip successor creation. If `run_mode=="CONTINUOUS"` and not `continuous_stopped` and (max_iterations is None or iteration < max_iterations), create successor with new `job_id`, `scheduled_at=now+interval`, and reset `workers={}`; do not block. + - In `_maybe_launch_jobs`, skip jobs whose `scheduled_at` is in the future; if a job spec is missing between read and launch, log and skip gracefully. + - In `stop_and_delete_job`, when `stop_continuous` is true, set `continuous_stopped=True` in CStore before or during stop. +5) Observability polish + - Update logs to include job_id, stage, and port range/test name. + - Ensure `get_job_status` surfaces `current_stage` per worker when present. + +## Out of scope (to stay KISS) +- No parent_job_id lineage, no tests_run/skipped audit lists, no RBAC/allowlists, no extra pacing knobs beyond interval/duration. + +## Acceptance checks +- SINGLEPASS job still works with no new params. +- CONTINUOUS job creates successor after interval without blocking other jobs; stop flag prevents further successors. +- MIRROR launches workers with same port range; aggregation dedupes. +- SHUFFLE changes scan order; SEQUENTIAL remains deterministic. +- Pacing pauses respect stop requests (stop_event.wait), and stage restores correctly after pauses. 
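+
+## Example launch (sketch)
+A minimal sketch of a `launch_test` call exercising the fields above; argument names follow the endpoint signature in `pentester_api_01.py`, while the target hostname and all values are illustrative only:
+
+```python
+launch_test(
+  target="scan-me.example.com",    # authorized target only (illustrative)
+  start_port=1,
+  end_port=1024,
+  run_mode="CONTINUOUS",           # chain successor jobs via CStore
+  schedule_interval=3600,          # seconds between iterations
+  port_distribution="MIRROR",      # every worker scans the full range
+  port_order="SHUFFLE",            # randomized per-worker scan order
+  pacing={"pause_interval": 25, "pause_duration": 2.0},  # ~2 s pause every ~25 actions, jittered
+  max_iterations=24,               # optional cap on the continuous chain
+  excluded_tests=["_web_test_sql_injection"],  # exclusions win over inclusions
+)
+```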
diff --git a/extensions/business/cybersec/red_mesh/pentester_api_01.py b/extensions/business/cybersec/red_mesh/pentester_api_01.py index e2f6e43d..019e98ed 100644 --- a/extensions/business/cybersec/red_mesh/pentester_api_01.py +++ b/extensions/business/cybersec/red_mesh/pentester_api_01.py @@ -7,6 +7,27 @@ - plugin-based scanning and testing services - real-time job monitoring and logging via API + +```pipeline-example.json +{ + "NAME": "redmesh_api", + "TYPE": "Void", + "PLUGINS": [ + { + "SIGNATURE": "PENTESTER_API_01", + "INSTANCES": [ + { + "INSTANCE_ID": "PENTESTER_API_01_DEFAULT", + "CHECK_JOBS_EACH": 5, + "NR_LOCAL_WORKERS": 4, + "WARMUP_DELAY": 30 + } + ] + } + ] +} +``` + """ @@ -25,6 +46,17 @@ "NR_LOCAL_WORKERS" : 8, "WARMUP_DELAY" : 30, + + # feature defaults + "PORT_DISTRIBUTION" : "SLICE", # SLICE | MIRROR + "PORT_ORDER" : "SHUFFLE", # SHUFFLE | SEQUENTIAL + "RUN_MODE" : "SINGLEPASS", # SINGLEPASS | CONTINUOUS + "SCHEDULE_INTERVAL" : None, # required for CONTINUOUS if not passed at launch + "PACING" : None, # {"pause_interval": int, "pause_duration": float} + "MAX_ITERATIONS" : None, # optional safety cap; None means no cap + "INCLUDED_TESTS" : None, + "EXCLUDED_TESTS" : [], + "CHAIN_ID": None, 'VALIDATION_RULES': { @@ -34,18 +66,35 @@ class PentesterApi01Plugin(BasePlugin): """ - RedMesh API - a pentesting meta-plugin for receiving pentesting targets and performing operations. - Supports asynchronous job execution and performs distributed red-team attacks based on - decentralized workers orchestrated using CStore. + RedMesh API plugin for orchestrating decentralized pentest jobs. + + The plugin listens for announced jobs in CStore, spawns local workers to + cover assigned port ranges, aggregates their reports, and updates the + distributed job state. It exposes FastAPI endpoints for launching tests, + inspecting progress, and stopping runs. - Supports semaphore-based pairing with Container App Runner plugins via - the SEMAPHORE configuration key. When configured, exposes API host/port - as environment variables to paired containers (e.g., RedMesh UI). + Attributes + ---------- + CONFIG : dict + Plugin configuration merged with `BasePlugin` defaults. + scan_jobs : dict + Active local jobs keyed by job_id. + completed_jobs_reports : dict + Aggregated reports for finished jobs keyed by job_id. + lst_completed_jobs : list + List of job_ids that completed locally (used for status responses). """ CONFIG = _CONFIG def on_init(self): + """ + Initialize plugin state and log available features. + + Returns + ------- + None + """ super(PentesterApi01Plugin, self).on_init() self.__features = self._get_all_features() # Track active and completed jobs by target @@ -80,11 +129,36 @@ def on_close(self): def P(self, s, *args, **kwargs): + """ + Prefixed logger for RedMesh messages. + + Parameters + ---------- + s : str + Message to log. + *args + Positional args forwarded to parent logger. + **kwargs + Keyword args forwarded to parent logger. + + Returns + ------- + Any + Result of parent logger. + """ s = "[REDMESH] " + s return super(PentesterApi01Plugin, self).P(s, *args, **kwargs) def __post_init(self): + """ + Perform warmup: reconcile existing jobs in CStore, migrate legacy keys, + and aggregate any incomplete reports. 
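+    Incomplete reports (raw per-worker outputs stranded by prior deployments)
+    are detected and re-aggregated before normal processing resumes.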
+ + Returns + ------- + None + """ all_network_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) info = "" for job_key, job_spec in all_network_jobs.items(): @@ -100,6 +174,7 @@ def __post_init(self): sample_value = next(iter(raw_report.values()), None) needs_aggregation = isinstance(sample_value, dict) and "local_worker_id" in sample_value if needs_aggregation: + # Merge stranded partial worker outputs left from prior deployments. self.P(f"Found incomplete report for {normalized_key}, aggregating...", color='r') agg_report = self._get_aggregated_report(raw_report) our_worker["result"] = agg_report @@ -135,6 +210,19 @@ def __post_init(self): def _get_all_features(self, categs=False): + """ + Discover all service and web test methods available to workers. + + Parameters + ---------- + categs : bool, optional + If True, return a dict keyed by category; otherwise a flat list. + + Returns + ------- + dict | list + Mapping or list of method names prefixed with `_service_info_` / `_web_test_`. + """ features = {} if categs else [] PREFIXES = ["_service_info_", "_web_test_"] for prefix in PREFIXES: @@ -147,13 +235,26 @@ def _get_all_features(self, categs=False): def _normalize_job_record(self, job_key, job_spec, migrate=False): - """Return a normalized job record and optionally migrate legacy entries. + """ + Normalize a job record and optionally migrate legacy entries. Stage 1 of the multi-job coordination refactor lives here. The helper keeps legacy single-key entries compatible while allowing each job to live under - its own hash key. Stage 2 (not yet implemented) will introduce a dedicated - index for fast lookups and TTL-based cleanup so workers can garbage collect - stale jobs without clobbering peers. + its own hash key. + + Parameters + ---------- + job_key : str + Raw key stored in CStore. + job_spec : Any + Raw job specification (expected dict). + migrate : bool, optional + If True, rewrite legacy keys to normalized form. + + Returns + ------- + tuple[str | None, dict | None] + Normalized key and spec; `(None, None)` when invalid. 
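+
+    Notes
+    -----
+    Invalid values (unknown enums, malformed schedule/pacing payloads,
+    non-positive intervals or iteration caps) reject the whole record as
+    `(None, None)` rather than being silently corrected.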
""" if not isinstance(job_spec, dict): return None, None @@ -166,6 +267,86 @@ def _normalize_job_record(self, job_key, job_spec, migrate=False): if not isinstance(workers, dict): workers = {} normalized["workers"] = workers + + # Defaults and validation for new fields + allowed_distributions = {"SLICE", "MIRROR"} + allowed_orders = {"SEQUENTIAL", "SHUFFLE"} + allowed_run_modes = {"SINGLEPASS", "CONTINUOUS"} + + port_distribution = normalized.get("port_distribution", self.cfg_port_distribution) + port_order = normalized.get("port_order", self.cfg_port_order) + run_mode = normalized.get("run_mode", self.cfg_run_mode) + included_tests = normalized.get("included_tests", self.cfg_included_tests) + excluded_tests = normalized.get("excluded_tests", self.cfg_excluded_tests) + schedule = normalized.get("schedule") + pacing = normalized.get("pacing", self.cfg_pacing) + scheduled_at = normalized.get("scheduled_at") + continuous_stopped = normalized.get("continuous_stopped", False) + max_iterations = normalized.get("max_iterations", self.cfg_max_iterations) + chain_id = normalized.get("chain_id", normalized.get("job_id")) + + if port_distribution not in allowed_distributions: + self.P(f"Invalid port_distribution for job {job_id}: {port_distribution}", color='r') + return None, None + if port_order not in allowed_orders: + self.P(f"Invalid port_order for job {job_id}: {port_order}", color='r') + return None, None + if run_mode not in allowed_run_modes: + self.P(f"Invalid run_mode for job {job_id}: {run_mode}", color='r') + return None, None + if included_tests is not None and not isinstance(included_tests, list): + self.P(f"Invalid included_tests for job {job_id}: expected list or None", color='r') + return None, None + if excluded_tests is not None and not isinstance(excluded_tests, list): + self.P(f"Invalid excluded_tests for job {job_id}: expected list", color='r') + return None, None + if pacing is not None: + if not isinstance(pacing, dict): + self.P(f"Invalid pacing for job {job_id}: expected dict", color='r') + return None, None + if "pause_interval" not in pacing or "pause_duration" not in pacing: + self.P(f"Invalid pacing for job {job_id}: missing keys", color='r') + return None, None + if max_iterations is not None: + try: + max_iterations = int(max_iterations) + except Exception: + self.P(f"Invalid max_iterations for job {job_id}: {max_iterations}", color='r') + return None, None + if max_iterations <= 0: + self.P(f"Invalid max_iterations for job {job_id}: must be > 0", color='r') + return None, None + if run_mode == "CONTINUOUS": + if schedule is None: + schedule = {"interval": self.cfg_schedule_interval} if self.cfg_schedule_interval else None + if not isinstance(schedule, dict) or schedule.get("interval") is None: + self.P(f"Invalid schedule for continuous job {job_id}: {schedule}", color='r') + return None, None + try: + interval = int(schedule.get("interval")) + except Exception: + self.P(f"Invalid interval for job {job_id}: {schedule.get('interval')}", color='r') + return None, None + if interval <= 0: + self.P(f"Invalid interval for job {job_id}: must be > 0", color='r') + return None, None + schedule = {"interval": interval} + else: + schedule = None + scheduled_at = None + + normalized["port_distribution"] = port_distribution + normalized["port_order"] = port_order + normalized["run_mode"] = run_mode + normalized["included_tests"] = included_tests + normalized["excluded_tests"] = excluded_tests or [] + normalized["schedule"] = schedule + normalized["pacing"] = pacing + 
normalized["scheduled_at"] = scheduled_at + normalized["continuous_stopped"] = bool(continuous_stopped) + normalized["max_iterations"] = max_iterations + normalized["chain_id"] = chain_id + if migrate and job_key != job_id: self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=normalized) self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=None) @@ -174,6 +355,21 @@ def _normalize_job_record(self, job_key, job_spec, migrate=False): def _ensure_worker_entry(self, job_id, job_spec): + """ + Ensure current worker has an entry in the distributed job spec. + + Parameters + ---------- + job_id : str + Identifier of the job. + job_spec : dict + Mutable job specification stored in CStore. + + Returns + ------- + dict + Worker entry for this edge node. + """ workers = job_spec.setdefault("workers", {}) worker_entry = workers.get(self.ee_addr) if worker_entry is None: @@ -192,27 +388,77 @@ def _launch_job( network_worker_address, nr_local_workers=4, exceptions=None, + port_distribution=None, + port_order=None, + included_tests=None, + excluded_tests=None, + pacing=None, ): + """ + Launch local worker threads for a job by splitting the port range. + + Parameters + ---------- + job_id : str + Identifier of the job being launched. + target : str + Hostname or IP to scan. + start_port : int + Inclusive start of port range. + end_port : int + Inclusive end of port range. + network_worker_address : str + Address of the worker that announced the job. + nr_local_workers : int, optional + Number of worker threads to spawn. + exceptions : list[int], optional + Ports to exclude from scanning. + port_distribution : str, optional + SLICE (split range) or MIRROR (full range per worker). + port_order : str, optional + SEQUENTIAL or SHUFFLE port iteration per worker. + included_tests / excluded_tests : list[str] | None, optional + Filters for worker test execution. + pacing : dict | None, optional + Stealth pauses: {"pause_interval": int, "pause_duration": float}; None disables pacing. + + Returns + ------- + dict + Mapping of local_worker_id to `PentestLocalWorker` instances. + + Raises + ------ + ValueError + When no ports are available or batches cannot be allocated. 
+ """ local_jobs = {} ports = list(range(start_port, end_port + 1)) - batches = [] ports = sorted(ports) nr_ports = len(ports) if nr_ports == 0: raise ValueError("No ports available for local workers.") nr_local_workers = max(1, min(nr_local_workers, nr_ports)) - base_chunk, remainder = divmod(nr_ports, nr_local_workers) - start = 0 + batches = [] if exceptions is None: exceptions = [] - for i in range(nr_local_workers): - chunk = base_chunk + (1 if i < remainder else 0) - end = start + chunk - batch = ports[start:end] - if batch: - batches.append(batch) - start = end - #endfor create batches + + if (port_distribution or self.cfg_port_distribution) == "MIRROR": + # every worker gets full range; copy when we plan to shuffle per worker + for _ in range(nr_local_workers): + batches.append(list(ports)) + else: + base_chunk, remainder = divmod(nr_ports, nr_local_workers) + start = 0 + for i in range(nr_local_workers): + chunk = base_chunk + (1 if i < remainder else 0) + end = start + chunk + batch = ports[start:end] + if batch: + batches.append(batch) + start = end + #endfor create batches + #end if MIRROR vs SLICE if not batches: raise ValueError("Unable to allocate port batches to workers.") if job_id not in self.scan_jobs: @@ -233,6 +479,10 @@ def _launch_job( initiator=network_worker_address, exceptions=exceptions, worker_target_ports=batch, + included_tests=included_tests, + excluded_tests=excluded_tests, + port_order=port_order or self.cfg_port_order, + pacing=pacing, ) batch_job.start() local_jobs[batch_job.local_worker_id] = batch_job @@ -252,8 +502,18 @@ def _launch_job( def _maybe_launch_jobs(self, nr_local_workers=None): """ - Launch new PentestJob threads for any announced pentest target. - Called at each process iteratisson. + Launch new PentestJob threads for announced pentest targets. + + Called periodically from `process`. + + Parameters + ---------- + nr_local_workers : int, optional + Override for number of local workers; defaults to config. + + Returns + ------- + None """ if self.time() - self.__last_checked_jobs > self.cfg_check_jobs_each: self.__last_checked_jobs = self.time() @@ -266,6 +526,11 @@ def _maybe_launch_jobs(self, nr_local_workers=None): job_id = job_specs.get("job_id", normalized_key) if job_id is None: continue + scheduled_at = job_specs.get("scheduled_at") + if scheduled_at and self.time() < scheduled_at: + continue + if job_specs.get("continuous_stopped", False): + continue worker_entry = self._ensure_worker_entry(job_id, job_specs) current_worker_finished = worker_entry.get("finished", False) if current_worker_finished: @@ -295,7 +560,12 @@ def _maybe_launch_jobs(self, nr_local_workers=None): end_port=end_port, network_worker_address=launcher, nr_local_workers=workers_requested, - exceptions=exceptions + exceptions=exceptions, + port_distribution=job_specs.get("port_distribution"), + port_order=job_specs.get("port_order"), + included_tests=job_specs.get("included_tests"), + excluded_tests=job_specs.get("excluded_tests"), + pacing=job_specs.get("pacing"), ) except ValueError as exc: self.P(f"Skipping job {job_id}: {exc}", color='r') @@ -311,6 +581,19 @@ def _maybe_launch_jobs(self, nr_local_workers=None): def _get_aggregated_report(self, local_jobs): + """ + Aggregate results from multiple local workers. + + Parameters + ---------- + local_jobs : dict + Mapping of worker id to result dicts. + + Returns + ------- + dict + Aggregated report with merged open ports, service info, etc. 
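+
+    Notes
+    -----
+    Fields declared in `PentestLocalWorker.get_worker_specific_result_fields`
+    are merged with their associated reducers (e.g. `min` for `start_port`,
+    `max` for `end_port`); findings are deduplicated across workers.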
+ """ dct_aggregated_report = {} type_or_func, field = None, None try: @@ -360,16 +643,23 @@ def _get_aggregated_report(self, local_jobs): def _close_job(self, job_id, canceled=False): """ - This only closes the LOCAL job not the whole network job - must be refactored! - - TODO: change the logic as follows - - separate worker status from job status in different hset - - redmesh instance (network) cfg_instance_id - - job-id hset - - each network worker will post status (for its local workers) - - all network workers will monitor all network workers - - last network worker that finishes posts - + Close a local job, aggregate reports, and persist in CStore. + + Parameters + ---------- + job_id : str + Identifier for the job to close. + canceled : bool, optional + Whether job was canceled before completion. + + Returns + ------- + None + + Notes + ----- + Also handles continuous chaining: if `run_mode` is CONTINUOUS and not stopped, + a successor job is created with a future `scheduled_at`. """ local_workers = self.scan_jobs.pop(job_id, None) if local_workers: @@ -396,10 +686,56 @@ def _close_job(self, job_id, canceled=False): self.json_dumps(job_specs, indent=2) )) self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs) + # Handle continuous chaining if enabled and not stopped + # NOTE: if another node sets continuous_stopped between our read and this point, + # we could still create a successor; re-read here if stronger guarantees are needed. + if ( + job_specs.get("run_mode") == "CONTINUOUS" + and not job_specs.get("continuous_stopped", False) + ): + iteration = int(job_specs.get("iteration", 0) or 0) + max_iterations = job_specs.get("max_iterations") + if max_iterations is not None: + try: + max_iterations = int(max_iterations) + except Exception: + max_iterations = None + if max_iterations is not None and iteration + 1 >= max_iterations: + self.P(f"Max iterations reached for {job_id}, not creating successor.") + return + schedule = job_specs.get("schedule") or {} + interval = schedule.get("interval") + if interval: + try: + interval = int(interval) + except Exception: + interval = None + if interval is None or interval <= 0: + self.P(f"No valid interval for continuous job {job_id}, skipping successor.", color='r') + return + next_job_id = self.uuid(8) + next_specs = { + **job_specs, + "job_id": next_job_id, + "chain_id": job_specs.get("chain_id", job_id), + "iteration": iteration + 1, + "scheduled_at": self.time() + interval, + "workers": {}, + "continuous_stopped": False, + } + self.chainstore_hset(hkey=self.cfg_instance_id, key=next_job_id, value=next_specs) + self.P(f"Scheduled successor {next_job_id} for job {job_id} in {interval}s.") return def _maybe_close_jobs(self): + """ + Inspect running jobs and close those whose workers have finished. + + Returns + ------- + None + """ for job_id, local_workers in list(self.scan_jobs.items()): all_workers_done = True any_canceled_worker = False @@ -443,11 +779,32 @@ def _maybe_close_jobs(self): def _get_all_network_jobs(self): + """ + Retrieve all jobs tracked in CStore for this instance. + + Returns + ------- + dict + Raw mapping from job keys to specs. + """ all_workers_and_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) return all_workers_and_jobs def _get_job_from_cstore(self, job_id: str): + """ + Fetch a normalized job spec from CStore by job_id. + + Parameters + ---------- + job_id : str + Identifier to search for. + + Returns + ------- + dict | None + Normalized job spec or None if not found. 
+ """ all_workers_and_jobs = self._get_all_network_jobs() found = None for job_key, job_specs in all_workers_and_jobs.items(): @@ -459,6 +816,19 @@ def _get_job_from_cstore(self, job_id: str): def _get_job_status(self, job_id : str): + """ + Build a status or report payload for a job. + + Parameters + ---------- + job_id : str + Identifier of the job to inspect. + + Returns + ------- + dict + Status payload indicating running/completed/network_tracked/not_found. + """ target = None local_workers = self.scan_jobs.get(job_id) jobs_network_state = self._get_job_from_cstore(job_id) @@ -471,7 +841,10 @@ def _get_job_status(self, job_id : str): target = local_workers_reports[some_worker]["target"] result = { "job_id": job_id, + "chain_id": jobs_network_state.get("chain_id") if jobs_network_state else None, "target": target, + "iteration": jobs_network_state.get("iteration") if jobs_network_state else None, + "run_mode": jobs_network_state.get("run_mode") if jobs_network_state else None, "status": "completed", "report": self.completed_jobs_reports[job_id] } @@ -482,11 +855,22 @@ def _get_job_status(self, job_id : str): for local_worker_id, job in local_workers.items(): statuses[local_worker_id] = job.get_status() #end for each local worker - result = statuses + result = { + "job_id": job_id, + "chain_id": jobs_network_state.get("chain_id") if jobs_network_state else None, + "iteration": jobs_network_state.get("iteration") if jobs_network_state else None, + "run_mode": jobs_network_state.get("run_mode") if jobs_network_state else None, + "status": "running", + "workers": statuses, + } elif jobs_network_state: result = { "job_id": job_id, + "chain_id": jobs_network_state.get("chain_id"), "target": jobs_network_state.get("target"), + "iteration": jobs_network_state.get("iteration"), + "run_mode": jobs_network_state.get("run_mode"), + "scheduled_at": jobs_network_state.get("scheduled_at"), "status": "network_tracked", "job": jobs_network_state } @@ -512,6 +896,14 @@ def _get_job_status(self, job_id : str): @BasePlugin.endpoint def list_features(self): + """ + List available service and web test features. + + Returns + ------- + dict + Mapping of categories to lists of feature names. + """ result = {"features": self._get_all_features(categs=True)} return result @@ -521,11 +913,58 @@ def launch_test( self, target: str = "", start_port: int = 1, end_port: int = 65535, - exceptions: str = "64297" + exceptions: str = "64297", + included_tests=None, + excluded_tests=None, + port_distribution: str = None, + port_order: str = None, + run_mode: str = None, + schedule_interval: int = None, + pacing=None, + max_iterations=None, ): """ - Endpoint to start a pentest on the specified target. - Announce job to network via CStore and return current jobs. + Start a pentest on the specified target. + + Announces the job to the network via CStore; actual execution is handled + asynchronously by worker threads. + + Parameters + ---------- + target : str, optional + Hostname or IP to scan. + start_port : int, optional + Inclusive start port, default 1. + end_port : int, optional + Inclusive end port, default 65535. + exceptions : str, optional + Comma/space separated list of ports to skip. + included_tests : list | str | None, optional + Whitelist of tests to run; None runs all available tests. + excluded_tests : list | str | None, optional + Blacklist of tests to skip; takes precedence over included_tests. 
+ port_distribution : str, optional + SLICE (default) to divide ports among workers, or MIRROR to give all workers the full range. + port_order : str, optional + SEQUENTIAL or SHUFFLE (default) port scan order per worker. + run_mode : str, optional + SINGLEPASS (default) or CONTINUOUS for recurring scans. + schedule_interval : int, optional + Interval in seconds between continuous iterations (required for CONTINUOUS). + pacing : dict | None, optional + Stealth pauses: {"pause_interval": int, "pause_duration": float}; None disables pacing. + max_iterations : int | None, optional + Optional safety cap for continuous chains; None means no cap. + + Returns + ------- + dict + Job specification, current worker id, and other active jobs. + + Raises + ------ + ValueError + If no target is provided. """ # INFO: This method only announces the job to the network. It does not # execute the job itself - that part is handled by PentestJob @@ -541,17 +980,106 @@ def launch_test( int(x) for x in self.re.findall(r'\d+', exceptions) if x.isdigit() ] + allowed_distributions = {"SLICE", "MIRROR"} + allowed_orders = {"SEQUENTIAL", "SHUFFLE"} + allowed_run_modes = {"SINGLEPASS", "CONTINUOUS"} + if port_distribution is None: + port_distribution = self.cfg_port_distribution + if port_order is None: + port_order = self.cfg_port_order + if run_mode is None: + run_mode = self.cfg_run_mode + if port_distribution not in allowed_distributions: + raise ValueError(f"port_distribution must be one of {allowed_distributions}") + if port_order not in allowed_orders: + raise ValueError(f"port_order must be one of {allowed_orders}") + if run_mode not in allowed_run_modes: + raise ValueError(f"run_mode must be one of {allowed_run_modes}") + + # Validate tests against available features + available_tests = set(self._get_all_features()) + def _normalize_tests(lst): + if lst is None: + return None + if isinstance(lst, str): + lst = [x for x in self.re.split(r'[,\s]+', lst) if x] + if not isinstance(lst, (list, tuple)): + raise ValueError("Tests list must be list/tuple/str or None") + lst = [str(x) for x in lst] + for t in lst: + if t not in available_tests: + raise ValueError(f"Unknown test name: {t}") + return list(lst) + + included_tests = _normalize_tests(included_tests) + excluded_tests = _normalize_tests(excluded_tests) or [] + + # Validate pacing + if pacing is not None: + if not isinstance(pacing, dict): + raise ValueError("pacing must be a dict with pause_interval and pause_duration") + if "pause_interval" not in pacing or "pause_duration" not in pacing: + raise ValueError("pacing requires pause_interval and pause_duration") + try: + pause_interval = float(pacing["pause_interval"]) + pause_duration = float(pacing["pause_duration"]) + except Exception: + raise ValueError("pacing values must be numeric") + if pause_interval <= 0: + raise ValueError("pause_interval must be > 0") + if pause_duration < 0: + raise ValueError("pause_duration must be >= 0") + pacing = {"pause_interval": pause_interval, "pause_duration": pause_duration} + + # Validate continuous schedule + schedule = None + if run_mode == "CONTINUOUS": + interval = schedule_interval if schedule_interval is not None else self.cfg_schedule_interval + if interval is None: + raise ValueError("interval (schedule_interval) is required for CONTINUOUS mode") + try: + interval = int(interval) + except Exception: + raise ValueError("interval must be an integer") + if interval <= 0: + raise ValueError("interval must be > 0") + schedule = {"interval": interval} + + # Validate 
max_iterations + if max_iterations is not None: + try: + max_iterations = int(max_iterations) + except Exception: + raise ValueError("max_iterations must be an integer") + if max_iterations <= 0: + raise ValueError("max_iterations must be > 0") + else: + max_iterations = self.cfg_max_iterations + job_id = self.uuid(8) + chain_id = job_id if run_mode == "CONTINUOUS" else job_id self.P(f"Launching {job_id=} {target=} with {exceptions=}") self.P(f"Announcing pentest to workers (instance_id {self.cfg_instance_id})...") job_specs = { "job_id" : job_id, + "chain_id": chain_id, "target": target, "exceptions" : exceptions, "start_port" : start_port, "end_port" : end_port, "launcher": self.ee_addr, "created_at": self.time(), + "iteration": 0, + "port_distribution": port_distribution, + "port_order": port_order, + "run_mode": run_mode, + "schedule": schedule, + "pacing": pacing, + "included_tests": included_tests, + "excluded_tests": excluded_tests, + "scheduled_at": None, + "continuous_stopped": False, + "max_iterations": max_iterations, "workers" : { self.ee_addr: { "finished": False, @@ -584,9 +1112,17 @@ def launch_test( @BasePlugin.endpoint def get_job_status(self, job_id: str): """ - Endpoint to retrieve the status or final report of a pentest job for the given target. - - TODO: Data must be extracted from CStore + Retrieve status or final report of a pentest job. + + Parameters + ---------- + job_id : str + Identifier of the job. + + Returns + ------- + dict + Status payload as returned by `_get_job_status`. """ # If job has completed, return its report return self._get_job_status(job_id) @@ -595,7 +1131,12 @@ def get_job_status(self, job_id: str): @BasePlugin.endpoint def list_network_jobs(self): """ - Endpoint to list all network jobs. + List all network jobs stored in CStore. + + Returns + ------- + dict + Normalized job specs keyed by job_id. """ raw_network_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id) normalized_jobs = {} @@ -608,6 +1149,14 @@ def list_network_jobs(self): @BasePlugin.endpoint def list_local_jobs(self): + """ + List jobs currently running on this worker. + + Returns + ------- + dict + Mapping job_id to status payload. + """ jobs = { job_id: self._get_job_status(job_id) for job_id, local_workers in self.scan_jobs.items() @@ -616,9 +1165,21 @@ def list_local_jobs(self): @BasePlugin.endpoint - def stop_and_delete_job(self, job_id : str): + def stop_and_delete_job(self, job_id : str, stop_continuous: bool = False): """ - Endpoint to stop and delete a pentest job. + Stop and delete a pentest job. + + Parameters + ---------- + job_id : str + Identifier of the job to stop. + stop_continuous : bool, optional + If True, mark the continuous chain to stop spawning successors. + + Returns + ------- + dict + Status message and job_id. """ # Stop the job if it's running local_workers = self.scan_jobs.get(job_id) @@ -636,17 +1197,22 @@ def stop_and_delete_job(self, job_id : str): worker_entry = job_specs.setdefault("workers", {}).setdefault(self.ee_addr, {}) worker_entry["finished"] = True worker_entry["canceled"] = True + if stop_continuous: + job_specs["continuous_stopped"] = True self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs) else: self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=None) self.P(f"Job {job_id} deleted.") - return {"status": "success", "job_id": job_id} + return {"status": "success", "job_id": job_id, "stop_continuous": stop_continuous} def process(self): """ - Periodically invoked to manage job threads. 
- Launches new jobs and checks for completed ones. + Periodic task handler: launch new jobs and close completed ones. + + Returns + ------- + None """ super(PentesterApi01Plugin, self).process() diff --git a/extensions/business/cybersec/red_mesh/redmesh_utils.py b/extensions/business/cybersec/red_mesh/redmesh_utils.py index 940258e2..dd91b6bd 100644 --- a/extensions/business/cybersec/red_mesh/redmesh_utils.py +++ b/extensions/business/cybersec/red_mesh/redmesh_utils.py @@ -5,6 +5,7 @@ import ftplib import requests import traceback +import random from copy import deepcopy @@ -27,19 +28,23 @@ class PentestLocalWorker( _WebTestsMixin ): """ - PentestJob handles the execution of a pentest scanning job for a given target. - It performs port scanning, service banner gathering, and basic web vulnerability tests. - - Parameters + Execute a pentest workflow against a target on a dedicated thread. + + The worker scans ports, gathers service banners, and performs lightweight web + security probes. It maintains local state and exposes status for aggregation. + + Attributes ---------- target : str - The network address (IP or hostname) to scan. - logger : callable, optional - Function for logging messages (e.g., plugin.P); if None, prints to stdout. - - - TODO: - - target ports must be configurable per worker from PENTESTER_API and each worker must receive a slice + Hostname or IP being scanned. + job_id : str + Identifier tying the worker to a network job. + initiator : str + Address that announced the job. + local_worker_id : str + Unique identifier per worker instance. + state : dict + Mutable status including ports scanned, open ports, and findings. """ def __init__( @@ -51,7 +56,48 @@ def __init__( local_id_prefix : str, worker_target_ports=COMMON_PORTS, exceptions=None, + included_tests=None, + excluded_tests=None, + port_order=None, + pacing=None, ): + """ + Initialize a pentest worker with target ports and exclusions. + + Parameters + ---------- + owner : object + Parent object providing logger `P`. + target : str + Hostname or IP to scan. + job_id : str + Identifier of the job. + initiator : str + Address that announced the job. + local_id_prefix : str + Prefix used to derive a human-friendly worker id. + worker_target_ports : list[int], optional + Ports assigned to this worker; defaults to common ports. + exceptions : list[int], optional + Ports to exclude from scanning. + included_tests : list[str] | None, optional + Whitelist of tests to run; None runs all. + excluded_tests : list[str] | None, optional + Blacklist of tests to skip. + port_order : str, optional + SEQUENTIAL or SHUFFLE for port iteration. + pacing : dict | None, optional + Stealth pauses: {"pause_interval": int, "pause_duration": float}; None disables pacing. + + Raises + ------ + ValueError + If no ports remain after applying exceptions. 
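+
+    Notes
+    -----
+    Exclusions take precedence: a test named in both `included_tests` and
+    `excluded_tests` is skipped (see `_should_run`).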
+ """ + self.included_tests = included_tests if included_tests is not None else getattr(owner, "cfg_included_tests", None) + self.excluded_tests = excluded_tests if excluded_tests is not None else getattr(owner, "cfg_excluded_tests", []) + self.port_order = port_order if port_order is not None else getattr(owner, "cfg_port_order", "SHUFFLE") + self.pacing = pacing if pacing is not None else getattr(owner, "cfg_pacing", None) if exceptions is None: exceptions = [] self.target = target @@ -96,7 +142,15 @@ def __init__( "completed_tests": [], "done": False, "canceled": False, + "current_stage": "INITIALIZED", } + # pacing state + self.action_counter = 0 + self.next_pause_at = None + if self.pacing: + base_interval = float(self.pacing.get("pause_interval", 0) or 0) + if base_interval > 0: + self.next_pause_at = int(base_interval * random.uniform(0.8, 1.2)) self.__features = self._get_all_features() self.P("Initialized worker {} on {} ports [{}-{}]...".format( self.local_worker_id, @@ -107,6 +161,19 @@ def __init__( return def _get_all_features(self, categs=False): + """ + Discover available probe methods on this worker. + + Parameters + ---------- + categs : bool, optional + If True, return dict by category; otherwise flat list. + + Returns + ------- + dict | list + Service and web test method names. + """ features = {} if categs else [] PREFIXES = ["_service_info_", "_web_test_"] for prefix in PREFIXES: @@ -116,9 +183,70 @@ def _get_all_features(self, categs=False): else: features.extend(methods) return features + + def _should_run(self, test_name): + """ + Check if a test should run based on include/exclude filters. + """ + if self.included_tests is not None and test_name not in self.included_tests: + return False + if test_name in (self.excluded_tests or []): + return False + return True + + def _maybe_pause(self): + """ + Pause execution based on pacing settings with jitter and stop awareness. + + Jitter ranges: + - pause_interval: 0.8-1.2x → actual pause frequency is varied around the configured interval to break patterns + e.g. pause_interval=10 means next pause after ~8-12 actions. + - pause_duration: 0.5-1.5x → actual sleep time is varied around the configured duration to avoid fixed timing + e.g. pause_duration=2.0 means sleep ~1.0-3.0 seconds. + + pause_interval controls how often we pause (every ~N actions); pause_duration + controls how long each pause lasts. + This is the “Dune sand walking” pacing: irregular, human-like delays to avoid + predictable scan patterns. 
+ """ + if not self.pacing: + return + if self.next_pause_at is None: + base_interval = float(self.pacing.get("pause_interval", 0) or 0) + if base_interval <= 0: + return + self.next_pause_at = int(base_interval * random.uniform(0.8, 1.2)) + self.action_counter += 1 + if self.action_counter < self.next_pause_at: + return + + previous_stage = self.state.get("current_stage", "SCANNING") + base_duration = float(self.pacing.get("pause_duration", 0) or 0) + if base_duration < 0: + base_duration = 0 + duration = base_duration * random.uniform(0.5, 1.5) + self.state["current_stage"] = "PAUSED" + interrupted = self.stop_event.wait(timeout=duration) + if interrupted: + self.state["current_stage"] = "STOPPING" + else: + self.state["current_stage"] = previous_stage + + self.action_counter = 0 + base_interval = float(self.pacing.get("pause_interval", 0) or 0) + if base_interval > 0: + self.next_pause_at = int(base_interval * random.uniform(0.8, 1.2)) @staticmethod def get_worker_specific_result_fields(): + """ + Define fields that require aggregation functions across workers. + + Returns + ------- + dict + Mapping of field name to aggregation callable/type. + """ return { "start_port" : min, "end_port" : max, @@ -132,6 +260,19 @@ def get_worker_specific_result_fields(): def get_status(self, for_aggregations=False): + """ + Produce a status snapshot for this worker. + + Parameters + ---------- + for_aggregations : bool, optional + If True, omit volatile fields to simplify merges. + + Returns + ------- + dict + Worker status including progress and findings. + """ completed_tests = self.state.get("completed_tests", []) max_features = len(self.__features) + 1 # +1 from port scanning progress = f"{(len(completed_tests) / max_features) * 100 if self.__features else 0:.1f}%" @@ -162,11 +303,27 @@ def get_status(self, for_aggregations=False): dct_status["web_tests_info"] = self.state["web_tests_info"] dct_status["completed_tests"] = self.state["completed_tests"] + dct_status["current_stage"] = self.state.get("current_stage") return dct_status def P(self, s, **kwargs): + """ + Log a message with worker context prefix. + + Parameters + ---------- + s : str + Message to emit. + **kwargs + Additional logging keyword arguments. + + Returns + ------- + Any + Result of owner logger. + """ s = f"[{self.local_worker_id}:{self.target}] {s}" self.owner.P(s, **kwargs) return @@ -175,6 +332,10 @@ def P(self, s, **kwargs): def start(self): """ Start the pentest job in a new thread. + + Returns + ------- + None """ # Event to signal early stopping self.stop_event = threading.Event() @@ -187,6 +348,10 @@ def start(self): def stop(self): """ Signal the job to stop early. + + Returns + ------- + None """ self.P(f"Stop requested for job {self.job_id} on worker {self.local_worker_id}") self.stop_event.set() @@ -194,6 +359,14 @@ def stop(self): def _check_stopped(self): + """ + Determine whether the worker should cease execution. + + Returns + ------- + bool + True if done or stop event set. + """ return self.state["done"] or self.stop_event.is_set() @@ -201,22 +374,30 @@ def execute_job(self): """ Run the full pentesting workflow: port scanning, service info gathering, and web vulnerability tests, until the job is complete or stopped. 
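+    `current_stage` advances SCANNING -> PROBING -> TESTING -> COMPLETED as
+    each phase finishes; PAUSED may appear transiently while pacing pauses
+    are in effect.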
+ + Returns + ------- + None """ try: + self.state["current_stage"] = "SCANNING" self.P(f"Starting pentest job.") if not self._check_stopped(): self._scan_ports_step() if not self._check_stopped(): + self.state["current_stage"] = "PROBING" self._gather_service_info() self.state["completed_tests"].append("service_info_completed") if not self._check_stopped(): + self.state["current_stage"] = "TESTING" self._run_web_tests() self.state["completed_tests"].append("web_tests_completed") self.state['done'] = True + self.state["current_stage"] = "COMPLETED" self.P(f"Job completed. Ports open and checked: {self.state['open_ports']}") # If stopped before completion @@ -226,6 +407,7 @@ def execute_job(self): except Exception as e: self.P(f"Exception in job execution: {e}:\n{traceback.format_exc()}", color='r') self.state['done'] = True + self.state["current_stage"] = "COMPLETED" return @@ -234,6 +416,17 @@ def execute_job(self): def _scan_ports_step(self, batch_size=None, batch_nr=1): """ Scan a batch of ports from the remaining list to identify open ports. + + Parameters + ---------- + batch_size : int, optional + Number of ports per batch; scans all remaining when None. + batch_nr : int, optional + Batch index (used for logging). + + Returns + ------- + None """ REGISTER_PROGRESS_EACH = 500 @@ -243,6 +436,8 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): target = self.target ports = deepcopy(self.state["ports_to_scan"]) + if self.port_order == "SHUFFLE": + random.shuffle(ports) if not ports: return if batch_size is None: @@ -254,6 +449,7 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): self.P(f"Scanning {nr_ports} ports in batch {batch_nr}.") show_progress = False if len(ports_batch) > 1000: + # Avoid noisy progress logs on tiny batches. show_progress = True for i, port in enumerate(ports_batch): if self.stop_event.is_set(): @@ -272,6 +468,7 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): # endtry self.state["ports_scanned"].append(port) self.state["ports_to_scan"].remove(port) + self._maybe_pause() if ((i + 1) % REGISTER_PROGRESS_EACH) == 0: scan_ports_step_progress = (i + 1) / nr_ports * 100 str_progress = f"{scan_ports_step_progress:.0f}%" @@ -280,7 +477,7 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): self.state["completed_tests"] = [f"scan_ports_step_{str_progress}"] if show_progress: self.P(f"Port scanning progress on {target}: {str_progress}") - + # end for each port in batch left_ports = self.state["ports_to_scan"] if not left_ports: self.P(f"[{target}] Port scanning completed. {len(self.state['open_ports'])} open ports.") @@ -293,14 +490,23 @@ def _scan_ports_step(self, batch_size=None, batch_nr=1): def _gather_service_info(self): """ Gather banner or basic information from each newly open port. + + Returns + ------- + list + Aggregated string findings per method (may be empty). 
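+        `None` is returned early when there are no open ports to probe.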
""" open_ports = self.state["open_ports"] if len(open_ports) == 0: self.P("No open ports to gather service info from.") return self.P(f"Gathering service info for {len(open_ports)} open ports.") + self.state["current_stage"] = "PROBING" target = self.target - service_info_methods = [method for method in dir(self) if method.startswith("_service_info_")] + service_info_methods = [ + method for method in dir(self) + if method.startswith("_service_info_") and self._should_run(method) + ] aggregated_info = [] for method in service_info_methods: func = getattr(self, method) @@ -314,23 +520,31 @@ def _gather_service_info(self): self.state["service_info"][port][method] = info if info is not None: method_info.append(f"{method}: {port}: {info}") + self._maybe_pause() if method_info: aggregated_info.extend(method_info) self.P( f"Method {method} findings:\n{json.dumps(method_info, indent=2)}" ) self.state["completed_tests"].append(method) + # end for each service info method return aggregated_info def _run_web_tests(self): """ Perform basic web vulnerability tests if a web service is open. + + Returns + ------- + list + Collected findings per test method (may be empty). """ open_ports = self.state["open_ports"] if len(open_ports) == 0: self.P("No open ports to run web tests on.") return + self.state["current_stage"] = "TESTING" ports_to_test = list(open_ports) self.P( @@ -342,7 +556,10 @@ def _run_web_tests(self): self.state["web_tested"] = True return result = [] - web_tests_methods = [method for method in dir(self) if method.startswith("_web_test_")] + web_tests_methods = [ + method for method in dir(self) + if method.startswith("_web_test_") and self._should_run(method) + ] for method in web_tests_methods: func = getattr(self, method) for port in ports_to_test: @@ -354,8 +571,9 @@ def _run_web_tests(self): if port not in self.state["web_tests_info"]: self.state["web_tests_info"][port] = {} self.state["web_tests_info"][port][method] = iter_result + self._maybe_pause() # end for each port of current method self.state["completed_tests"].append(method) # register completed method for port - # end for each method + # end for each web test method self.state["web_tested"] = True return result diff --git a/extensions/business/cybersec/red_mesh/service_mixin.py b/extensions/business/cybersec/red_mesh/service_mixin.py index 7e74dfbf..6f225e23 100644 --- a/extensions/business/cybersec/red_mesh/service_mixin.py +++ b/extensions/business/cybersec/red_mesh/service_mixin.py @@ -11,15 +11,27 @@ class _ServiceInfoMixin: Network service banner probes feeding RedMesh reports. Each helper focuses on a specific protocol and maps findings to - OWASP vulnerability families such as A06:2021 (Security - Misconfiguration) or A09:2021 (Security Logging and Monitoring). - The mixin is intentionally light-weight so that PentestLocalWorker - threads can run without external dependencies while still surfacing - high-signal security clues. + OWASP vulnerability families. The mixin is intentionally light-weight so + that `PentestLocalWorker` threads can run without heavy dependencies while + still surfacing high-signal clues. """ def _service_info_80(self, target, port): - """Collect HTTP banner and server metadata for common web ports.""" + """ + Collect HTTP banner and server metadata for common web ports. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner summary or error message. 
+ """ info = None try: scheme = "https" if port in (443, 8443) else "http" @@ -36,7 +48,21 @@ def _service_info_80(self, target, port): def _service_info_8080(self, target, port): - """Probe alternate HTTP port 8080 for verbose banners.""" + """ + Probe alternate HTTP port 8080 for verbose banners. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -58,7 +84,21 @@ def _service_info_8080(self, target, port): def _service_info_443(self, target, port): - """Collect HTTPS response banner data for TLS services.""" + """ + Collect HTTPS response banner data for TLS services. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Banner summary or error message. + """ info = None try: url = f"https://{target}" @@ -74,7 +114,21 @@ def _service_info_443(self, target, port): def _service_info_tls(self, target, port): - """Inspect TLS handshake details and certificate lifetime.""" + """ + Inspect TLS handshake details and certificate lifetime. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + TLS version/cipher summary or error message. + """ info = None try: context = ssl.create_default_context() @@ -104,7 +158,21 @@ def _service_info_tls(self, target, port): def _service_info_21(self, target, port): - """Identify FTP banners and anonymous login exposure.""" + """ + Identify FTP banners and anonymous login exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + FTP banner info or vulnerability message. + """ info = None try: ftp = ftplib.FTP(timeout=3) @@ -123,7 +191,21 @@ def _service_info_21(self, target, port): return info def _service_info_22(self, target, port): - """Retrieve the SSH banner to fingerprint implementations.""" + """ + Retrieve the SSH banner to fingerprint implementations. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SSH banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -138,7 +220,21 @@ def _service_info_22(self, target, port): return info def _service_info_25(self, target, port): - """Capture SMTP banner data for mail infrastructure mapping.""" + """ + Capture SMTP banner data for mail infrastructure mapping. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SMTP banner text or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -153,7 +249,21 @@ def _service_info_25(self, target, port): return info def _service_info_3306(self, target, port): - """Perform a lightweight MySQL handshake to expose server version.""" + """ + Perform a lightweight MySQL handshake to expose server version. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MySQL version info or error message. 
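+
+    Notes
+    -----
+    The MySQL server greeting carries a version string, so an illustrative
+    finding would resemble ``"MySQL 8.0.36"`` (exact wording depends on the
+    probe implementation).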
+ """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -172,7 +282,21 @@ def _service_info_3306(self, target, port): return info def _service_info_3389(self, target, port): - """Verify reachability of RDP services without full negotiation.""" + """ + Verify reachability of RDP services without full negotiation. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + RDP reachability summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -186,7 +310,21 @@ def _service_info_3389(self, target, port): return info def _service_info_6379(self, target, port): - """Test Redis exposure by issuing a PING command.""" + """ + Test Redis exposure by issuing a PING command. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Redis response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -208,7 +346,21 @@ def _service_info_6379(self, target, port): def _service_info_23(self, target, port): - """Fetch Telnet negotiation banner (OWASP A05: insecure protocols).""" + """ + Fetch Telnet negotiation banner. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Telnet banner or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -227,7 +379,21 @@ def _service_info_23(self, target, port): def _service_info_445(self, target, port): - """Probe SMB services for negotiation responses (OWASP A06).""" + """ + Probe SMB services for negotiation responses. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SMB response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -248,7 +414,21 @@ def _service_info_445(self, target, port): def _service_info_5900(self, target, port): - """Read VNC handshake string to assess remote desktop exposure.""" + """ + Read VNC handshake string to assess remote desktop exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + VNC banner summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -267,7 +447,21 @@ def _service_info_5900(self, target, port): def _service_info_161(self, target, port): - """Attempt SNMP community string disclosure using 'public'.""" + """ + Attempt SNMP community string disclosure using 'public'. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + SNMP response summary or error message. + """ info = None sock = None try: @@ -299,7 +493,21 @@ def _service_info_161(self, target, port): def _service_info_53(self, target, port): - """Query CHAOS TXT version.bind to detect DNS version disclosure.""" + """ + Query CHAOS TXT version.bind to detect DNS version disclosure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + DNS disclosure summary or error message. 
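+
+    Notes
+    -----
+    Sends a CHAOS-class TXT query for ``version.bind``; a server that
+    answers (illustratively, BIND replying ``"9.16.1"``) discloses version
+    data useful for fingerprinting.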
+ """ info = None sock = None try: @@ -362,7 +570,21 @@ def _service_info_53(self, target, port): def _service_info_1433(self, target, port): - """Send a TDS prelogin probe to expose SQL Server version data.""" + """ + Send a TDS prelogin probe to expose SQL Server version data. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MSSQL response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -387,7 +609,21 @@ def _service_info_1433(self, target, port): def _service_info_5432(self, target, port): - """Probe PostgreSQL for weak authentication methods.""" + """ + Probe PostgreSQL for weak authentication methods. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + PostgreSQL response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -411,7 +647,21 @@ def _service_info_5432(self, target, port): def _service_info_11211(self, target, port): - """Issue Memcached stats command to detect unauthenticated access.""" + """ + Issue Memcached stats command to detect unauthenticated access. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Memcached response summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -431,7 +681,21 @@ def _service_info_11211(self, target, port): def _service_info_9200(self, target, port): - """Detect Elasticsearch/OpenSearch nodes leaking cluster metadata.""" + """ + Detect Elasticsearch/OpenSearch nodes leaking cluster metadata. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Elasticsearch exposure summary or error message. + """ info = None try: scheme = "http" @@ -450,7 +714,21 @@ def _service_info_9200(self, target, port): def _service_info_502(self, target, port): - """Send Modbus device identification request to detect exposed PLCs.""" + """ + Send Modbus device identification request to detect exposed PLCs. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Modbus exposure summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -473,7 +751,21 @@ def _service_info_502(self, target, port): def _service_info_27017(self, target, port): - """Attempt MongoDB isMaster handshake to detect unauthenticated access.""" + """ + Attempt MongoDB isMaster handshake to detect unauthenticated access. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + MongoDB exposure summary or error message. + """ info = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -508,7 +800,21 @@ def _service_info_27017(self, target, port): def _service_info_generic(self, target, port): - """Attempt a generic TCP banner grab for uncovered ports.""" + """ + Attempt a generic TCP banner grab for uncovered ports. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Port being probed. + + Returns + ------- + str | None + Generic banner text or error message. 
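+
+    Notes
+    -----
+    Covers ports without a dedicated ``_service_info_<port>`` helper by
+    connecting over plain TCP and reading whatever short banner the
+    service volunteers.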
+ """ info = None try: # Generic service: attempt to connect and read a short banner if any diff --git a/extensions/business/cybersec/red_mesh/test_redmesh.py b/extensions/business/cybersec/red_mesh/test_redmesh.py index 33572bb0..4a3c79ea 100644 --- a/extensions/business/cybersec/red_mesh/test_redmesh.py +++ b/extensions/business/cybersec/red_mesh/test_redmesh.py @@ -1,7 +1,9 @@ import sys import struct import unittest -from unittest.mock import MagicMock, patch +import threading +import time +from unittest.mock import MagicMock, patch, Mock from extensions.business.cybersec.red_mesh.redmesh_utils import PentestLocalWorker @@ -764,6 +766,439 @@ def test_http_methods_detection(self): result = worker._web_test_http_methods("example.com", 80) self.assertIn("VULNERABILITY: Risky HTTP methods", result) + def test_pacing_pauses_execution(self): + """Test that pacing configuration is set correctly""" + owner = DummyOwner() + owner.cfg_pacing = {"pause_interval": 2, "pause_duration": 0.05} + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-pacing", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80, 81], + pacing={"pause_interval": 2, "pause_duration": 0.05} + ) + worker.stop_event = threading.Event() + + # Verify pacing is configured correctly + self.assertIsNotNone(worker.pacing) + self.assertEqual(worker.pacing["pause_interval"], 2) + self.assertEqual(worker.pacing["pause_duration"], 0.05) + self.assertIsNotNone(worker.next_pause_at) + self.assertGreater(worker.next_pause_at, 0) + + # Verify action counter starts at 0 + self.assertEqual(worker.action_counter, 0) + + def test_pacing_respects_stop_event(self): + """Test that pacing pause can be interrupted by stop event""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-stop", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80], + pacing={"pause_interval": 1, "pause_duration": 5.0} # Long pause + ) + worker.stop_event = threading.Event() + worker.action_counter = 1 + worker.next_pause_at = 1 + + # Start pause in background and stop it + def delayed_stop(): + time.sleep(0.1) + worker.stop_event.set() + + stop_thread = threading.Thread(target=delayed_stop) + stop_thread.start() + + start = time.time() + worker._maybe_pause() + elapsed = time.time() - start + + # Should be interrupted well before 5 seconds + self.assertLess(elapsed, 1.0) + self.assertTrue(worker.stop_event.is_set()) + stop_thread.join() + + def test_port_order_sequential(self): + """Test that SEQUENTIAL port order maintains input order (no shuffle)""" + owner = DummyOwner() + owner.cfg_port_order = "SEQUENTIAL" + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-seq", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[100, 99, 98, 97, 96], + port_order="SEQUENTIAL" + ) + worker.stop_event = MagicMock() + worker.stop_event.is_set.return_value = False + + scanned_ports = [] + + class DummySocket: + def __init__(self, *args, **kwargs): + pass + def settimeout(self, timeout): + return None + def connect_ex(self, address): + scanned_ports.append(address[1]) + return 1 # not open + def close(self): + return None + + with patch( + "extensions.business.cybersec.red_mesh.redmesh_utils.socket.socket", + return_value=DummySocket(), + ): + worker._scan_ports_step() + + # With SEQUENTIAL, ports maintain their original order (no shuffle) + # Input [100, 99, 98, 97, 96] stays as [100, 99, 98, 97, 96] 
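+        # _scan_ports_step only calls random.shuffle when port_order is
+        # "SHUFFLE", so the deepcopy of ports_to_scan keeps insertion order.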
+ expected = [100, 99, 98, 97, 96] + self.assertEqual(scanned_ports, expected) + + def test_port_order_shuffle(self): + """Test that SHUFFLE port order randomizes""" + owner = DummyOwner() + owner.cfg_port_order = "SHUFFLE" + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-shuffle", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=list(range(1, 21)), # 20 ports + port_order="SHUFFLE" + ) + worker.stop_event = MagicMock() + worker.stop_event.is_set.return_value = False + + scanned_ports = [] + + class DummySocket: + def __init__(self, *args, **kwargs): + pass + def settimeout(self, timeout): + return None + def connect_ex(self, address): + scanned_ports.append(address[1]) + return 1 + def close(self): + return None + + with patch( + "extensions.business.cybersec.red_mesh.redmesh_utils.socket.socket", + return_value=DummySocket(), + ): + worker._scan_ports_step() + + # Shuffled order should be different from sorted (very high probability with 20 ports) + self.assertNotEqual(scanned_ports, sorted(scanned_ports)) + # But should contain same ports + self.assertEqual(set(scanned_ports), set(range(1, 21))) + + def test_included_tests_filter(self): + """Test that included_tests filters which tests run""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-filter", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80], + included_tests=["_service_info_80", "_web_test_common"] + ) + + # Should run + self.assertTrue(worker._should_run("_service_info_80")) + self.assertTrue(worker._should_run("_web_test_common")) + + # Should not run + self.assertFalse(worker._should_run("_service_info_443")) + self.assertFalse(worker._should_run("_web_test_xss")) + + def test_excluded_tests_filter(self): + """Test that excluded_tests prevents specific tests from running""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-exclude", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80], + excluded_tests=["_web_test_xss", "_web_test_sql_injection"] + ) + + # Should not run + self.assertFalse(worker._should_run("_web_test_xss")) + self.assertFalse(worker._should_run("_web_test_sql_injection")) + + # Should run (not in excluded list) + self.assertTrue(worker._should_run("_web_test_common")) + self.assertTrue(worker._should_run("_service_info_80")) + + def test_combined_include_exclude_tests(self): + """Test that excluded_tests takes precedence over included_tests""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-combined", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80], + included_tests=["_web_test_xss", "_web_test_common"], + excluded_tests=["_web_test_xss"] + ) + + # Excluded takes precedence + self.assertFalse(worker._should_run("_web_test_xss")) + # In included and not excluded + self.assertTrue(worker._should_run("_web_test_common")) + # Not in included + self.assertFalse(worker._should_run("_web_test_sql_injection")) + + def test_worker_status_reporting(self): + """Test that worker reports accurate status""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="192.168.1.1", + job_id="job-status", + initiator="launcher@example", + local_id_prefix="42", + worker_target_ports=[80, 443, 8080] + ) + + status = worker.get_status() + + self.assertEqual(status["job_id"], "job-status") + 
self.assertEqual(status["initiator"], "launcher@example")
+        self.assertEqual(status["target"], "192.168.1.1")
+        self.assertEqual(status["start_port"], 80)
+        self.assertEqual(status["end_port"], 8080)
+        self.assertFalse(status["done"])
+        self.assertFalse(status["canceled"])
+        self.assertIn("local_worker_id", status)
+        self.assertIn("RM-42-", status["local_worker_id"])
+
+    def test_worker_aggregation_fields(self):
+        """Test that worker-specific aggregation fields are defined correctly"""
+        fields = PentestLocalWorker.get_worker_specific_result_fields()
+
+        self.assertIn("open_ports", fields)
+        self.assertIn("service_info", fields)
+        self.assertIn("web_tests_info", fields)
+        self.assertIn("completed_tests", fields)
+        self.assertIn("start_port", fields)
+        self.assertIn("end_port", fields)
+        self.assertEqual(fields["start_port"], min)
+        self.assertEqual(fields["end_port"], max)
+
+    def test_stop_event_interrupts_port_scan(self):
+        """Test that setting stop event halts port scanning"""
+        owner = DummyOwner()
+        worker = PentestLocalWorker(
+            owner=owner,
+            target="example.com",
+            job_id="job-stop-scan",
+            initiator="init@example",
+            local_id_prefix="1",
+            worker_target_ports=list(range(1, 101))  # 100 ports
+        )
+        worker.stop_event = threading.Event()
+
+        scanned_count = [0]
+
+        class DummySocket:
+            def __init__(self, *args, **kwargs):
+                pass
+            def settimeout(self, timeout):
+                return None
+            def connect_ex(self, address):
+                scanned_count[0] += 1
+                if scanned_count[0] == 10:
+                    worker.stop_event.set()
+                return 1
+            def close(self):
+                return None
+
+        with patch(
+            "extensions.business.cybersec.red_mesh.redmesh_utils.socket.socket",
+            return_value=DummySocket(),
+        ):
+            worker._scan_ports_step()
+
+        # Should stop early, not scan all 100 ports
+        self.assertLess(len(worker.state["ports_scanned"]), 100)
+        self.assertGreater(len(worker.state["ports_scanned"]), 0)
+
+    def test_stop_event_interrupts_service_gathering(self):
+        """Test that setting stop event halts service info gathering"""
+        owner = DummyOwner()
+        worker = PentestLocalWorker(
+            owner=owner,
+            target="example.com",
+            job_id="job-stop-service",
+            initiator="init@example",
+            local_id_prefix="1",
+            worker_target_ports=[80, 443, 8080]
+        )
+        worker.state["open_ports"] = [80, 443, 8080]
+        worker.stop_event = threading.Event()
+
+        call_count = [0]
+
+        def fake_service_info(target, port):
+            call_count[0] += 1
+            if call_count[0] == 2:
+                worker.stop_event.set()
+            return f"info:{port}"
+
+        # create=True is required here: the module has no `dir` attribute of
+        # its own, so patching the builtin lookup would otherwise raise
+        # AttributeError.
+        with patch(
+            "extensions.business.cybersec.red_mesh.redmesh_utils.dir",
+            return_value=["_service_info_fake"],
+            create=True,
+        ):
+            setattr(worker, "_service_info_fake", fake_service_info)
+            worker._gather_service_info()
+
+        # Should not process all ports
+        self.assertLess(len(worker.state["service_info"]), 3)
+
+    def test_exceptions_removes_ports_from_scan_list(self):
+        """Test that exception ports are properly excluded"""
+        owner = DummyOwner()
+        worker = PentestLocalWorker(
+            owner=owner,
+            target="example.com",
+            job_id="job-except",
+            initiator="init@example",
+            local_id_prefix="1",
+            worker_target_ports=[80, 8080, 443, 8443],
+            exceptions=[8080, 8443]
+        )
+
+        # Exceptions should be removed from ports_to_scan
+        self.assertNotIn(8080, worker.state["ports_to_scan"])
+        self.assertNotIn(8443, worker.state["ports_to_scan"])
+        self.assertIn(80, worker.state["ports_to_scan"])
+        self.assertIn(443, worker.state["ports_to_scan"])
+
+        # Should be tracked in exceptions
+        self.assertIn(8080, worker.exceptions)
+        self.assertIn(8443, worker.exceptions)
+
+    def 
test_exceptions_not_matching_worker_ports_ignored(self): + """Test that exceptions not in worker ports are ignored""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-except-nomatch", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80, 443], + exceptions=[8080, 9000] # Not in worker ports + ) + + # Worker ports should remain unchanged + self.assertIn(80, worker.state["ports_to_scan"]) + self.assertIn(443, worker.state["ports_to_scan"]) + + # Exceptions should be empty since they don't match + self.assertEqual(worker.exceptions, []) + + def test_worker_thread_lifecycle(self): + """Test that worker can be started and stopped properly""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-lifecycle", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80] + ) + + # Mock execute_job to avoid actual scanning + executed = [False] + def mock_execute(): + executed[0] = True + worker.state["done"] = True + + with patch.object(worker, 'execute_job', side_effect=mock_execute): + worker.start() + self.assertIsInstance(worker.thread, threading.Thread) + self.assertTrue(worker.thread.daemon) + + # Wait for thread to complete + worker.thread.join(timeout=1.0) + self.assertTrue(executed[0]) + + def test_current_stage_tracking(self): + """Test that current_stage is updated during workflow""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-stage", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80] + ) + + self.assertEqual(worker.state["current_stage"], "INITIALIZED") + + worker.stop_event = threading.Event() + + # Mock the actual work + with patch.object(worker, '_scan_ports_step'): + worker.state["current_stage"] = "SCANNING" + self.assertEqual(worker.state["current_stage"], "SCANNING") + + worker.state["current_stage"] = "PROBING" + self.assertEqual(worker.state["current_stage"], "PROBING") + + worker.state["current_stage"] = "TESTING" + self.assertEqual(worker.state["current_stage"], "TESTING") + + worker.state["current_stage"] = "COMPLETED" + self.assertEqual(worker.state["current_stage"], "COMPLETED") + + def test_worker_progress_calculation(self): + """Test that worker progress is calculated correctly""" + owner = DummyOwner() + worker = PentestLocalWorker( + owner=owner, + target="example.com", + job_id="job-progress", + initiator="init@example", + local_id_prefix="1", + worker_target_ports=[80] + ) + + # No tests completed + status = worker.get_status() + self.assertIn("progress", status) + + # Simulate some completed tests + all_features = worker._get_all_features() + worker.state["completed_tests"] = all_features[:len(all_features)//2] + + status = worker.get_status() + progress_str = status["progress"] + # Should show partial progress + self.assertIn("%", progress_str) + class VerboseResult(unittest.TextTestResult): def addSuccess(self, test): diff --git a/extensions/business/cybersec/red_mesh/web_mixin.py b/extensions/business/cybersec/red_mesh/web_mixin.py index d42a9db6..0eb170b2 100644 --- a/extensions/business/cybersec/red_mesh/web_mixin.py +++ b/extensions/business/cybersec/red_mesh/web_mixin.py @@ -2,10 +2,29 @@ from urllib.parse import quote class _WebTestsMixin: - """HTTP-centric probes that emulate manual red-team playbooks.""" + """ + HTTP-centric probes that emulate manual red-team playbooks. 
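+    Tests are discovered via the ``_web_test_`` name prefix and filtered
+    per job through ``_should_run`` (included/excluded test lists).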
+ + Methods perform lightweight checks for common web vulnerabilities across + discovered web services. + """ def _web_test_common(self, target, port): - """Look for exposed common endpoints and weak access controls.""" + """ + Look for exposed common endpoints and weak access controls. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings from endpoint checks. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -35,7 +54,21 @@ def _web_test_common(self, target, port): def _web_test_homepage(self, target, port): - """Scan landing pages for clear-text secrets or database dumps.""" + """ + Scan landing pages for clear-text secrets or database dumps. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings from homepage inspection. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -67,7 +100,21 @@ def _web_test_homepage(self, target, port): def _web_test_security_headers(self, target, port): - """Flag missing HTTP security headers (OWASP A05/A06).""" + """ + Flag missing HTTP security headers. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about security headers presence. + """ findings = [] try: scheme = "https" if port in (443, 8443) else "http" @@ -100,7 +147,21 @@ def _web_test_security_headers(self, target, port): def _web_test_flags(self, target, port): - """Check cookies for Secure/HttpOnly/SameSite and directory listing.""" + """ + Check cookies for Secure/HttpOnly/SameSite and directory listing. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on cookie flags and directory listing. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -140,7 +201,21 @@ def _web_test_flags(self, target, port): def _web_test_xss(self, target, port): - """Probe reflected XSS by injecting a harmless script tag.""" + """ + Probe reflected XSS by injecting a harmless script tag. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to reflected XSS. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -165,7 +240,21 @@ def _web_test_xss(self, target, port): def _web_test_path_traversal(self, target, port): - """Attempt basic path traversal payload against the target.""" + """ + Attempt basic path traversal payload against the target. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about traversal attempts. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -189,7 +278,21 @@ def _web_test_path_traversal(self, target, port): def _web_test_sql_injection(self, target, port): - """Send boolean SQLi payload and look for database error leakage.""" + """ + Send boolean SQLi payload and look for database error leakage. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. 
+ + Returns + ------- + str + Joined findings related to SQL injection. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -216,7 +319,21 @@ def _web_test_sql_injection(self, target, port): def _web_test_cors_misconfiguration(self, target, port): - """Detect overly permissive CORS policies (OWASP A01/A05).""" + """ + Detect overly permissive CORS policies. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to CORS policy. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -256,7 +373,21 @@ def _web_test_cors_misconfiguration(self, target, port): def _web_test_open_redirect(self, target, port): - """Check common redirect parameters for open redirect abuse.""" + """ + Check common redirect parameters for open redirect abuse. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings about open redirects. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -291,7 +422,21 @@ def _web_test_open_redirect(self, target, port): def _web_test_http_methods(self, target, port): - """Surface risky HTTP verbs enabled on the root resource.""" + """ + Surface risky HTTP verbs enabled on the root resource. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to allowed HTTP methods. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -320,7 +465,21 @@ def _web_test_http_methods(self, target, port): def _web_test_graphql_introspection(self, target, port): - """Check if GraphQL introspection is exposed in production endpoints.""" + """ + Check if GraphQL introspection is exposed in production endpoints. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on GraphQL introspection exposure. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -346,7 +505,21 @@ def _web_test_graphql_introspection(self, target, port): def _web_test_metadata_endpoints(self, target, port): - """Probe cloud metadata paths to detect SSRF-style exposure.""" + """ + Probe cloud metadata paths to detect SSRF-style exposure. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings on metadata endpoint exposure. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}" @@ -375,7 +548,21 @@ def _web_test_metadata_endpoints(self, target, port): def _web_test_api_auth_bypass(self, target, port): - """Detect APIs that succeed despite invalid Authorization headers.""" + """ + Detect APIs that succeed despite invalid Authorization headers. + + Parameters + ---------- + target : str + Hostname or IP address. + port : int + Web port to probe. + + Returns + ------- + str + Joined findings related to auth bypass behavior. + """ findings = [] scheme = "https" if port in (443, 8443) else "http" base_url = f"{scheme}://{target}"
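+        # Per the docstring above: the requests that follow carry a
+        # deliberately invalid Authorization header, and endpoints that
+        # still respond successfully are flagged as potential auth-bypass
+        # findings.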