|
| 1 | +# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available |
| 2 | +# © Concepts 1996–2026 Miroslav Šotek. All rights reserved. |
| 3 | +# © Code 2020–2026 Miroslav Šotek. All rights reserved. |
| 4 | +# ORCID: 0009-0009-3560-0851 |
| 5 | +# Contact: www.anulum.li | protoscience@anulum.li |
| 6 | +# SCPN Phase Orchestrator — Anesthesia simulation via coupling reduction |
| 7 | +# |
| 8 | +# Propofol-induced unconsciousness IS a Kuramoto phase transition below K_c |
| 9 | +# (R7 research finding, confirmed by Deco's group). |
| 10 | +# Simulate by progressively reducing global coupling K. |
| 11 | +# Measure: does SPO regime FSM detect the transition? |
| 12 | + |
| 13 | +"""Simulate anesthesia as coupling reduction on 80-region brain model. |
| 14 | +
|
| 15 | +The neurolib+SPO pipeline proved R=0.41 (metastable) at K=2.0. |
| 16 | +Reducing K should drive R below critical thresholds. |
| 17 | +SPO's regime FSM should detect: NOMINAL → DEGRADED → CRITICAL. |
| 18 | +
|
| 19 | +Usage: |
| 20 | + python experiments/anesthesia_simulation.py |
| 21 | +""" |
| 22 | + |
| 23 | +from __future__ import annotations |
| 24 | + |
| 25 | +import json |
| 26 | +import time |
| 27 | + |
| 28 | +import numpy as np |
| 29 | +from scipy.signal import hilbert |
| 30 | + |
| 31 | +from neurolib.models.aln import ALNModel |
| 32 | +from neurolib.utils.loadData import Dataset |
| 33 | + |
| 34 | +from scpn_phase_orchestrator.monitor.boundaries import BoundaryState |
| 35 | +from scpn_phase_orchestrator.supervisor.events import EventBus |
| 36 | +from scpn_phase_orchestrator.supervisor.regimes import Regime, RegimeManager |
| 37 | +from scpn_phase_orchestrator.upde.metrics import LayerState, UPDEState |
| 38 | +from scpn_phase_orchestrator.upde.order_params import compute_order_parameter |
| 39 | + |
| 40 | + |
| 41 | +def run_at_coupling(ds, K: float, duration_ms: int = 10000) -> dict: |
| 42 | + """Run ALN at given coupling, return SPO regime analysis.""" |
| 43 | + model = ALNModel(Cmat=ds.Cmat, Dmat=ds.Dmat) |
| 44 | + model.params["duration"] = duration_ms |
| 45 | + model.params["Ke_gl"] = K |
| 46 | + |
| 47 | + t0 = time.perf_counter() |
| 48 | + model.run() |
| 49 | + elapsed = time.perf_counter() - t0 |
| 50 | + |
| 51 | + rates = model.rates_exc |
| 52 | + if rates is None or rates.shape[1] < 100: |
| 53 | + return {"K": K, "error": "rates too short"} |
| 54 | + |
| 55 | + # Extract phases via Hilbert on rate envelope |
| 56 | + n_regions = rates.shape[0] |
| 57 | + n_t = rates.shape[1] |
| 58 | + phases = np.zeros((n_regions, n_t)) |
| 59 | + for i in range(n_regions): |
| 60 | + analytic = hilbert(rates[i]) |
| 61 | + phases[i] = np.angle(analytic) % (2 * np.pi) |
| 62 | + |
| 63 | + # SPO regime analysis on 100-step windows |
| 64 | + ds_factor = 100 |
| 65 | + n_windows = n_t // ds_factor |
| 66 | + |
| 67 | + event_bus = EventBus() |
| 68 | + rm = RegimeManager(event_bus=event_bus) |
| 69 | + |
| 70 | + R_vals = [] |
| 71 | + regimes = [] |
| 72 | + for w in range(n_windows): |
| 73 | + phase_snap = phases[:, (w + 1) * ds_factor - 1] |
| 74 | + R, psi = compute_order_parameter(phase_snap) |
| 75 | + R_vals.append(float(R)) |
| 76 | + |
| 77 | + upde = UPDEState( |
| 78 | + layers=[LayerState(R=R, psi=psi)], |
| 79 | + cross_layer_alignment=np.eye(1), |
| 80 | + stability_proxy=R, |
| 81 | + regime_id=rm.current_regime.value, |
| 82 | + ) |
| 83 | + proposed = rm.evaluate(upde, BoundaryState()) |
| 84 | + rm.transition(proposed) |
| 85 | + regimes.append(rm.current_regime.value) |
| 86 | + |
| 87 | + from collections import Counter |
| 88 | + regime_counts = dict(Counter(regimes)) |
| 89 | + |
| 90 | + return { |
| 91 | + "K": K, |
| 92 | + "R_mean": round(float(np.mean(R_vals)), 4), |
| 93 | + "R_std": round(float(np.std(R_vals)), 4), |
| 94 | + "R_min": round(float(min(R_vals)), 4), |
| 95 | + "R_max": round(float(max(R_vals)), 4), |
| 96 | + "regime_counts": regime_counts, |
| 97 | + "n_transitions": len(rm.transition_history), |
| 98 | + "wall_time_s": round(elapsed, 2), |
| 99 | + "n_windows": n_windows, |
| 100 | + } |
| 101 | + |
| 102 | + |
| 103 | +def main() -> None: |
| 104 | + print("Loading HCP data...") |
| 105 | + ds_data = Dataset("hcp") |
| 106 | + |
| 107 | + # Simulate anesthesia: K from 5.0 (awake) down to 0.1 (deep anesthesia) |
| 108 | + K_values = [5.0, 3.0, 2.0, 1.5, 1.0, 0.5, 0.2, 0.1] |
| 109 | + |
| 110 | + print(f"\nSimulating anesthesia via coupling reduction...") |
| 111 | + print(f"{'K':>5} {'R_mean':>8} {'R_min':>8} {'regime':>30} {'transitions':>12}") |
| 112 | + print("-" * 70) |
| 113 | + |
| 114 | + results = [] |
| 115 | + for K in K_values: |
| 116 | + r = run_at_coupling(ds_data, K, duration_ms=15000) |
| 117 | + results.append(r) |
| 118 | + regime_str = str(r.get("regime_counts", {})) |
| 119 | + print( |
| 120 | + f"{K:>5.1f} {r.get('R_mean',0):>8.4f} {r.get('R_min',0):>8.4f} " |
| 121 | + f"{regime_str:>30} {r.get('n_transitions',0):>12}" |
| 122 | + ) |
| 123 | + |
| 124 | + print(f"\n{'='*70}") |
| 125 | + print(f"Anesthesia Simulation Summary") |
| 126 | + print(f"{'='*70}") |
| 127 | + |
| 128 | + # Find transition point |
| 129 | + for i in range(len(results) - 1): |
| 130 | + r1 = results[i] |
| 131 | + r2 = results[i + 1] |
| 132 | + if r1.get("R_mean", 0) > 0.3 and r2.get("R_mean", 0) < 0.3: |
| 133 | + print( |
| 134 | + f"Consciousness transition between K={r1['K']} " |
| 135 | + f"(R={r1.get('R_mean',0):.4f}) and K={r2['K']} " |
| 136 | + f"(R={r2.get('R_mean',0):.4f})" |
| 137 | + ) |
| 138 | + break |
| 139 | + else: |
| 140 | + print("No clear consciousness transition detected in K range") |
| 141 | + |
| 142 | + with open("experiments/anesthesia_results.json", "w") as f: |
| 143 | + json.dump(results, f, indent=2) |
| 144 | + print(f"\nSaved to experiments/anesthesia_results.json") |
| 145 | + |
| 146 | + |
| 147 | +if __name__ == "__main__": |
| 148 | + main() |
0 commit comments