Skip to content

Commit 217dd92

Browse files
authored
tests: cover EngagementScheduler (consent gates, debounce, cooldown, safety filter, ranking)
1 parent 2bd62d4 commit 217dd92

File tree

1 file changed

+172
-0
lines changed

1 file changed

+172
-0
lines changed

tests/test_nudge_scheduler.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"""
2+
IX-HapticSight — Tests for EngagementScheduler behavior.
3+
4+
Covers:
5+
- Consent gates (YELLOW without consent, GREEN with explicit consent)
6+
- Debounce (suppress identical nudges within window)
7+
- Social cooldown (blocks human-touch nudges, allows object nudges)
8+
- Safety-map filtering (RED targets are ignored)
9+
- Ranking preference (shoulder prioritized on distress)
10+
"""
11+
12+
import os
13+
import sys
14+
from datetime import datetime, timezone, timedelta
15+
16+
# Make `ohip` importable without packaging
17+
sys.path.insert(0, os.path.abspath("src"))
18+
19+
from ohip.nudge_scheduler import ( # noqa: E402
20+
EngagementScheduler,
21+
PolicyProfile,
22+
)
23+
from ohip.schemas import ( # noqa: E402
24+
Pose, Vector3, RPY,
25+
ConsentRecord, ConsentMode, ConsentSource,
26+
SafetyLevel,
27+
)
28+
29+
30+
# Utility poses reused across tests
31+
POSE_SHOULDER = Pose(frame="W", xyz=Vector3(0.42, -0.18, 1.36), rpy=RPY(0.0, 0.0, 1.57))
32+
POSE_TABLE = Pose(frame="W", xyz=Vector3(0.80, 0.10, 0.90), rpy=RPY(0.0, 0.0, 0.0))
33+
34+
35+
def risk_green(_pose: Pose) -> SafetyLevel:
36+
return SafetyLevel.GREEN
37+
38+
39+
def risk_red_if_shoulder(p: Pose) -> SafetyLevel:
40+
# Treat the exact shoulder pose as RED; everything else GREEN.
41+
if (abs(p.xyz.x - POSE_SHOULDER.xyz.x) < 1e-9 and
42+
abs(p.xyz.y - POSE_SHOULDER.xyz.y) < 1e-9 and
43+
abs(p.xyz.z - POSE_SHOULDER.xyz.z) < 1e-9):
44+
return SafetyLevel.RED
45+
return SafetyLevel.GREEN
46+
47+
48+
def make_affordances() -> list[dict]:
49+
return [
50+
{
51+
"name": "shoulder",
52+
"category": "human",
53+
"pose": POSE_SHOULDER.to_dict(),
54+
"utility": 0.90,
55+
"safety_level": "GREEN",
56+
},
57+
{
58+
"name": "flat_surface",
59+
"category": "object",
60+
"pose": POSE_TABLE.to_dict(),
61+
"utility": 0.40,
62+
"safety_level": "GREEN",
63+
},
64+
]
65+
66+
67+
def explicit_consent(subject="person-1") -> ConsentRecord:
68+
return ConsentRecord(
69+
subject_id=subject,
70+
mode=ConsentMode.EXPLICIT,
71+
source=ConsentSource.VERBAL,
72+
scope=["shoulder_contact"],
73+
ttl_s=60,
74+
)
75+
76+
77+
def expired_consent(subject="person-1") -> ConsentRecord:
78+
old = (datetime.now(timezone.utc) - timedelta(seconds=120)).isoformat()
79+
return ConsentRecord(
80+
subject_id=subject,
81+
mode=ConsentMode.EXPLICIT,
82+
source=ConsentSource.VERBAL,
83+
timestamp=old,
84+
scope=["shoulder_contact"],
85+
ttl_s=60, # expired by timestamp
86+
)
87+
88+
89+
def no_consent(subject="person-1") -> ConsentRecord:
90+
return ConsentRecord(
91+
subject_id=subject,
92+
mode=ConsentMode.NONE,
93+
source=ConsentSource.UI,
94+
scope=["shoulder_contact"],
95+
ttl_s=60,
96+
)
97+
98+
99+
def test_yellow_without_consent():
100+
policy = PolicyProfile() # require_explicit_for_social=True by default
101+
sched = EngagementScheduler(policy)
102+
human_state = {"present": True, "distress": 0.9}
103+
104+
nudge = sched.decide(human_state, no_consent(), make_affordances(), risk_green)
105+
assert nudge is not None
106+
assert nudge.level.name == "YELLOW"
107+
assert "Consent required" in nudge.rationale
108+
109+
110+
def test_green_with_explicit_consent():
111+
policy = PolicyProfile()
112+
sched = EngagementScheduler(policy)
113+
human_state = {"present": True, "distress": 0.8}
114+
115+
nudge = sched.decide(human_state, explicit_consent(), make_affordances(), risk_green)
116+
assert nudge is not None
117+
assert nudge.level.name == "GREEN"
118+
# Shoulder should be preferred on distress; rationale reflects that path.
119+
assert "Shoulder support" in nudge.rationale
120+
121+
122+
def test_expired_consent_behaves_as_yellow():
123+
policy = PolicyProfile()
124+
sched = EngagementScheduler(policy)
125+
human_state = {"present": True, "distress": 0.8}
126+
127+
nudge = sched.decide(human_state, expired_consent(), make_affordances(), risk_green)
128+
assert nudge is not None
129+
# Expired → treated as absent under require_explicit_for_social
130+
assert nudge.level.name in ("YELLOW",) # requires verification
131+
assert "Consent required" in nudge.rationale
132+
133+
134+
def test_debounce_blocks_immediate_repeat():
135+
policy = PolicyProfile()
136+
sched = EngagementScheduler(policy)
137+
human_state = {"present": True, "distress": 0.8}
138+
aff = make_affordances()
139+
140+
n1 = sched.decide(human_state, explicit_consent(), aff, risk_green)
141+
n2 = sched.decide(human_state, explicit_consent(), aff, risk_green) # immediate repeat
142+
assert n1 is not None
143+
assert n2 is None # suppressed by debounce window
144+
145+
146+
def test_social_cooldown_blocks_human_allows_object():
147+
policy = PolicyProfile()
148+
sched = EngagementScheduler(policy)
149+
human_state = {"present": True, "distress": 0.8}
150+
aff = make_affordances()
151+
152+
# First: get a shoulder nudge and mark it executed.
153+
n1 = sched.decide(human_state, explicit_consent(), aff, risk_green)
154+
assert n1 is not None and "Shoulder support" in n1.rationale
155+
sched.notify_contact_executed()
156+
157+
# Second: within cooldown, shoulder is blocked; object nudge should pass.
158+
n2 = sched.decide(human_state, explicit_consent(), aff, risk_green)
159+
assert n2 is not None
160+
assert "Object interaction" in n2.rationale # object chosen while shoulder blocked
161+
162+
163+
def test_safety_map_filters_red_targets():
164+
policy = PolicyProfile()
165+
sched = EngagementScheduler(policy)
166+
human_state = {"present": True, "distress": 0.9}
167+
aff = make_affordances()
168+
169+
# Risk map marks shoulder corridor/target RED; scheduler must ignore it and pick object.
170+
nudge = sched.decide(human_state, explicit_consent(), aff, risk_red_if_shoulder)
171+
assert nudge is not None
172+
assert "Object interaction" in nudge.rationale # shoulder filtered out

0 commit comments

Comments
 (0)