
Websocket LLmProviders #159

Open
akavi wants to merge 7 commits into main from akavi/websocket-providers

Conversation


@akavi akavi commented Feb 25, 2026

What does this PR do?

Websocket APIs are noticeably faster for certain models, most notably gpt-realtime-1.5 and gpt-5.2:

Model                                    Connect      Cold T1      Cold T2      Warm T1      Warm T2
                                        avg±std ms   avg±std ms   avg±std ms   avg±std ms   avg±std ms
  ----------------------------------- ------------ ------------ ------------ ------------ ------------
  realtime / gpt-realtime-1.5             1164±122       517±39       493±41            —            —
  ws-mode / gpt-5.2                       1177±170       696±64       675±54       689±44       699±85
  ws-mode / gpt-5-mini                     1083±72     2025±601    4432±1076     1705±285    4244±1350 (2 err)
  ws-mode / gpt-5-nano                    1185±113    3793±1628    6688±2260    3291±1073    6162±1247

Unfortunately:

  1. LiteLlm doesn't support them
  2. OpenAI has two separate WS APIs, one for realtime and one for "websocket mode". IDK why!

It's not super straightforward to add support for both, but I've done it.

We hide the choice of implementation behind the LlmProvider facade, so it's seamless from the developer's PoV.
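The routing the facade does can be sketched roughly like this. This is a toy model of the backend-selection logic visible in the PR's diff; the predicate bodies and the `backend` override parameter are illustrative, not the exact provider.py code:

```python
from typing import Optional


def _is_realtime_model(model: str) -> bool:
    # Illustrative heuristic only; the real predicates live in provider.py.
    return model.lower().startswith("gpt-realtime")


def _is_websocket_model(model: str) -> bool:
    return model.lower().startswith(("gpt-5.2", "gpt5.2"))


def pick_backend(model: str, backend: Optional[str] = None) -> str:
    """Explicit override wins; otherwise route by model name, falling
    back to the HTTP (LiteLLM) backend for everything else."""
    if backend == "realtime" or (backend is None and _is_realtime_model(model)):
        return "realtime"
    if backend == "websocket" or (backend is None and _is_websocket_model(model)):
        return "websocket"
    return "http"


print(pick_backend("gpt-realtime-1.5"))  # realtime
print(pick_backend("gpt-5.2"))           # websocket
print(pick_backend("gpt-5-mini"))        # http
```

Because the HTTP backend is the default branch, any model the WS predicates don't recognize keeps working exactly as before.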

This is a pretty substantial PR, so I've split it into individual commits:

  1. Extract tool merging/resolution from LlmAgent into tools/utils.py: We want to reuse these in the providers
  2. Split provider.py into LlmProvider facade and HttpProvider backend: Prep for introducing new websocket/realtime providers
  3. Add RealtimeProvider and WebSocketProvider backends: The meat of the PR, actually adding the support
  4. Add bench_latency.py for LLM provider latency benchmarking (to verify these providers are actually lower latency)
  5. Support 3.9: Updating the providers to correctly support python 3.9, which requires lazy initialization of asyncio primitives
  6. Centralize configuration detection: I should've probably done this in 2, but I didn't, so doing it here

Type of change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation
  • Other: ___________

Testing

Unit tests + "real" provider tests

Checklist

  • I have read the contributing guidelines
  • I have added tests that prove my fix is effective or that my feature works
  • I have formatted my code with make format

Note

High Risk
Introduces new WebSocket/Realtime LLM backends and refactors the provider/streaming interface, which can affect core agent response generation, tool calling, and connection lifecycle behavior across models.

Overview
Adds a new LlmProvider facade that routes between HTTP (LiteLLM), OpenAI Realtime WS, and OpenAI Responses WS backends based on model name, with an HTTP fallback when WebSocket mode can’t support certain LlmConfig fields.

Refactors LlmAgent and example/test scripts to use the new provider API (async-iterable chat() without async with), adds provider warmup() on CallStarted, and centralizes tool normalization/merging (including native vs fallback web_search) in tools.utils.
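The new call pattern (`warmup()` on CallStarted, then iterating `chat()` directly without `async with`) can be illustrated with a toy provider. `FakeProvider`, `Chunk`, and the canned word stream are invented for this sketch; only the call shape mirrors the PR:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class Chunk:
    text: str


class FakeProvider:
    """Toy stand-in for the LlmProvider facade. chat() returns an object
    that is simply async-iterable, so callers use `async for` directly
    instead of the old `async with provider.chat(...) as stream` pattern."""

    async def warmup(self) -> None:
        # The real provider pre-opens the WS connection here (on CallStarted).
        pass

    def chat(self, messages) -> AsyncIterator[Chunk]:
        async def _stream():
            for word in ["hello", " ", "world"]:
                yield Chunk(text=word)
        return _stream()


async def main() -> None:
    provider = FakeProvider()
    await provider.warmup()
    parts: List[str] = []
    async for chunk in provider.chat([{"role": "user", "content": "hi"}]):
        parts.append(chunk.text)
    print("".join(parts))  # hello world


asyncio.run(main())
```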

Introduces substantial new WebSocket infrastructure: shared WS stream utilities (stream.py), a diff-sync Realtime provider (realtime_provider.py), a Responses WS provider with divergence handling (websocket_provider.py), updated OpenAI tool schema conversion for WS APIs, plus new latency/verification scripts and expanded unit tests for routing, warmup, and tool/web-search behavior.

Written by Cursor Bugbot for commit 4e0603c. This will update automatically on new commits.

@akavi akavi requested review from lucyliulee and sauhardjain and removed request for lucyliulee February 25, 2026 02:02
@akavi akavi changed the title from "Realtime providers" to "Websocket LLmProviders" Feb 25, 2026
@akavi akavi force-pushed the akavi/websocket-providers branch from e98aadd to 8022743 on February 25, 2026 20:15
@akavi akavi force-pushed the akavi/websocket-providers branch from 8022743 to 598f448 on February 26, 2026 00:33
@akavi akavi force-pushed the akavi/websocket-providers branch from 598f448 to 08e8a93 on March 4, 2026 01:24

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 3 issues found in the latest run.

  • ✅ Fixed: Benchmark script uses removed async-with stream pattern
    • Updated stream_turn to iterate directly over provider.chat(...) with async for, matching the stream interface that only implements __aiter__.
  • ✅ Fixed: Message identity only considers first tool call
    • Message identities now include all assistant tool calls and the Realtime diff path expands multi-tool-call assistant messages into per-call items so no tool calls are dropped.
  • ✅ Fixed: Double normalization of tools in LlmAgent+LlmProvider pipeline
    • Added a fast-path tool resolver in LlmProvider that reuses already-normalized FunctionTool inputs and only calls _normalize_tools when needed.


Or push these changes by commenting:

@cursor push 471fcca35d
Preview (471fcca35d)
diff --git a/line/llm_agent/provider.py b/line/llm_agent/provider.py
--- a/line/llm_agent/provider.py
+++ b/line/llm_agent/provider.py
@@ -14,7 +14,7 @@
 from typing import Any, List, Optional, Protocol, Tuple, runtime_checkable
 
 from line.llm_agent.config import LlmConfig, _normalize_config
-from line.llm_agent.tools.utils import _normalize_tools
+from line.llm_agent.tools.utils import FunctionTool, _normalize_tools
 
 
 @dataclass
@@ -105,9 +105,8 @@
     ):
         self._model = model
         normalized_config = _normalize_config(config or LlmConfig())
-        normalized_tools, _ = _normalize_tools(tools, model=model) if tools else (None, None)
         self._config = normalized_config
-        self._tools = normalized_tools or []
+        self._tools = _resolve_tools(tools, model=model)
 
         use_realtime = backend == "realtime" or (backend is None and _is_realtime_model(model))
         use_websocket = backend == "websocket" or (backend is None and _is_websocket_model(model))
@@ -140,7 +139,7 @@
 
     def chat(self, messages, tools=None, config=None, **kwargs):
         cfg = _normalize_config(config) if config else self._config
-        effective_tools = _normalize_tools(tools, model=self._model)[0] if tools else self._tools
+        effective_tools = _resolve_tools(tools, model=self._model) if tools else self._tools
         return self._backend.chat(messages, effective_tools, config=cfg, **kwargs)
 
     async def warmup(self, config=None):
@@ -199,18 +198,29 @@
     return lower.startswith("gpt-5.2") or lower.startswith("gpt5.2")
 
 
+def _resolve_tools(tools: Optional[List[Any]], model: str) -> List[FunctionTool]:
+    """Resolve tools to FunctionTools, avoiding no-op re-normalization."""
+    if not tools:
+        return []
+    if all(isinstance(tool, FunctionTool) for tool in tools):
+        return list(tools)
+    return _normalize_tools(tools, model=model)[0]
+
+
 def _message_identity(msg: Message) -> tuple:
     """Compute an identity fingerprint for a single Message.
 
     Used by both WebSocket providers for divergence detection / diff-sync.
 
-    For assistant messages with tool calls, identity is derived from the
-    *first* tool call (mirrors how the server tracks multi-tool-call turns
-    as a single logical unit).
+    For assistant messages with tool calls, identity includes all tool calls
+    so divergence checks detect changes to any call in the turn.
     """
     if msg.tool_calls:
-        tc = msg.tool_calls[0]
-        return ("assistant_tool_call", tc.name, tc.arguments, tc.id)
+        if len(msg.tool_calls) == 1:
+            tc = msg.tool_calls[0]
+            return ("assistant_tool_call", tc.name, tc.arguments, tc.id)
+        tool_calls_key = tuple((tc.name, tc.arguments, tc.id) for tc in msg.tool_calls)
+        return ("assistant_tool_calls", tool_calls_key)
     return (msg.role, msg.content or "", msg.tool_call_id or "", msg.name or "")
 
 

diff --git a/line/llm_agent/realtime_provider.py b/line/llm_agent/realtime_provider.py
--- a/line/llm_agent/realtime_provider.py
+++ b/line/llm_agent/realtime_provider.py
@@ -399,11 +399,8 @@
 def _message_to_item(msg: Message) -> Dict[str, Any]:
     """Convert a Message to a Realtime API conversation item dict.
 
-    Note: for assistant messages with multiple tool calls, only the first
-    tool call is converted.  The Realtime API represents each tool call as a
-    separate conversation item, but the diff algorithm tracks identity at the
-    message level.  Handling multi-tool-call expansion here would require
-    reworking the diff model.
+    Assistant tool-call messages must contain exactly one tool call; callers
+    are responsible for expanding multi-tool-call turns into separate messages.
     """
     if msg.role == "user":
         return {
@@ -414,13 +411,8 @@
 
     if msg.role == "assistant":
         if msg.tool_calls:
-            if len(msg.tool_calls) > 1:
-                logger.warning(
-                    "Realtime API: assistant message has %d tool calls but only "
-                    "the first is converted (dropping %s)",
-                    len(msg.tool_calls),
-                    [tc.name for tc in msg.tool_calls[1:]],
-                )
+            if len(msg.tool_calls) != 1:
+                raise ValueError("Assistant tool-call message must contain exactly one tool call")
             tc = msg.tool_calls[0]
             return {
                 "type": "function_call",
@@ -464,7 +456,19 @@
         if msg.role == "system":
             system_parts.append(msg.content or "")
         else:
-            non_system.append(msg)
+            if msg.role == "assistant" and msg.tool_calls and len(msg.tool_calls) > 1:
+                for tc in msg.tool_calls:
+                    non_system.append(
+                        Message(
+                            role="assistant",
+                            content=msg.content,
+                            tool_calls=[tc],
+                            tool_call_id=msg.tool_call_id,
+                            name=msg.name,
+                        )
+                    )
+            else:
+                non_system.append(msg)
 
     desired_instructions = "\n\n".join(system_parts) if system_parts else None
 

diff --git a/line/llm_agent/scripts/bench_latency.py b/line/llm_agent/scripts/bench_latency.py
--- a/line/llm_agent/scripts/bench_latency.py
+++ b/line/llm_agent/scripts/bench_latency.py
@@ -164,12 +164,11 @@
     ttft = None
     text_parts: list[str] = []
 
-    async with provider.chat(messages, config=config) as stream:
-        async for chunk in stream:
-            if chunk.text:
-                if ttft is None:
-                    ttft = (time.perf_counter() - t0) * 1000
-                text_parts.append(chunk.text)
+    async for chunk in provider.chat(messages, config=config):
+        if chunk.text:
+            if ttft is None:
+                ttft = (time.perf_counter() - t0) * 1000
+            text_parts.append(chunk.text)
 
     total = (time.perf_counter() - t0) * 1000
     return TurnResult(

diff --git a/line/llm_agent/websocket_provider.py b/line/llm_agent/websocket_provider.py
--- a/line/llm_agent/websocket_provider.py
+++ b/line/llm_agent/websocket_provider.py
@@ -447,19 +447,26 @@
 def _extract_model_output_identity(response: Dict[str, Any]) -> Optional[tuple]:
     """Derive a single message-level identity from a Responses API output.
 
-    Mirrors ``_message_identity``: if the model produced tool calls we key
-    on the first one; otherwise we key on the full text.
+    Mirrors ``_message_identity``: single-tool-call outputs use a compact key,
+    while multi-tool-call outputs include every call in order.
     """
     output_items = response.get("output", [])
     function_calls = [i for i in output_items if i.get("type") == "function_call"]
 
     if function_calls:
-        fc = function_calls[0]
+        if len(function_calls) == 1:
+            fc = function_calls[0]
+            return (
+                "assistant_tool_call",
+                fc.get("name", ""),
+                fc.get("arguments", ""),
+                fc.get("call_id", ""),
+            )
         return (
-            "assistant_tool_call",
-            fc.get("name", ""),
-            fc.get("arguments", ""),
-            fc.get("call_id", ""),
+            "assistant_tool_calls",
+            tuple(
+                (fc.get("name", ""), fc.get("arguments", ""), fc.get("call_id", "")) for fc in function_calls
+            ),
         )
 
     # Concatenate text across all message output items.
This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@akavi akavi force-pushed the akavi/websocket-providers branch from 08e8a93 to e97ad49 on March 4, 2026 18:30
@akavi akavi force-pushed the akavi/websocket-providers branch from e97ad49 to dc4c1a9 on March 5, 2026 22:20
@akavi akavi force-pushed the akavi/websocket-providers branch from dc4c1a9 to 67069c3 on March 6, 2026 17:30
@akavi akavi force-pushed the akavi/websocket-providers branch from 67069c3 to 149738e on March 6, 2026 18:57
akavi and others added 2 commits March 6, 2026 15:37
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@akavi akavi force-pushed the akavi/websocket-providers branch from 149738e to 7a2463b on March 7, 2026 00:50
@akavi akavi force-pushed the akavi/websocket-providers branch 2 times, most recently from 0ad8d18 to 80a08e5, on March 9, 2026 18:14
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
akavi and others added 3 commits March 9, 2026 14:26
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Python 3.9 requires asyncio primitives (locks/queues) to be created inside the context of a running event loop, and the loop isn't available at initialization time.

Fix: initialize them lazily.
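The lazy-initialization pattern from this commit can be sketched as follows. The class and method names are illustrative, not the PR's actual code; the point is that the lock/queue are created on first use inside a coroutine, where a loop is guaranteed to be running:

```python
import asyncio
from typing import Optional


class WsBackend:
    """Sketch of the Python 3.9 fix: asyncio primitives are created lazily.

    On 3.9, asyncio.Lock()/Queue() constructed outside a running loop can
    bind to the wrong event loop; deferring construction until first use
    inside an async method avoids that.
    """

    def __init__(self) -> None:
        # Don't create the lock/queue here: __init__ may run before
        # any event loop exists.
        self._lock: Optional[asyncio.Lock] = None
        self._queue: Optional[asyncio.Queue] = None

    def _ensure_primitives(self) -> None:
        # Only ever called from inside a coroutine, so a loop is running.
        if self._lock is None:
            self._lock = asyncio.Lock()
            self._queue = asyncio.Queue()

    async def send(self, item) -> None:
        self._ensure_primitives()
        async with self._lock:
            await self._queue.put(item)

    async def recv(self):
        self._ensure_primitives()
        return await self._queue.get()


async def main() -> None:
    backend = WsBackend()
    await backend.send("hello")
    print(await backend.recv())  # hello


asyncio.run(main())
```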
@akavi akavi force-pushed the akavi/websocket-providers branch from 3a53537 to a8f738f on March 9, 2026 21:26

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: x
    • x


Or push these changes by commenting:

@cursor push d109cf302a
Preview (d109cf302a)
diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py
--- a/line/llm_agent/llm_agent.py
+++ b/line/llm_agent/llm_agent.py
@@ -311,7 +311,7 @@
 
             stream = self._llm.chat(
                 messages,
-                tools or None,
+                tools,
                 config=config,
                 **chat_kwargs,
             )
