|
| 1 | +# src/tests/backend/handlers/test_runtime_interrupt_kernel.py |
| 2 | + |
| 3 | +import sys |
| 4 | +import os |
| 5 | +import types |
| 6 | +import pytest |
| 7 | +import asyncio |
| 8 | + |
| 9 | +# ─── Stub out semantic_kernel so the module import works ───────────────────────── |
| 10 | +sk = types.ModuleType("semantic_kernel") |
| 11 | +ka = types.ModuleType("semantic_kernel.kernel_arguments") |
| 12 | +kp = types.ModuleType("semantic_kernel.kernel_pydantic") |
| 13 | + |
| 14 | +# Provide classes so subclassing and instantiation work |
| 15 | +class StubKernelBaseModel: |
| 16 | + def __init__(self, **data): |
| 17 | + for k, v in data.items(): setattr(self, k, v) |
| 18 | + |
| 19 | +class StubKernelArguments: |
| 20 | + pass |
| 21 | + |
| 22 | +class StubKernel: |
| 23 | + def __init__(self): |
| 24 | + self.functions = {} |
| 25 | + self.variables = {} |
| 26 | + def add_function(self, func, plugin_name, function_name): |
| 27 | + self.functions[(plugin_name, function_name)] = func |
| 28 | + def set_variable(self, name, value): |
| 29 | + self.variables[name] = value |
| 30 | + def get_variable(self, name, default=None): |
| 31 | + return self.variables.get(name, default) |
| 32 | + |
| 33 | +# Assign stubs to semantic_kernel modules |
| 34 | +sk.Kernel = StubKernel |
| 35 | +ka.KernelArguments = StubKernelArguments |
| 36 | +kp.KernelBaseModel = StubKernelBaseModel |
| 37 | + |
| 38 | +# Install into sys.modules before import |
| 39 | +sys.modules["semantic_kernel"] = sk |
| 40 | +sys.modules["semantic_kernel.kernel_arguments"] = ka |
| 41 | +sys.modules["semantic_kernel.kernel_pydantic"] = kp |
| 42 | +# ──────────────────────────────────────────────────────────────────────────────── |
| 43 | + |
| 44 | +# Ensure <repo root>/src is on sys.path |
| 45 | +THIS_DIR = os.path.dirname(__file__) |
| 46 | +SRC_DIR = os.path.abspath(os.path.join(THIS_DIR, "..", "..", "..")) |
| 47 | +if SRC_DIR not in sys.path: |
| 48 | + sys.path.insert(0, SRC_DIR) |
| 49 | + |
| 50 | +# Now import the module under test |
| 51 | +from backend.handlers.runtime_interrupt_kernel import ( |
| 52 | + GetHumanInputMessage, |
| 53 | + MessageBody, |
| 54 | + GroupChatMessage, |
| 55 | + NeedsUserInputHandler, |
| 56 | + AssistantResponseHandler, |
| 57 | + register_handlers, |
| 58 | + get_handlers, |
| 59 | +) |
| 60 | + |
| 61 | +# ─── Tests ─────────────────────────────────────────────────────────────────── |
| 62 | + |
| 63 | +def test_models_and_str(): |
| 64 | + # GetHumanInputMessage and MessageBody |
| 65 | + gi = GetHumanInputMessage(content="hi") |
| 66 | + assert gi.content == "hi" |
| 67 | + mb = MessageBody(content="body") |
| 68 | + assert mb.content == "body" |
| 69 | + |
| 70 | + # GroupChatMessage with content attr |
| 71 | + class B1: |
| 72 | + def __init__(self, content): |
| 73 | + self.content = content |
| 74 | + g1 = GroupChatMessage(body=B1("c1"), source="S1", session_id="SID", target="T1") |
| 75 | + assert str(g1) == "GroupChatMessage(source=S1, content=c1)" |
| 76 | + |
| 77 | + # GroupChatMessage without content attr |
| 78 | + class B2: |
| 79 | + def __str__(self): return "bodystr" |
| 80 | + g2 = GroupChatMessage(body=B2(), source="S2", session_id="SID2", target="") |
| 81 | + assert "bodystr" in str(g2) |
| 82 | + |
| 83 | +@pytest.mark.asyncio |
| 84 | +async def test_needs_user_handler_all_branches(): |
| 85 | + h = NeedsUserInputHandler() |
| 86 | + # initial |
| 87 | + assert not h.needs_human_input |
| 88 | + assert h.question_content is None |
| 89 | + assert h.get_messages() == [] |
| 90 | + |
| 91 | + # human input message |
| 92 | + human = GetHumanInputMessage(content="ask") |
| 93 | + ret = await h.on_message(human, sender_type="T", sender_key="K") |
| 94 | + assert ret is human |
| 95 | + assert h.needs_human_input |
| 96 | + assert h.question_content == "ask" |
| 97 | + msgs = h.get_messages() |
| 98 | + assert msgs == [{"agent": {"type": "T", "key": "K"}, "content": "ask"}] |
| 99 | + |
| 100 | + # group chat message |
| 101 | + class B: |
| 102 | + content = "grp" |
| 103 | + grp = GroupChatMessage(body=B(), source="A", session_id="SID3", target="") |
| 104 | + ret2 = await h.on_message(grp, sender_type="A", sender_key="B") |
| 105 | + assert ret2 is grp |
| 106 | + # human_input remains |
| 107 | + assert h.needs_human_input |
| 108 | + msgs2 = h.get_messages() |
| 109 | + assert msgs2 == [{"agent": {"type": "A", "key": "B"}, "content": "grp"}] |
| 110 | + |
| 111 | + # dict message branch |
| 112 | + d = {"content": "xyz"} |
| 113 | + ret3 = await h.on_message(d, sender_type="X", sender_key="Y") |
| 114 | + assert isinstance(h.question_for_human, GetHumanInputMessage) |
| 115 | + assert h.question_content == "xyz" |
| 116 | + msgs3 = h.get_messages() |
| 117 | + assert msgs3 == [{"agent": {"type": "X", "key": "Y"}, "content": "xyz"}] |
| 118 | + |
| 119 | +@pytest.mark.asyncio |
| 120 | +async def test_needs_user_handler_unrelated(): |
| 121 | + h = NeedsUserInputHandler() |
| 122 | + class C: pass |
| 123 | + obj = C() |
| 124 | + ret = await h.on_message(obj, sender_type="t", sender_key="k") |
| 125 | + assert ret is obj |
| 126 | + assert not h.needs_human_input |
| 127 | + assert h.get_messages() == [] |
| 128 | + |
| 129 | +@pytest.mark.asyncio |
| 130 | +async def test_assistant_response_handler_various(): |
| 131 | + h = AssistantResponseHandler() |
| 132 | + # no response yet |
| 133 | + assert not h.has_response |
| 134 | + |
| 135 | + # writer branch with content attr |
| 136 | + class Body: |
| 137 | + content = "r1" |
| 138 | + msg = type("M", (), {"body": Body()})() |
| 139 | + out = await h.on_message(msg, sender_type="writer") |
| 140 | + assert out is msg |
| 141 | + assert h.has_response and h.get_response() == "r1" |
| 142 | + |
| 143 | + # editor branch with no content attr |
| 144 | + class Body2: |
| 145 | + def __str__(self): return "s2" |
| 146 | + msg2 = type("M2", (), {"body": Body2()})() |
| 147 | + await h.on_message(msg2, sender_type="editor") |
| 148 | + assert h.get_response() == "s2" |
| 149 | + |
| 150 | + # dict/value branch |
| 151 | + await h.on_message({"value": "v2"}, sender_type="any") |
| 152 | + assert h.get_response() == "v2" |
| 153 | + |
| 154 | + # no-match |
| 155 | + prev = h.assistant_response |
| 156 | + await h.on_message(123, sender_type="writer") |
| 157 | + assert h.assistant_response == prev |
| 158 | + |
| 159 | + |
| 160 | +def test_register_and_get_handlers_flow(): |
| 161 | + k = StubKernel() |
| 162 | + u1, a1 = register_handlers(k, "sess") |
| 163 | + assert ("user_input_handler_sess", "on_message") in k.functions |
| 164 | + assert ("assistant_handler_sess", "on_message") in k.functions |
| 165 | + assert k.get_variable("input_handler_sess") is u1 |
| 166 | + assert k.get_variable("response_handler_sess") is a1 |
| 167 | + |
| 168 | + # get existing |
| 169 | + u2, a2 = get_handlers(k, "sess") |
| 170 | + assert u2 is u1 and a2 is a1 |
| 171 | + |
| 172 | + # new pair when missing |
| 173 | + k2 = StubKernel() |
| 174 | + k2.set_variable("input_handler_new", None) |
| 175 | + k2.set_variable("response_handler_new", None) |
| 176 | + u3, a3 = get_handlers(k2, "new") |
| 177 | + assert isinstance(u3, NeedsUserInputHandler) |
| 178 | + assert isinstance(a3, AssistantResponseHandler) |
0 commit comments