|
| 1 | +""" |
| 2 | +IX-HapticSight — Quickstart Demo (1 minute) |
| 3 | +
|
| 4 | +Loads the basic sim scene, runs: |
| 5 | + Scene JSON → EngagementScheduler (nudge) |
| 6 | + → ContactPlanner (ContactPlan) |
| 7 | + → SafetyGate (dual-channel OK?) |
| 8 | +
|
| 9 | +Output: a short, factual log proving the stack is wired correctly. |
| 10 | +
|
| 11 | +Usage: |
| 12 | + python examples/quickstart.py |
| 13 | + python examples/quickstart.py --scene sim/scenes/basic_room.json --verbose |
| 14 | +""" |
| 15 | + |
| 16 | +from __future__ import annotations |
| 17 | + |
| 18 | +import argparse |
| 19 | +import json |
| 20 | +import sys |
| 21 | +from pathlib import Path |
| 22 | +from typing import Dict, Any, List, Tuple |
| 23 | + |
| 24 | +try: |
| 25 | + import yaml # type: ignore |
| 26 | +except Exception as e: # pragma: no cover |
| 27 | + print("Missing dependency: pyyaml. Install with `pip install pyyaml`.", file=sys.stderr) |
| 28 | + raise |
| 29 | + |
| 30 | +# Make 'ohip' importable from repo root |
| 31 | +ROOT = Path(__file__).resolve().parents[1] |
| 32 | +sys.path.insert(0, str(ROOT / "src")) |
| 33 | + |
| 34 | +from ohip import ( # noqa: E402 |
| 35 | + # data + utils |
| 36 | + Pose, Vector3, RPY, SafetyLevel, |
| 37 | + ConsentRecord, ConsentMode, ConsentSource, |
| 38 | + # engine |
| 39 | + EngagementScheduler, PolicyProfile, |
| 40 | + ContactPlanner, PlannerHints, |
| 41 | + SafetyGate, |
| 42 | +) |
| 43 | + |
| 44 | +SCENE_DEFAULT = ROOT / "sim" / "scenes" / "basic_room.json" |
| 45 | +FORCE_LIMITS = ROOT / "configs" / "force_limits.yaml" |
| 46 | + |
| 47 | + |
| 48 | +def load_scene(path: Path) -> Dict[str, Any]: |
| 49 | + data = json.loads(Path(path).read_text()) |
| 50 | + return data |
| 51 | + |
| 52 | + |
| 53 | +def load_envelopes(path: Path) -> Dict[str, Any]: |
| 54 | + with open(path, "r", encoding="utf-8") as f: |
| 55 | + return yaml.safe_load(f) |
| 56 | + |
| 57 | + |
| 58 | +def aabb_contains(aabb_min: List[float], aabb_max: List[float], p: Tuple[float, float, float]) -> bool: |
| 59 | + return all(aabb_min[i] <= p[i] <= aabb_max[i] for i in range(3)) |
| 60 | + |
| 61 | + |
| 62 | +def risk_query_from_scene(scene: Dict[str, Any]): |
| 63 | + """Return a callable pose->SafetyLevel derived from scene.safety_regions.""" |
| 64 | + regions = scene.get("safety_regions", []) or [] |
| 65 | + |
| 66 | + def risk(pose: Pose) -> SafetyLevel: |
| 67 | + x, y, z = pose.xyz.x, pose.xyz.y, pose.xyz.z |
| 68 | + point = (x, y, z) |
| 69 | + level = SafetyLevel.GREEN |
| 70 | + # Prefer the most severe label among matches |
| 71 | + for r in regions: |
| 72 | + mn = r.get("aabb_min_W") |
| 73 | + mx = r.get("aabb_max_W") |
| 74 | + if not (mn and mx): |
| 75 | + continue |
| 76 | + if aabb_contains(mn, mx, point): |
| 77 | + lv = r.get("level", "GREEN").upper() |
| 78 | + if lv == "RED": |
| 79 | + return SafetyLevel.RED |
| 80 | + if lv == "YELLOW": |
| 81 | + level = SafetyLevel.YELLOW |
| 82 | + return level |
| 83 | + |
| 84 | + return risk |
| 85 | + |
| 86 | + |
| 87 | +def make_consent_from_scene(scene: Dict[str, Any]) -> ConsentRecord: |
| 88 | + h = scene.get("human", {}) |
| 89 | + c = h.get("consent", {}) or {} |
| 90 | + mode = str(c.get("mode", "none")).lower() |
| 91 | + if mode == "explicit": |
| 92 | + cmode = ConsentMode.EXPLICIT |
| 93 | + csource = ConsentSource(str(c.get("source", "ui"))) |
| 94 | + elif mode == "policy": |
| 95 | + cmode = ConsentMode.POLICY |
| 96 | + csource = ConsentSource.PROFILE |
| 97 | + else: |
| 98 | + cmode = ConsentMode.NONE |
| 99 | + csource = ConsentSource.UI |
| 100 | + scope = [s.lower() for s in c.get("scope", [])] |
| 101 | + ttl = int(c.get("ttl_s", 60)) |
| 102 | + return ConsentRecord(subject_id=h.get("id", "anon"), mode=cmode, source=csource, scope=scope, ttl_s=ttl) |
| 103 | + |
| 104 | + |
| 105 | +def make_affordances_from_scene(scene: Dict[str, Any]) -> List[Dict[str, Any]]: |
| 106 | + aff = scene.get("affordances", []) or [] |
| 107 | + # Ensure pose dict matches schemas.Pose.to_dict() expectations |
| 108 | + for a in aff: |
| 109 | + pose = a.get("pose") |
| 110 | + if pose and isinstance(pose.get("xyz"), list) and isinstance(pose.get("rpy"), list): |
| 111 | + # OK |
| 112 | + pass |
| 113 | + return aff |
| 114 | + |
| 115 | + |
| 116 | +def to_pose(d: Dict[str, Any]) -> Pose: |
| 117 | + return Pose( |
| 118 | + frame=str(d.get("frame", "W")), |
| 119 | + xyz=Vector3(*[float(v) for v in d["xyz"]]), |
| 120 | + rpy=RPY(*[float(v) for v in d["rpy"]]), |
| 121 | + ) |
| 122 | + |
| 123 | + |
| 124 | +def main() -> int: |
| 125 | + ap = argparse.ArgumentParser() |
| 126 | + ap.add_argument("--scene", type=Path, default=SCENE_DEFAULT, help="Path to scene JSON") |
| 127 | + ap.add_argument("--envelopes", type=Path, default=FORCE_LIMITS, help="Path to force_limits.yaml") |
| 128 | + ap.add_argument("--verbose", action="store_true") |
| 129 | + args = ap.parse_args() |
| 130 | + |
| 131 | + scene = load_scene(args.scene) |
| 132 | + envelopes = load_envelopes(args.envelopes) |
| 133 | + risk_query = risk_query_from_scene(scene) |
| 134 | + |
| 135 | + # Human state & consent |
| 136 | + human = scene.get("human", {}) |
| 137 | + human_state = { |
| 138 | + "present": bool(human.get("present", True)), |
| 139 | + "distress": float(human.get("affect", {}).get("distress", 0.0)), |
| 140 | + } |
| 141 | + consent = make_consent_from_scene(scene) |
| 142 | + |
| 143 | + # Affordances (targets) |
| 144 | + affordances = make_affordances_from_scene(scene) |
| 145 | + |
| 146 | + # Run scheduler |
| 147 | + policy = PolicyProfile() |
| 148 | + sched = EngagementScheduler(policy=policy) |
| 149 | + nudge = sched.decide(human_state, consent, affordances, risk_query) |
| 150 | + |
| 151 | + if args.verbose: |
| 152 | + print("Scene:", args.scene) |
| 153 | + print("Human state:", human_state) |
| 154 | + print("Consent:", consent.to_dict()) |
| 155 | + |
| 156 | + if nudge is None: |
| 157 | + print("No nudge emitted (nothing safe/appropriate).") |
| 158 | + return 0 |
| 159 | + |
| 160 | + print("NUDGE:", nudge.to_dict()) |
| 161 | + |
| 162 | + # Plan contact |
| 163 | + planner = ContactPlanner(envelopes) |
| 164 | + plan = planner.plan(nudge, consent, profile_name=envelopes.get("defaults", {}).get("social_touch_profile")) |
| 165 | + if plan is None: |
| 166 | + print("Planner returned no plan.") |
| 167 | + return 0 |
| 168 | + |
| 169 | + print("PLAN:", plan.to_dict()) |
| 170 | + |
| 171 | + # Safety gate (dual-channel OK?) |
| 172 | + gate = SafetyGate(envelopes) |
| 173 | + ok = gate.dual_channel_ok(plan, risk_query, start_pose=None) |
| 174 | + print("SAFETY_OK:", ok) |
| 175 | + if not ok: |
| 176 | + print("Reason (if latched):", gate.last_reason()) |
| 177 | + |
| 178 | + return 0 |
| 179 | + |
| 180 | + |
| 181 | +if __name__ == "__main__": |
| 182 | + raise SystemExit(main()) |
0 commit comments