From fa9635f396b61a549b148fa5d4542190b17563ca Mon Sep 17 00:00:00 2001 From: Arjun Mariputtana Kavi Date: Mon, 9 Mar 2026 15:27:20 -0700 Subject: [PATCH] Move conversation validation/normalization to LlmProvider --- line/llm_agent/llm_agent.py | 24 +-------- line/llm_agent/provider.py | 104 +++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 23 deletions(-) diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py index 4758732..0be3349 100644 --- a/line/llm_agent/llm_agent.py +++ b/line/llm_agent/llm_agent.py @@ -285,23 +285,6 @@ async def _generate_response( # ==== GENERATION CALL ==== # messages = await self._build_messages(context=context, history=history) - # Skip if no messages to send (e.g., empty history or all messages filtered) - if not messages: - logger.warning("Skipping LLM call: no messages to send") - break - - # Validate last message for LLM API compatibility - # - Cannot end with assistant message (Anthropic requires user/tool to continue) - # - User messages must be non-empty (Anthropic rejects empty/whitespace user content) - # - Tool messages are valid (expected after tool_use for loopback) - last_msg = messages[-1] - if last_msg.role == "assistant": - logger.warning("Skipping LLM call: conversation cannot end with assistant message") - break - if last_msg.role == "user" and not last_msg.content.strip(): - logger.warning("Skipping LLM call: last user message must be non-empty") - break - tool_calls_dict: Dict[str, ToolCall] = {} # Build kwargs for LLM chat, including web_search_options if available @@ -542,12 +525,9 @@ async def _build_messages( for event in full_history: # Handle InputEvent types if isinstance(event, UserTextSent): - # Filter empty user messages - prevents invalid API calls from empty ASR - if event.content and event.content.strip(): - messages.append(Message(role="user", content=event.content)) + messages.append(Message(role="user", content=event.content or "")) elif isinstance(event, AgentTextSent): - if event.content and event.content.strip(): - messages.append(Message(role="assistant", content=event.content)) + messages.append(Message(role="assistant", content=event.content or "")) # Handle CustomHistoryEntry (injected history entries) elif isinstance(event, CustomHistoryEntry): # Don't filter - could create invalid message sequences diff --git a/line/llm_agent/provider.py b/line/llm_agent/provider.py index 0827780..a50a043 100644 --- a/line/llm_agent/provider.py +++ b/line/llm_agent/provider.py @@ -13,6 +13,8 @@ from dataclasses import dataclass, field from typing import Any, AsyncIterable, List, Optional, Protocol, Tuple, runtime_checkable +from loguru import logger + from line.llm_agent.config import LlmConfig, _normalize_config from line.llm_agent.tools.utils import _merge_tools, _normalize_tools @@ -74,6 +76,102 @@ def _extract_instructions_and_messages( return instructions, non_system +def _normalize_messages(messages: List["Message"]) -> Optional[List["Message"]]: + """Normalize and validate messages before sending to the LLM backend. + + Applies the following transformations in order: + + 1. **Filter empty messages** – user/assistant messages with no meaningful + content (empty or whitespace-only) *and* no tool calls are dropped. + 2. **Remove unpaired tool calls** – every assistant-side ``tool_calls`` + entry must have a corresponding ``role="tool"`` response. Unpaired + tool calls are logged and removed. Any orphaned tool responses + (responses with no matching tool call) are also dropped. + 3. **Validate terminal message** – the conversation must not end with an + assistant message (providers require user/tool to continue), and the + final user message must be non-empty. + + Returns the cleaned message list, or ``None`` when the list is empty or + fatally invalid (caller should skip the LLM call). + """ + + # 1. Filter empty user/assistant messages + filtered: List[Message] = [] + for msg in messages: + if msg.role in ("user", "assistant"): + has_content = msg.content is not None and msg.content.strip() + has_tool_calls = bool(msg.tool_calls) + if not has_content and not has_tool_calls: + continue + filtered.append(msg) + + # 2. Validate tool-call pairing + # Collect tool_call_ids that have tool responses + response_ids: set[str] = set() + for msg in filtered: + if msg.role == "tool" and msg.tool_call_id: + response_ids.add(msg.tool_call_id) + + result: List[Message] = [] + for msg in filtered: + if msg.tool_calls: + paired = [tc for tc in msg.tool_calls if tc.id in response_ids] + unpaired = [tc for tc in msg.tool_calls if tc.id not in response_ids] + for tc in unpaired: + logger.warning(f"Removing unpaired tool call: {tc.name} (id={tc.id})") + if paired: + result.append( + Message( + role=msg.role, + content=msg.content, + tool_calls=paired, + tool_call_id=msg.tool_call_id, + name=msg.name, + ) + ) + elif msg.content and msg.content.strip(): + # Keep as plain text message without tool calls + result.append(Message(role=msg.role, content=msg.content)) + # else: drop the message entirely + else: + result.append(msg) + + # Remove orphaned tool responses (no matching tool call in the conversation) + remaining_tc_ids: set[str] = set() + for msg in result: + if msg.tool_calls: + for tc in msg.tool_calls: + remaining_tc_ids.add(tc.id) + + result = [ + msg + for msg in result + if not (msg.role == "tool" and msg.tool_call_id and msg.tool_call_id not in remaining_tc_ids) + ] + + if not result: + logger.warning("Skipping LLM call: no messages to send") + return None + + # 3. Validate terminal message + last = result[-1] + if last.role == "assistant": + logger.warning("Skipping LLM call: conversation cannot end with assistant message") + return None + + if last.role == "user" and (not last.content or not last.content.strip()): + logger.warning("Skipping LLM call: last user message must be non-empty") + return None + + return result + + +async def _empty_stream() -> AsyncIterable[StreamChunk]: + """Return an empty async iterable of StreamChunks.""" + return + yield # pragma: no cover – makes this an async generator + + # --------------------------------------------------------------------------- # Provider protocol # --------------------------------------------------------------------------- @@ -191,6 +289,10 @@ def chat( config: Optional[LlmConfig] = None, **kwargs: Any, ) -> AsyncIterable[StreamChunk]: + normalized = _normalize_messages(messages) + if normalized is None: + return _empty_stream() + effective_config = _normalize_config(config) if config else self._config effective_tools, web_search_options = _normalize_tools( _merge_tools(self._tools, tools), model=self._model @@ -200,7 +302,7 @@ def chat( if web_search_options is not None: kwargs = {**kwargs, "web_search_options": web_search_options} - return backend.chat(messages, effective_tools, config=effective_config, **kwargs) + return backend.chat(normalized, effective_tools, config=effective_config, **kwargs) def _set_tools(self, tools: Optional[List[Any]]) -> None: """Replace the provider's default tool specs."""