Skip to content

Commit cfe0838

Browse files
authored
Merge pull request #38 from XSpoonAi/fix/agent_loop
Fix: reset per-request think wrapper to stop stale task prompts leaki…
2 parents 7f39eea + 6b917fd commit cfe0838

File tree

2 files changed

+123
-4
lines changed

2 files changed

+123
-4
lines changed

spoon_bot/agent/loop.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ async def initialize(self) -> None:
477477
# and can cause the model to summarize instead of continue).
478478
self._agent.next_step_prompt = self.DEFAULT_NEXT_STEP_PROMPT
479479
self._agent._custom_next_step_prompt = True
480+
self._agent._spoon_bot_base_think = self._agent.think
480481

481482
# Build dynamic-tools prompt now that all tools (native + skill) are registered
482483
inactive_tools = self.tools.get_inactive_tools()
@@ -1036,6 +1037,7 @@ async def process(
10361037
raise
10371038
finally:
10381039
# Always ensure agent is back in IDLE state after processing
1040+
self._restore_agent_think()
10391041
if hasattr(self._agent, 'state') and self._agent.state != AgentState.IDLE:
10401042
logger.warning(
10411043
f"Post-run cleanup: resetting agent from {self._agent.state} to IDLE"
@@ -1309,7 +1311,12 @@ def _install_anti_loop_tracker(self, base_prompt: str) -> None:
13091311
return
13101312

13111313
agent_loop = self
1312-
original_think = agent.think
1314+
original_think = getattr(agent, "_spoon_bot_base_think", None)
1315+
if original_think is None:
1316+
original_think = agent.think
1317+
setattr(agent, "_spoon_bot_base_think", original_think)
1318+
else:
1319+
agent.think = original_think
13131320
call_tracker: Counter = Counter()
13141321
detail_tracker: Counter = Counter()
13151322
read_files: set = set()
@@ -1488,6 +1495,15 @@ def _log_tool_calls():
14881495

14891496
agent.think = _tracked_think
14901497

1498+
def _restore_agent_think(self) -> None:
    """Restore the agent's base ``think()`` implementation after a request.

    The base implementation is looked up on the agent under the
    ``_spoon_bot_base_think`` attribute. When it is present, it is
    re-assigned to ``agent.think``, discarding whatever wrapper is
    currently installed there.

    The method is a no-op when there is no agent or when no base
    implementation has been recorded on it.
    """
    # This runs from ``finally`` blocks, so it must never raise: an
    # AttributeError here would mask the exception being propagated.
    agent = getattr(self, "_agent", None)
    if agent is None:
        return

    original_think = getattr(agent, "_spoon_bot_base_think", None)
    if original_think is not None:
        agent.think = original_think
1506+
14911507
def _filter_execution_steps(self, content: str) -> str:
14921508
"""
14931509
Filter out technical execution steps from agent output.
@@ -1578,6 +1594,9 @@ async def stream(
15781594
logger.warning(f"Failed to load memory context: {e}")
15791595

15801596
full_content = ""
1597+
stream_completed = False
1598+
stream_cancelled = False
1599+
bg_task: asyncio.Task[None] | None = None
15811600

15821601
# Trim and inject persisted history into runtime memory
15831602
await self._prepare_request_context()
@@ -1658,8 +1677,9 @@ async def _run_and_signal() -> None:
16581677
except asyncio.TimeoutError:
16591678
continue
16601679
except asyncio.CancelledError:
1661-
logger.warning("Streaming cancelled")
1662-
break
1680+
stream_cancelled = True
1681+
logger.warning("Streaming cancelled while waiting for output")
1682+
raise
16631683
except Exception as e:
16641684
logger.warning(f"Queue get error: {type(e).__name__}: {e}")
16651685
continue
@@ -1759,15 +1779,29 @@ async def _run_and_signal() -> None:
17591779
}
17601780

17611781
# Emit done
1782+
stream_completed = True
17621783
yield {"type": "done", "delta": "", "metadata": {"content": full_content}}
17631784

1785+
except asyncio.CancelledError:
1786+
stream_cancelled = True
1787+
logger.warning("Streaming cancelled")
1788+
raise
17641789
except Exception as e:
17651790
logger.error(f"Streaming error: {e}")
1791+
stream_completed = True
17661792
yield {"type": "error", "delta": str(e), "metadata": {"error": str(e)}}
17671793
yield {"type": "done", "delta": "", "metadata": {"error": str(e)}}
1794+
finally:
1795+
self._restore_agent_think()
1796+
if bg_task is not None and not bg_task.done():
1797+
bg_task.cancel()
1798+
try:
1799+
await asyncio.wait_for(bg_task, timeout=5.0)
1800+
except (asyncio.CancelledError, asyncio.TimeoutError, Exception):
1801+
pass
17681802

17691803
# Save to session only if we got actual content
1770-
if full_content:
1804+
if full_content and stream_completed and not stream_cancelled:
17711805
try:
17721806
self._session.add_message("user", message)
17731807
self._session.add_message("assistant", full_content)
@@ -1834,6 +1868,8 @@ async def process_with_thinking(
18341868
except Exception as e:
18351869
logger.error(f"Agent processing error: {e}")
18361870
raise
1871+
finally:
1872+
self._restore_agent_think()
18371873

18381874
# Save to session
18391875
try:

tests/test_streaming_thinking.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,89 @@ async def mock_stream(message, **kwargs):
576576
agent._session.add_message.assert_any_call("assistant", "hello")
577577
agent.sessions.save.assert_called_once()
578578

579+
@pytest.mark.asyncio
async def test_stream_close_cancels_background_run_and_skips_session_save(self):
    """Closing the stream should stop the background run and avoid persisting stale output."""
    from spoon_bot.agent.loop import AgentLoop

    run_cancelled = asyncio.Event()

    async def mock_run(request):
        # Emit one chunk, then block forever so the run can only end by
        # being cancelled when the consumer closes the stream.
        await agent._agent.output_queue.put({"content": "hello"})
        try:
            await asyncio.Future()
        except asyncio.CancelledError:
            run_cancelled.set()
            raise

    agent = MagicMock(spec=AgentLoop)
    agent._initialized = True
    agent._agent = MagicMock()
    agent._agent.output_queue = asyncio.Queue()
    agent._agent.task_done = asyncio.Event()
    agent._agent.run = mock_run
    agent._agent.state = "idle"
    agent._session = MagicMock()
    agent._session.add_message = MagicMock()
    agent.sessions = MagicMock()
    agent.sessions.save = MagicMock()
    agent.memory = MagicMock()
    agent.memory.get_memory_context = MagicMock(return_value=None)
    agent.context = MagicMock()
    agent._prepare_request_context = AsyncMock()
    agent._build_step_prompt = MagicMock(return_value="prompt")
    agent._install_anti_loop_tracker = MagicMock()

    # Call the real stream() implementation against the mocked loop.
    stream = AgentLoop.stream(agent, message="test message")
    first_chunk = await stream.__anext__()

    assert first_chunk["type"] == "content"
    assert first_chunk["delta"] == "hello"

    # Closing the generator mid-stream must cancel the background run.
    await stream.aclose()
    await asyncio.wait_for(run_cancelled.wait(), timeout=1.0)

    # The per-request think wrapper must be torn down on close as well.
    agent._restore_agent_think.assert_called()

    # An interrupted stream must not be persisted to the session.
    agent.sessions.save.assert_not_called()
    agent._session.add_message.assert_not_called()
623+
624+
@pytest.mark.asyncio
async def test_install_anti_loop_tracker_does_not_stack_previous_request_prompt(self):
    """A new request should not inherit the previous request's anti-loop wrapper."""
    from pathlib import Path
    from spoon_bot.agent.loop import AgentLoop

    seen_prompts = []

    async def base_think():
        # Record the prompt the base implementation observes on each call.
        seen_prompts.append(agent._agent.next_step_prompt)
        return True

    tool_call = MagicMock()
    tool_call.function = MagicMock()
    tool_call.function.name = "shell"
    tool_call.function.arguments = '{"command":"cd /workspace && ls -la .agents/skills/pdf"}'

    agent = MagicMock(spec=AgentLoop)
    agent.workspace = Path("/workspace")
    agent._agent = MagicMock()
    agent._agent.think = base_think
    # Pre-register the base think so the tracker wraps it rather than
    # whatever wrapper is currently installed on the agent.
    agent._agent._spoon_bot_base_think = base_think
    agent._agent.next_step_prompt = ""
    agent._agent.tool_calls = [tool_call]
    agent._agent.memory = MagicMock()
    agent._agent.memory.messages = []
    agent._compress_runtime_context = MagicMock(return_value=0)

    # First request.
    AgentLoop._install_anti_loop_tracker(agent, "prompt one")
    agent._agent.next_step_prompt = "prompt one"
    await agent._agent.think()

    # Second request installs a fresh tracker on the same agent.
    AgentLoop._install_anti_loop_tracker(agent, "prompt two")
    agent._agent.next_step_prompt = "prompt two"
    await agent._agent.think()

    assert seen_prompts[-1] == "prompt two"
661+
579662

580663
@pytest.mark.requires_spoon_core
581664
class TestAgentLoopProcessWithThinking:

0 commit comments

Comments
 (0)