
Commit 7443cb7

feat: add continuous monitoring
1 parent 5e18991 commit 7443cb7

1 file changed: +249 −10

extensions/business/cybersec/red_mesh/pentester_api_01.py
@@ -35,7 +35,7 @@
 from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin
 from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module

-__VER__ = '0.8.1' # updated version
+__VER__ = '0.8.2'

 _CONFIG = {
   **BasePlugin.CONFIG,
@@ -55,6 +55,11 @@
   "PORT_ORDER": "SHUFFLE", # "SHUFFLE" or "SEQUENTIAL"
   "EXCLUDED_FEATURES": [],

+  # Run mode: SINGLEPASS (default) or CONTINUOUS_MONITORING
+  "RUN_MODE": "SINGLEPASS",
+  "MONITOR_INTERVAL": 60, # seconds between passes in continuous mode
+  "MONITOR_JITTER": 5, # random jitter to avoid simultaneous CStore writes
+
   'VALIDATION_RULES': {
     **BasePlugin.CONFIG['VALIDATION_RULES'],
   },
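Note: the three new keys default to single-pass behavior, so continuous monitoring is opt-in per instance. As a minimal sketch (the INSTANCE_ID value and variable name below are illustrative assumptions, not part of this diff), an instance config enabling the new mode might look like:

# Hypothetical instance config enabling the new mode; only RUN_MODE,
# MONITOR_INTERVAL and MONITOR_JITTER are introduced by this commit.
PENTESTER_INSTANCE_CONFIG = {
  "INSTANCE_ID": "red_mesh_01",          # placeholder identifier
  "RUN_MODE": "CONTINUOUS_MONITORING",   # overrides the SINGLEPASS default
  "MONITOR_INTERVAL": 300,               # rescan every 5 minutes
  "MONITOR_JITTER": 5,                   # desynchronize peer CStore writes
}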
@@ -438,9 +443,20 @@ def _maybe_launch_jobs(self, nr_local_workers=None):
       current_worker_finished = worker_entry.get("finished", False)
       if current_worker_finished:
         continue
-      # If job not already running and not completed, start a new thread
+
+      # Check if this is a continuous monitoring job where our worker was reset
+      # (launcher reset our finished flag for next pass) - clear local tracking
       closed_target = job_id in self.completed_jobs_reports
-      in_progress_target = job_id in self.scan_jobs
+      if closed_target and not current_worker_finished:
+        # Our worker entry was reset by launcher for next pass - clear local state
+        self.P(f"Detected worker reset for job {job_id}, clearing local tracking for next pass")
+        self.completed_jobs_reports.pop(job_id, None)
+        if job_id in self.lst_completed_jobs:
+          self.lst_completed_jobs.remove(job_id)
+        closed_target = False
+
+      # If job not already running and not completed, start a new thread
+      in_progress_target = job_id in self.scan_jobs
       if not in_progress_target and not closed_target:
         launcher = job_specs.get("launcher")
         launcher_alias = job_specs.get("launcher_alias")
@@ -552,6 +568,9 @@ def _close_job(self, job_id, canceled=False):
     """
     Close a local job, aggregate reports, and persist in CStore.

+    Reports are saved to R1FS (IPFS) and only the CID is stored in CStore
+    to avoid bloating the distributed state.
+
     Parameters
     ----------
     job_id : str
@@ -579,9 +598,32 @@ def _close_job(self, job_id, canceled=False):
     closing = "Forced" if canceled else "Post finish"
     worker_entry = job_specs.setdefault("workers", {}).setdefault(self.ee_addr, {})
     worker_entry["finished"] = True
-    worker_entry["result"] = report
     worker_entry["canceled"] = canceled
-    job_specs["workers"][self.ee_addr] = worker_entry
+
+    # Save full report to R1FS and store only CID in CStore
+    try:
+      report_cid = self.r1fs.add_json(report, show_logs=False)
+      if report_cid:
+        worker_entry["report_cid"] = report_cid
+        worker_entry["result"] = None # No blob in CStore
+        self.P(f"Report saved to R1FS with CID: {report_cid}")
+      else:
+        # Fallback: store report directly if R1FS fails
+        self.P("R1FS add_json returned None, storing report directly in CStore", color='y')
+        worker_entry["report_cid"] = None
+        worker_entry["result"] = report
+    except Exception as e:
+      # Fallback: store report directly if R1FS fails
+      self.P(f"Failed to save report to R1FS: {e}. Storing directly in CStore", color='r')
+      worker_entry["report_cid"] = None
+      worker_entry["result"] = report
+
+    # Re-read job_specs to avoid overwriting concurrent updates (e.g., pass_history)
+    fresh_job_specs = self.chainstore_hget(hkey=self.cfg_instance_id, key=job_id)
+    if fresh_job_specs and isinstance(fresh_job_specs, dict):
+      fresh_job_specs["workers"][self.ee_addr] = worker_entry
+      job_specs = fresh_job_specs
+
     self.P("{} closing job_id {}:\n{}".format(
       closing,
       job_id,
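A note on the read side of this change: consumers of a worker entry now have to handle both shapes, the report_cid pointer and the inline result fallback. A minimal sketch of that lookup (the helper name is hypothetical), assuming an r1fs handle with the same get_json API used in the diff:

def resolve_worker_report(worker_entry, r1fs):
  """Return the full report for a worker entry, wherever it lives.

  Mirrors the writer above: prefer the R1FS CID, fall back to the inline
  'result' blob that _close_job stores when R1FS is unavailable.
  """
  report_cid = worker_entry.get("report_cid")
  if report_cid:
    # Full report lives in R1FS; CStore only holds the pointer
    return r1fs.get_json(report_cid)
  # Fallback path: the report was stored inline in CStore
  return worker_entry.get("result")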
@@ -641,6 +683,104 @@ def _maybe_close_jobs(self):
     return


+  def _maybe_schedule_next_pass(self):
+    """
+    Launcher orchestrates continuous monitoring passes.
+
+    For CONTINUOUS_MONITORING jobs, this method:
+    1. Detects when all workers have finished the current pass
+    2. Records pass completion in history and schedules the next pass
+    3. Resets all workers when it's time to start the next pass
+
+    Only the launcher node executes scheduling logic.
+
+    Returns
+    -------
+    None
+    """
+    all_jobs = self.chainstore_hgetall(hkey=self.cfg_instance_id)
+
+    for job_key, job_specs in all_jobs.items():
+      normalized_key, job_specs = self._normalize_job_record(job_key, job_specs)
+      if normalized_key is None:
+        continue
+
+      # Only handle continuous monitoring jobs that are still running
+      run_mode = job_specs.get("run_mode", "SINGLEPASS")
+      if run_mode != "CONTINUOUS_MONITORING":
+        continue
+      monitoring_status = job_specs.get("monitoring_status", "RUNNING")
+      if monitoring_status == "STOPPED":
+        continue
+
+      # Only launcher manages scheduling
+      is_launcher = job_specs.get("launcher") == self.ee_addr
+      if not is_launcher:
+        continue
+
+      workers = job_specs.get("workers", {})
+      if not workers:
+        continue
+
+      all_finished = all(w.get("finished") for w in workers.values())
+      next_pass_at = job_specs.get("next_pass_at")
+      job_pass = job_specs.get("job_pass", 1)
+      job_id = job_specs.get("job_id")
+
+      if all_finished and next_pass_at is None:
+        # ═══════════════════════════════════════════════════
+        # STATE: All peers completed current pass
+        # ═══════════════════════════════════════════════════
+        pass_history = job_specs.setdefault("pass_history", [])
+        pass_history.append({
+          "pass_nr": job_pass,
+          "completed_at": self.time(),
+          "reports": {addr: w.get("report_cid") for addr, w in workers.items()}
+        })
+
+        # Check if soft stop was scheduled
+        if monitoring_status == "SCHEDULED_FOR_STOP":
+          job_specs["monitoring_status"] = "STOPPED"
+          self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Stopping (soft stop was scheduled)")
+          self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
+          continue
+
+        # Schedule next pass
+        interval = job_specs.get("monitor_interval", self.cfg_monitor_interval)
+        jitter = random.uniform(0, self.cfg_monitor_jitter)
+        job_specs["next_pass_at"] = self.time() + interval + jitter
+
+        self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Next pass in {interval}s (+{jitter:.1f}s jitter)")
+        self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
+
+        # Clear from completed_jobs_reports to allow relaunch
+        self.completed_jobs_reports.pop(job_id, None)
+        if job_id in self.lst_completed_jobs:
+          self.lst_completed_jobs.remove(job_id)
+
+      elif all_finished and next_pass_at and self.time() >= next_pass_at:
+        # ═══════════════════════════════════════════════════
+        # STATE: Interval elapsed, start next pass
+        # ═══════════════════════════════════════════════════
+        job_specs["job_pass"] = job_pass + 1
+        job_specs["next_pass_at"] = None
+
+        for addr in workers:
+          workers[addr]["finished"] = False
+          workers[addr]["result"] = None
+          workers[addr]["report_cid"] = None
+
+        self.P(f"[CONTINUOUS] Starting pass {job_pass + 1} for job {job_id}", boxed=True)
+        self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
+
+        # Clear local tracking to allow relaunch
+        self.completed_jobs_reports.pop(job_id, None)
+        if job_id in self.lst_completed_jobs:
+          self.lst_completed_jobs.remove(job_id)
+    #end for each job
+    return
+
+
   def _get_all_network_jobs(self):
     """
     Retrieve all jobs tracked in CStore for this instance.
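The scheduler above is effectively a two-state machine keyed on (all_finished, next_pass_at): record-and-schedule, then reset-and-relaunch. A self-contained simulation of just that state logic (plain dicts, no CStore or plugin base class; jitter, stop handling, and report CIDs omitted) may help in following the pass lifecycle:

import time

def tick(job, now=None):
  """One scheduler tick over a simulated job record (launcher side).

  State A: all workers finished, no deadline set -> record pass, set deadline.
  State B: deadline elapsed -> bump job_pass, clear deadline, reset workers.
  """
  now = now if now is not None else time.time()
  workers = job["workers"]
  all_finished = all(w.get("finished") for w in workers.values())
  if all_finished and job.get("next_pass_at") is None:
    job.setdefault("pass_history", []).append({"pass_nr": job["job_pass"], "completed_at": now})
    job["next_pass_at"] = now + job["monitor_interval"]
  elif all_finished and job.get("next_pass_at") and now >= job["next_pass_at"]:
    job["job_pass"] += 1
    job["next_pass_at"] = None
    for w in workers.values():
      w["finished"] = False

# Simulated single-worker job with a 60s interval
job = {"job_pass": 1, "next_pass_at": None, "monitor_interval": 60,
       "workers": {"node_a": {"finished": True}}}
tick(job, now=1000.0)  # state A: schedules the next pass at t=1060
tick(job, now=1060.0)  # state B: starts pass 2, resets node_a
assert job["job_pass"] == 2 and not job["workers"]["node_a"]["finished"]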
@@ -759,18 +899,20 @@ def list_features(self):

   @BasePlugin.endpoint(method="post")
   def launch_test(
-      self,
-      target: str = "",
-      start_port: int = 1, end_port: int = 65535,
+    self,
+    target: str = "",
+    start_port: int = 1, end_port: int = 65535,
     exceptions: str = "64297",
     distribution_strategy: str = "",
     port_order: str = "",
     excluded_features: list[str] = None,
+    run_mode: str = "",
+    monitor_interval: int = 0,
   ):
     """
     Start a pentest on the specified target.

-    Announces the job to the network via CStore; actual executtarget:ion is handled
+    Announces the job to the network via CStore; actual execution is handled
     asynchronously by worker threads.

     Parameters
@@ -790,6 +932,11 @@ def launch_test(
       "SHUFFLE" to randomize port order; "SEQUENTIAL" for ordered scan.
     excluded_features: list[str], optional
       List of feature names to exclude from scanning.
+    run_mode: str, optional
+      "SINGLEPASS" (default) for one-time scan; "CONTINUOUS_MONITORING" for
+      repeated scans at monitor_interval.
+    monitor_interval: int, optional
+      Seconds between passes in CONTINUOUS_MONITORING mode (0 = use config).

     Returns
     -------
@@ -842,6 +989,13 @@ def launch_test(
     if not port_order or port_order not in ["SHUFFLE", "SEQUENTIAL"]:
       port_order = self.cfg_port_order

+    # Validate run_mode and monitor_interval
+    run_mode = str(run_mode).upper()
+    if not run_mode or run_mode not in ["SINGLEPASS", "CONTINUOUS_MONITORING"]:
+      run_mode = self.cfg_run_mode
+    if monitor_interval <= 0:
+      monitor_interval = self.cfg_monitor_interval
+
     chainstore_peers = self.cfg_chainstore_peers
     num_workers = len(chainstore_peers)

@@ -901,6 +1055,13 @@ def launch_test(
       "port_order": port_order,
       "excluded_features": excluded_features,
       "enabled_features": enabled_features,
+      # Continuous monitoring fields
+      "run_mode": run_mode,
+      "monitor_interval": monitor_interval,
+      "monitoring_status": "RUNNING", # RUNNING | SCHEDULED_FOR_STOP | STOPPED
+      "job_pass": 1,
+      "next_pass_at": None,
+      "pass_history": [],
     }
     self.chainstore_hset(
       hkey=self.cfg_instance_id,
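Assuming the plugin's FastAPI app is reachable over HTTP (host, port, and target below are placeholders, and the job_id echo in the response is an assumption, not shown in this diff), launching a continuous job could look like:

import requests  # third-party HTTP client, assumed available

resp = requests.post(
  "http://localhost:8080/launch_test",    # placeholder deployment URL
  json={
    "target": "10.0.0.5",                 # illustrative target
    "start_port": 1,
    "end_port": 1024,
    "run_mode": "CONTINUOUS_MONITORING",  # new field from this commit
    "monitor_interval": 120,              # rescan every 2 minutes
  },
)
job_id = resp.json().get("job_id")        # assumes the response echoes a job_id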
@@ -1017,6 +1178,82 @@ def stop_and_delete_job(self, job_id : str):
     return {"status": "success", "job_id": job_id}


+  @BasePlugin.endpoint
+  def get_report(self, cid: str):
+    """
+    Retrieve a full report from R1FS by CID.
+
+    Parameters
+    ----------
+    cid : str
+      Content identifier of the report stored in R1FS.
+
+    Returns
+    -------
+    dict
+      The full report data or error message.
+    """
+    if not cid:
+      return {"error": "No CID provided"}
+    try:
+      report = self.r1fs.get_json(cid)
+      if report is None:
+        return {"error": "Report not found", "cid": cid}
+      return {"cid": cid, "report": report}
+    except Exception as e:
+      self.P(f"Failed to retrieve report from R1FS: {e}", color='r')
+      return {"error": str(e), "cid": cid}
+
+
+  @BasePlugin.endpoint(method="post")
+  def stop_monitoring(self, job_id: str, stop_type: str = "SOFT"):
+    """
+    Stop continuous monitoring for a job.
+
+    Parameters
+    ----------
+    job_id : str
+      Identifier of the job to stop monitoring.
+    stop_type : str, optional
+      "SOFT" (default): Let current pass complete, then stop.
+      Sets monitoring_status="SCHEDULED_FOR_STOP".
+      "HARD": Stop immediately. Sets monitoring_status="STOPPED".
+
+    Returns
+    -------
+    dict
+      Status including job_id and passes completed.
+    """
+    raw_job_specs = self.chainstore_hget(hkey=self.cfg_instance_id, key=job_id)
+    if not raw_job_specs:
+      return {"error": "Job not found", "job_id": job_id}
+
+    _, job_specs = self._normalize_job_record(job_id, raw_job_specs)
+    if job_specs.get("run_mode") != "CONTINUOUS_MONITORING":
+      return {"error": "Job is not in CONTINUOUS_MONITORING mode", "job_id": job_id}
+
+    stop_type = str(stop_type).upper()
+    passes_completed = job_specs.get("job_pass", 1)
+
+    if stop_type == "HARD":
+      job_specs["monitoring_status"] = "STOPPED"
+      self.P(f"[CONTINUOUS] Hard stop for job {job_id} after {passes_completed} passes")
+    else:
+      # SOFT stop - let current pass complete
+      job_specs["monitoring_status"] = "SCHEDULED_FOR_STOP"
+      self.P(f"[CONTINUOUS] Soft stop scheduled for job {job_id} (will stop after current pass)")
+
+    self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs)
+
+    return {
+      "monitoring_status": job_specs["monitoring_status"],
+      "stop_type": stop_type,
+      "job_id": job_id,
+      "passes_completed": passes_completed,
+      "pass_history": job_specs.get("pass_history", []),
+    }
+
+
   def process(self):
     """
     Periodic task handler: launch new jobs and close completed ones.
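Usage sketch for the two new endpoints, with the same placeholder base URL as the launch example above; treating the bare @BasePlugin.endpoint decorator as exposing get_report via GET with query parameters is an assumption:

import requests  # third-party HTTP client, assumed available

BASE = "http://localhost:8080"  # placeholder deployment URL

# Fetch one pass's full report by a CID taken from pass_history
report = requests.get(f"{BASE}/get_report", params={"cid": "<report_cid>"}).json()

# Soft stop: the current pass completes, then the launcher flips the job to STOPPED
out = requests.post(
  f"{BASE}/stop_monitoring",
  json={"job_id": "<job_id>", "stop_type": "SOFT"},  # placeholders
).json()
print(out["monitoring_status"], out["passes_completed"])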
@@ -1032,9 +1269,11 @@ def process(self):
       return
     elif not self.__warmup_done:
       self.__post_init()
-      #endif
+    #endif
     # Launch any new jobs
     self._maybe_launch_jobs()
     # Check active jobs for completion
     self._maybe_close_jobs()
+    # Handle continuous monitoring scheduling (launcher only)
+    self._maybe_schedule_next_pass()
     return
