Skip to content

Commit 8b8a49a

Browse files
authored
Merge pull request #260 from veithly/fix/streaming-infrastructure
fix: streaming infrastructure — queue, loop condition, and timeout
2 parents c236fee + fea937c commit 8b8a49a

File tree

9 files changed

+403
-36
lines changed

9 files changed

+403
-36
lines changed

spoon_ai/agents/base.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ def __init__(self, maxsize: int = 0):
5757
async def put(self, item: Any) -> None:
5858
await self._queue.put(item)
5959

60+
def put_nowait(self, item: Any) -> None:
    """Enqueue ``item`` without awaiting by forwarding to the wrapped queue's ``put_nowait``."""
    wrapped_queue = self._queue
    wrapped_queue.put_nowait(item)
63+
6064
async def get(self, timeout: float | None = 30.0) -> Any:
6165
"""Get item with timeout and fair access"""
6266
consumer_id = id(asyncio.current_task())
@@ -104,7 +108,7 @@ class BaseAgent(BaseModel, ABC):
104108
# Thread-safe replacements
105109
output_queue: ThreadSafeOutputQueue = Field(default_factory=ThreadSafeOutputQueue, description="Thread-safe output queue")
106110
task_done: asyncio.Event = Field(default_factory=asyncio.Event, description="The signal of agent run done")
107-
111+
108112
# Callback system
109113
callbacks: list[BaseCallbackHandler] = Field(default_factory=list, description="Callback handlers for monitoring")
110114

@@ -335,7 +339,7 @@ async def add_message_with_image(
335339

336340
if not image_url and not image_data:
337341
raise ValueError("Either image_url or image_data must be provided")
338-
342+
339343
# Validate image_data is not empty (if provided)
340344
# Three upload methods:
341345
# - Method 1: image_data (base64) - image_data must have a value
@@ -349,7 +353,7 @@ async def add_message_with_image(
349353
# Check if only whitespace (empty after strip)
350354
if not image_data.strip():
351355
raise ValueError("image_data cannot be empty (only whitespace). If you want to use URL-based images, use image_url parameter instead.")
352-
356+
353357
# Validate image_url format if provided
354358
# image_url supports both external URLs (way 2) and data URLs (way 3)
355359
if image_url:
@@ -366,7 +370,7 @@ async def add_message_with_image(
366370
f"Invalid image URL format: {image_url}. "
367371
f"Must be a valid HTTP/HTTPS URL (for external images) or data URL (for embedded images)."
368372
)
369-
373+
370374
# No MIME type validation - pass through all types to LLM providers
371375

372376
content_blocks: list[ContentBlock] = [TextContent(text=text)]
@@ -1055,7 +1059,12 @@ async def stream(self, timeout: float | None = None):
10551059
try:
10561060
self._active_operations.add(stream_id)
10571061

1058-
while not (self.task_done.is_set() or self.output_queue.empty()):
1062+
# Continue streaming while the task is still running OR the queue
1063+
# has remaining items to drain. The previous condition
1064+
# ``not (done or empty)`` was equivalent to ``not done and not empty``
1065+
# which would exit immediately when the queue started empty even
1066+
# though the background task had not yet produced output.
1067+
while not (self.task_done.is_set() and self.output_queue.empty()):
10591068
try:
10601069
# Create tasks for queue and done event
10611070
queue_task = asyncio.create_task(

spoon_ai/agents/spoon_react.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,12 @@ async def _ensure_x402_tools(self) -> None:
187187

188188
self._x402_tools_initialized = True
189189

190-
async def run(self, request: Optional[str] = None) -> str:
190+
async def run(self, request: Optional[str] = None, timeout: Optional[float] = None) -> str:
191191
"""Ensure prompts reflect current tools before running."""
192192
self._refresh_prompts()
193-
return await super().run(request)
193+
kwargs: dict = {}
194+
if request is not None:
195+
kwargs["request"] = request
196+
if timeout is not None:
197+
kwargs["timeout"] = timeout
198+
return await super().run(**kwargs)

spoon_ai/agents/spoon_react_skill.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def __init__(self, **kwargs):
8989

9090
self._skill_manager_initialized = True
9191

92-
async def run(self, request: Optional[str] = None) -> str:
92+
async def run(self, request: Optional[str] = None, timeout: Optional[float] = None) -> str:
9393
"""
9494
Execute agent with per-turn auto skill activation.
9595
@@ -102,15 +102,20 @@ async def run(self, request: Optional[str] = None) -> str:
102102
103103
Args:
104104
request: User request/message
105+
timeout: Optional timeout in seconds
105106
106107
Returns:
107108
Agent response
108109
"""
109-
110110
async def _runner(req: Optional[str]) -> str:
111111
# SpoonReactAI.run() rebuilds prompts from available_tools.
112112
# Ensure skill tools are synced first, then delegate to parent.
113-
return await super(SpoonReactSkill, self).run(req)
113+
kwargs: dict = {}
114+
if req is not None:
115+
kwargs["request"] = req
116+
if timeout is not None:
117+
kwargs["timeout"] = timeout
118+
return await super(SpoonReactSkill, self).run(**kwargs)
114119

115120
return await self._run_with_auto_skills(request, _runner)
116121

spoon_ai/agents/toolcall.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ def convert_mcp_tool(tool: MCPTool) -> dict:
215215
logger.info(f"🏁 {self.name} terminating due to finish_reason signals (no tool calls)")
216216
self.state = AgentState.FINISHED
217217
await self.add_message("assistant", response.content or "Task completed")
218+
# Emit content to output queue for streaming consumers
219+
if self.output_queue:
220+
self.output_queue.put_nowait({"content": response.content or "Task completed"})
218221
# Set a flag to indicate finish_reason termination and store the content
219222
self._finish_reason_terminated = True
220223
self._final_response_content = response.content or "Task completed"

spoon_ai/skills/loader.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,22 +109,25 @@ def discover(self) -> List[Path]:
109109
Returns:
110110
List of paths to SKILL.md files
111111
"""
112-
skill_files = []
112+
skill_files: List[Path] = []
113+
seen_resolved: set = set()
113114

114115
for base_path in self._paths:
115116
if not base_path.exists():
116117
continue
117118

118-
# Find all SKILL.md files (case-insensitive on Windows)
119-
for skill_md in base_path.rglob("SKILL.md"):
120-
skill_files.append(skill_md)
121-
logger.debug(f"Discovered skill: {skill_md}")
122-
123-
# Also check for skill.md (lowercase)
124-
for skill_md in base_path.rglob("skill.md"):
125-
if skill_md not in skill_files:
126-
skill_files.append(skill_md)
127-
logger.debug(f"Discovered skill: {skill_md}")
119+
# Use a single glob and case-insensitive name check to avoid
120+
# duplicates on Windows (where rglob("SKILL.md") and
121+
# rglob("skill.md") may return the same files).
122+
for md_file in base_path.rglob("*.md"):
123+
if md_file.name.lower() != "skill.md":
124+
continue
125+
resolved = md_file.resolve()
126+
if resolved in seen_resolved:
127+
continue
128+
seen_resolved.add(resolved)
129+
skill_files.append(md_file)
130+
logger.debug(f"Discovered skill: {md_file}")
128131

129132
return skill_files
130133

@@ -141,7 +144,10 @@ def parse(self, file_path: Path) -> Tuple[SkillMetadata, str]:
141144
Raises:
142145
ValueError: If file format is invalid
143146
"""
144-
content = file_path.read_text(encoding='utf-8')
147+
# Use utf-8-sig to strip BOM if present (common on Windows editors)
148+
content = file_path.read_text(encoding='utf-8-sig')
149+
# Normalize line endings for regex reliability (\r\n and bare \r → \n)
150+
content = content.replace('\r\n', '\n').replace('\r', '\n')
145151

146152
match = self.FRONTMATTER_PATTERN.match(content)
147153
if not match:

spoon_ai/skills/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ class SkillScript(BaseModel):
5252
timeout: int = Field(default=30, ge=1, le=600, description="Execution timeout in seconds")
5353
working_directory: Optional[str] = Field(default=None, description="Working directory override")
5454

55+
# Structured input schema (mirrors JSON Schema for script stdin)
56+
input_schema: Optional[Dict[str, Any]] = Field(
57+
default=None,
58+
description="JSON Schema describing the structured input the script expects via stdin"
59+
)
60+
5561
# Lifecycle hooks
5662
run_on_activation: bool = Field(default=False, description="Run when skill activates")
5763
run_on_deactivation: bool = Field(default=False, description="Run when skill deactivates")

spoon_ai/skills/script_tool.py

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
AI decides how to use scripts - users only control whether scripts are allowed.
66
"""
77

8+
import json
89
import logging
910
from typing import Any, Dict, List, Optional
1011

@@ -22,7 +23,10 @@ class ScriptTool(BaseTool):
2223
Tool wrapper for skill scripts.
2324
2425
Exposes a SkillScript as a callable tool that agents can invoke.
25-
The AI decides what input to provide - there's no fixed parameter schema.
26+
When the script defines an ``input_schema``, the tool parameters are
27+
derived from that schema so the LLM receives a structured contract.
28+
Otherwise a generic ``input`` string parameter is used for backward
29+
compatibility.
2630
"""
2731

2832
name: str = Field(..., description="Tool name")
@@ -33,6 +37,7 @@ class ScriptTool(BaseTool):
3337
script: SkillScript = Field(..., exclude=True)
3438
skill_name: str = Field(..., exclude=True)
3539
working_directory: Optional[str] = Field(default=None, exclude=True)
40+
_uses_structured_schema: bool = False
3641

3742
def __init__(
3843
self,
@@ -55,17 +60,49 @@ def __init__(
5560
desc = script.description or f"Execute the '{script.name}' script"
5661
description = f"{desc} (Type: {script.type.value})"
5762

58-
# Simple parameter schema - just optional input
59-
parameters = {
60-
"type": "object",
61-
"properties": {
62-
"input": {
63-
"type": "string",
64-
"description": "Optional input text to pass to the script via stdin"
63+
# Derive parameter schema from script.input_schema when available (#8)
64+
uses_structured = False
65+
if script.input_schema and isinstance(script.input_schema, dict):
66+
schema_type = script.input_schema.get("type", "object")
67+
# Tool/function calling interfaces expect top-level object schema.
68+
# If skill metadata declares non-object type, degrade gracefully.
69+
if schema_type != "object":
70+
logger.warning(
71+
"Script '%s' in skill '%s' has non-object input_schema.type=%s; "
72+
"falling back to generic object schema",
73+
script.name,
74+
skill_name,
75+
schema_type,
76+
)
77+
parameters = {
78+
"type": "object",
79+
"properties": {
80+
"input": {
81+
"type": "string",
82+
"description": "Optional input text to pass to the script via stdin"
83+
}
84+
},
85+
"required": []
6586
}
66-
},
67-
"required": []
68-
}
87+
else:
88+
parameters = {
89+
"type": "object",
90+
"properties": script.input_schema.get("properties", {}),
91+
"required": script.input_schema.get("required", []),
92+
}
93+
uses_structured = True
94+
else:
95+
# Fallback: generic optional input string (backward compat)
96+
parameters = {
97+
"type": "object",
98+
"properties": {
99+
"input": {
100+
"type": "string",
101+
"description": "Optional input text to pass to the script via stdin"
102+
}
103+
},
104+
"required": []
105+
}
69106

70107
super().__init__(
71108
name=tool_name,
@@ -75,14 +112,20 @@ def __init__(
75112
skill_name=skill_name,
76113
working_directory=working_directory
77114
)
115+
object.__setattr__(self, "_uses_structured_schema", uses_structured)
78116

79117
async def execute(self, input: Optional[str] = None, **kwargs) -> str:
80118
"""
81119
Execute the script.
82120
121+
When the script declares an ``input_schema``, the LLM's structured
122+
kwargs are serialized to JSON and piped to stdin. For legacy scripts
123+
that only declare a generic ``input`` string, the raw value is passed
124+
through as-is; if ``input`` is omitted but kwargs were sent, the kwargs are JSON-serialized instead.
125+
83126
Args:
84-
input: Optional input text to pass to script via stdin
85-
**kwargs: Additional arguments (ignored)
127+
input: Optional input text (legacy path)
128+
**kwargs: Structured arguments matching input_schema
86129
87130
Returns:
88131
Script output as string
@@ -91,9 +134,24 @@ async def execute(self, input: Optional[str] = None, **kwargs) -> str:
91134

92135
logger.debug(f"ScriptTool '{self.name}' executing")
93136

137+
# Decide what to send to the script on stdin
138+
if self._uses_structured_schema:
139+
# Build a JSON payload from all kwargs (including 'input' if present)
140+
payload: Dict[str, Any] = {}
141+
if input is not None:
142+
payload["input"] = input
143+
payload.update(kwargs)
144+
input_text = json.dumps(payload, ensure_ascii=False)
145+
else:
146+
# Legacy path: pass the plain string through, or JSON-encode stray kwargs
147+
input_text = input
148+
if input_text is None and kwargs:
149+
# Model may have sent structured args despite generic schema
150+
input_text = json.dumps(kwargs, ensure_ascii=False)
151+
94152
result: ScriptResult = await executor.execute(
95153
script=self.script,
96-
input_text=input,
154+
input_text=input_text,
97155
working_directory=self.working_directory
98156
)
99157

spoon_ai/tools/mcp_tool.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,20 @@ async def _fetch_and_set_parameters(self):
142142
async with self.get_session() as session:
143143
tools = await asyncio.wait_for(session.list_tools(), timeout=self._connection_timeout)
144144
if tools:
145+
# Try exact name match first, then the `_server_name` alias; fall back to first tool
145146
target_tool = None
146147
for tool in tools:
147148
if getattr(tool, 'name', '') == self.name:
148149
target_tool = tool
149150
break
151+
# Also try matching against _server_name (the original
152+
# alias) so that the first call still resolves after
153+
# a previous expand_server_tools renamed `self.name`.
154+
if not target_tool and hasattr(self, '_server_name'):
155+
for tool in tools:
156+
if getattr(tool, 'name', '') == self._server_name:
157+
target_tool = tool
158+
break
150159

151160
if not target_tool and tools:
152161
target_tool = tools[0]
@@ -350,6 +359,50 @@ async def call_mcp_tool(self, tool_name: str, **kwargs):
350359
logger.error(f"MCP tool '{tool_name}' call failed: {e}")
351360
raise RuntimeError(f"MCP tool '{tool_name}' execution failed: {str(e)}") from e
352361

362+
async def expand_server_tools(self) -> List["MCPTool"]:
    """Expand this single MCPTool (one-per-server) into one MCPTool per
    tool reported by ``list_available_tools()``.

    Every child is constructed with the listed tool's name, description and
    input schema, and reuses this instance's ``mcp_config``. The child is
    marked as having its parameters already loaded, and the original server
    alias is stored on it as ``_server_name``.

    NOTE(review): presumably each child executes via
    ``call_mcp_tool(real_name)`` - that wiring is not visible in this
    method; confirm against ``execute``.

    If ``list_available_tools()`` returns nothing, an empty list is
    returned (callers should keep the original proxy as fallback).

    Returns:
        List of MCPTool instances, one per listed tool with a non-empty
        name.
    """
    server_tools = await self.list_available_tools()
    if not server_tools:
        return []

    expanded: List["MCPTool"] = []
    server_name = self.name  # Original alias (e.g. "filesystem")

    for tool_info in server_tools:
        real_name = tool_info.get("name", "")
        if not real_name:
            # Entries without a usable name cannot be addressed; skip them.
            continue

        # Use ``or`` rather than a ``.get`` default so that an explicit
        # ``None`` / empty description also falls back, mirroring the
        # inputSchema handling below.
        desc = tool_info.get("description") or f"MCP tool from {server_name}"
        schema = tool_info.get("inputSchema") or {
            "type": "object", "properties": {}, "required": []
        }

        child = MCPTool(
            name=real_name,
            description=desc,
            parameters=schema,
            mcp_config=self.mcp_config,
        )
        child._parameters_loaded = True  # Already resolved
        # Record the original server alias on the child; object.__setattr__
        # is used to set it directly on the instance.
        object.__setattr__(child, '_server_name', server_name)
        expanded.append(child)

    logger.info(
        f"Expanded MCP server '{server_name}' into {len(expanded)} tools: "
        f"{[t.name for t in expanded]}"
    )
    return expanded
405+
353406
async def list_available_tools(self) -> list:
354407
"""List available tools from the MCP server."""
355408
try:

0 commit comments

Comments
 (0)