Skip to content

Commit 28252dd

Browse files
authored
feat: redmesh deepseek integration (#351)
* feat: implement redmesh llm_agent_api and deepseek integration
* fix: move llm methods to a mixin
* fix: rename redmesh llm agent mixin
* chore: inc version
1 parent 4c2b6b2 commit 28252dd

File tree

5 files changed

+1190
-7
lines changed

5 files changed

+1190
-7
lines changed

extensions/business/cybersec/red_mesh/constants.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,14 @@
9696

9797
# Port order constants
9898
PORT_ORDER_SHUFFLE = "SHUFFLE"
99-
PORT_ORDER_SEQUENTIAL = "SEQUENTIAL"
99+
PORT_ORDER_SEQUENTIAL = "SEQUENTIAL"
100+
101+
# Status values returned by the LLM Agent API integration.
LLM_API_STATUS_OK = "ok"
LLM_API_STATUS_ERROR = "error"
LLM_API_STATUS_TIMEOUT = "timeout"


# Supported LLM analysis types for scan-result post-processing.
LLM_ANALYSIS_SECURITY_ASSESSMENT = "security_assessment"
LLM_ANALYSIS_VULNERABILITY_SUMMARY = "vulnerability_summary"
LLM_ANALYSIS_REMEDIATION_PLAN = "remediation_plan"

extensions/business/cybersec/red_mesh/pentester_api_01.py

Lines changed: 257 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,19 @@
3434

3535
from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin
3636
from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module
37-
from .constants import FEATURE_CATALOG
37+
from .redmesh_llm_agent_mixin import _RedMeshLlmAgentMixin
38+
from .constants import (
39+
FEATURE_CATALOG,
40+
LLM_ANALYSIS_SECURITY_ASSESSMENT,
41+
LLM_ANALYSIS_VULNERABILITY_SUMMARY,
42+
LLM_ANALYSIS_REMEDIATION_PLAN,
43+
)
3844

3945
__VER__ = '0.8.2'
4046

4147
_CONFIG = {
4248
**BasePlugin.CONFIG,
49+
4350
"TUNNEL_ENGINE_ENABLED": False,
4451

4552
'PORT': None,
@@ -49,9 +56,9 @@
4956
"CHECK_JOBS_EACH" : 5,
5057

5158
"REDMESH_VERBOSE" : 10, # Verbosity level for debug messages (0 = off, 1+ = debug)
52-
59+
5360
"NR_LOCAL_WORKERS" : 8,
54-
61+
5562
"WARMUP_DELAY" : 30,
5663

5764
# Defines how ports are split across local workers.
@@ -68,12 +75,19 @@
6875
"SCAN_MIN_RND_DELAY": 0.0, # minimum delay in seconds (0 = disabled)
6976
"SCAN_MAX_RND_DELAY": 0.0, # maximum delay in seconds (0 = disabled)
7077

78+
# LLM Agent API integration for auto-analysis
79+
"LLM_AGENT_API_ENABLED": False, # Enable LLM-powered analysis
80+
"LLM_AGENT_API_HOST": "127.0.0.1", # Host where LLM Agent API is running
81+
"LLM_AGENT_API_PORT": None, # Port for LLM Agent API (required if enabled)
82+
"LLM_AGENT_API_TIMEOUT": 120, # Timeout in seconds for LLM API calls
83+
"LLM_AUTO_ANALYSIS_TYPE": "security_assessment", # Default analysis type
84+
7185
'VALIDATION_RULES': {
72-
**BasePlugin.CONFIG['VALIDATION_RULES'],
86+
**BasePlugin.CONFIG['VALIDATION_RULES'],
7387
},
7488
}
7589

76-
class PentesterApi01Plugin(BasePlugin):
90+
class PentesterApi01Plugin(BasePlugin, _RedMeshLlmAgentMixin):
7791
"""
7892
RedMesh API plugin for orchestrating decentralized pentest jobs.
7993
@@ -846,6 +860,11 @@ def _maybe_finalize_pass(self):
846860
job_specs["date_updated"] = self.time()
847861
job_specs["date_finalized"] = self.time()
848862
self.P(f"[SINGLEPASS] Job {job_id} complete. Status set to FINALIZED.")
863+
864+
# Run LLM auto-analysis on aggregated report (launcher only)
865+
if self.cfg_llm_agent_api_enabled:
866+
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)
867+
849868
self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
850869
continue
851870

@@ -857,10 +876,19 @@ def _maybe_finalize_pass(self):
857876
job_specs["date_updated"] = self.time()
858877
job_specs["date_finalized"] = self.time()
859878
self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Status set to STOPPED (soft stop was scheduled)")
879+
880+
# Run LLM auto-analysis on aggregated report (launcher only)
881+
if self.cfg_llm_agent_api_enabled:
882+
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)
883+
860884
self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
861885
continue
862886
# end if
863887

888+
# Run LLM auto-analysis for this pass (launcher only)
889+
if self.cfg_llm_agent_api_enabled:
890+
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)
891+
864892
# Schedule next pass
865893
interval = job_specs.get("monitor_interval", self.cfg_monitor_interval)
866894
jitter = random.uniform(0, self.cfg_monitor_jitter)
@@ -1487,6 +1515,230 @@ def stop_monitoring(self, job_id: str, stop_type: str = "SOFT"):
14871515
}
14881516

14891517

1518+
@BasePlugin.endpoint(method="post")
def analyze_job(
  self,
  job_id: str,
  analysis_type: str = "",
  focus_areas: "list[str] | None" = None
):
  """
  Manually trigger LLM analysis for a completed job.

  Aggregates reports from all workers and runs analysis on the combined data.

  Parameters
  ----------
  job_id : str
    Identifier of the job to analyze.
  analysis_type : str, optional
    Type of analysis: "security_assessment", "vulnerability_summary",
    "remediation_plan". Empty string falls back to the configured
    LLM_AUTO_ANALYSIS_TYPE default.
  focus_areas : list[str], optional
    Areas to focus on: ["web", "network", "databases", "authentication"].

  Returns
  -------
  dict
    LLM analysis result (with job metadata and the R1FS CID it was saved
    under) or an error message.
  """
  # Guard clauses: the LLM integration must be explicitly enabled & configured.
  if not self.cfg_llm_agent_api_enabled:
    return {"error": "LLM Agent API is not enabled", "job_id": job_id}

  if not self.cfg_llm_agent_api_port:
    return {"error": "LLM Agent API port not configured", "job_id": job_id}

  # Get job from CStore
  job_specs = self._get_job_from_cstore(job_id)
  if not job_specs:
    return {"error": "Job not found", "job_id": job_id}

  workers = job_specs.get("workers", {})
  if not workers:
    return {"error": "No workers found for this job", "job_id": job_id}

  # Analysis is only meaningful on complete data: all workers must be done.
  if not all(w.get("finished") for w in workers.values()):
    return {"error": "Job not yet complete, some workers still running", "job_id": job_id}

  # Collect and aggregate reports from all workers
  aggregated_report = self._collect_aggregated_report(workers)
  if not aggregated_report:
    return {"error": "No report data available for this job", "job_id": job_id}

  # Add job metadata so the LLM has target/scope context for its analysis.
  target = job_specs.get("target", "unknown")
  aggregated_report["_job_metadata"] = {
    "job_id": job_id,
    "target": target,
    "num_workers": len(workers),
    "worker_addresses": list(workers.keys()),
    "start_port": job_specs.get("start_port"),
    "end_port": job_specs.get("end_port"),
    "enabled_features": job_specs.get("enabled_features", []),
  }

  # Fall back to the configured default analysis type when none was given.
  analysis_type = analysis_type or self.cfg_llm_auto_analysis_type

  analysis_result = self._call_llm_agent_api(
    endpoint="/analyze_scan",
    method="POST",
    payload={
      "scan_results": aggregated_report,
      "analysis_type": analysis_type,
      "focus_areas": focus_areas,
    }
  )

  if "error" in analysis_result:
    return {
      "error": analysis_result.get("error"),
      "status": analysis_result.get("status"),
      "job_id": job_id,
    }

  # Save analysis to R1FS and record its CID in pass_history.
  analysis_cid = None
  pass_history = job_specs.get("pass_history", [])
  current_pass = job_specs.get("job_pass", 1)

  try:
    analysis_cid = self.r1fs.add_json(analysis_result, show_logs=False)
    if analysis_cid:
      if pass_history:
        # Attach the analysis CID to the latest completed pass.
        pass_history[-1]["llm_analysis_cid"] = analysis_cid
      else:
        # No pass_history yet - create the first entry.
        pass_history.append({
          "pass_nr": current_pass,
          "completed_at": self.time(),
          "reports": {addr: w.get("report_cid") for addr, w in workers.items()},
          "llm_analysis_cid": analysis_cid,
        })
      job_specs["pass_history"] = pass_history

      # NOTE(review): other persist sites in this plugin use a derived
      # `job_key` as the CStore key — confirm `key=job_id` is intended here.
      self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs)
      self.P(f"Manual LLM analysis saved for job {job_id}, CID: {analysis_cid}")
  except Exception as e:
    # Best-effort persistence: the analysis is still returned to the caller.
    self.P(f"Failed to save analysis to R1FS: {e}", color='y')

  return {
    "job_id": job_id,
    "target": target,
    "num_workers": len(workers),
    "pass_nr": pass_history[-1].get("pass_nr") if pass_history else current_pass,
    "analysis_type": analysis_type,
    "analysis": analysis_result,
    "analysis_cid": analysis_cid,
  }
1639+
1640+
@BasePlugin.endpoint
def get_analysis(self, job_id: str = "", cid: str = "", pass_nr: "int | None" = None):
  """
  Retrieve LLM analysis for a job or by CID.

  The analysis is generated by the launcher node after all workers complete,
  containing the aggregated results from all distributed workers.

  Parameters
  ----------
  job_id : str, optional
    Identifier of the job to get analysis for.
  cid : str, optional
    Direct CID of the analysis to retrieve; takes precedence over job_id.
  pass_nr : int, optional
    Pass number for continuous jobs. If not provided, returns the latest pass.

  Returns
  -------
  dict
    LLM analysis data or error message.
  """
  # If CID provided directly, fetch it without any job lookup.
  if cid:
    try:
      analysis = self.r1fs.get_json(cid)
      if analysis is None:
        return {"error": "Analysis not found", "cid": cid}
      return {"cid": cid, "analysis": analysis}
    except Exception as e:
      return {"error": str(e), "cid": cid}

  # Otherwise, look up by job_id
  if not job_id:
    return {"error": "Either job_id or cid must be provided"}

  job_specs = self._get_job_from_cstore(job_id)
  if not job_specs:
    return {"error": "Job not found", "job_id": job_id}

  # Look for analysis in pass_history
  pass_history = job_specs.get("pass_history", [])
  job_status = job_specs.get("job_status", "RUNNING")

  if not pass_history:
    if job_status == "RUNNING":
      return {"error": "Job still running, no passes completed yet", "job_id": job_id, "job_status": job_status}
    return {"error": "No pass history available for this job", "job_id": job_id, "job_status": job_status}

  # Find the requested pass (or latest if not specified)
  if pass_nr is not None:
    target_pass = next((entry for entry in pass_history if entry.get("pass_nr") == pass_nr), None)
    if not target_pass:
      return {"error": f"Pass {pass_nr} not found in history", "job_id": job_id, "available_passes": [e.get("pass_nr") for e in pass_history]}
  else:
    # Get the latest pass
    target_pass = pass_history[-1]

  analysis_cid = target_pass.get("llm_analysis_cid")
  if not analysis_cid:
    return {
      "error": "No LLM analysis available for this pass",
      "job_id": job_id,
      "pass_nr": target_pass.get("pass_nr"),
      "job_status": job_status
    }

  try:
    analysis = self.r1fs.get_json(analysis_cid)
    if analysis is None:
      return {"error": "Analysis not found in R1FS", "cid": analysis_cid, "job_id": job_id}
    return {
      "job_id": job_id,
      "pass_nr": target_pass.get("pass_nr"),
      "completed_at": target_pass.get("completed_at"),
      "cid": analysis_cid,
      "target": job_specs.get("target"),
      "num_workers": len(job_specs.get("workers", {})),
      "total_passes": len(pass_history),
      "analysis": analysis,
    }
  except Exception as e:
    return {"error": str(e), "cid": analysis_cid, "job_id": job_id}
1728+
1729+
@BasePlugin.endpoint
def llm_health(self):
  """
  Check health of the LLM Agent API connection.

  Returns
  -------
  dict
    Health status of the LLM Agent API, as reported by the mixin helper.
  """
  health_status = self._get_llm_health_status()
  return health_status
1741+
14901742
def process(self):
14911743
"""
14921744
Periodic task handler: launch new jobs and close completed ones.

0 commit comments

Comments
 (0)