
Commit 12b9599

[D5] Verify provider streaming specs + docs/tests
1 parent db3307e commit 12b9599

File tree

4 files changed: +288 −45 lines changed

docs/ai/streaming_providers.md

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
# Streaming providers (SSE): OpenAI / Anthropic / Gemini

Last verified: 2026-02-10

This project implements streaming via Server-Sent Events (SSE) and extracts incremental text via provider strategies.

## Common SSE notes

- SSE frames are separated by a blank line (`\n\n`). Each frame may include `event:` and one or more `data:` lines.
- Networks/proxies can split or coalesce frames arbitrarily at the byte level, so the client must buffer and parse by newline boundaries (not by "chunk" boundaries).

Implementation notes:

- `src/core/ai_client.py`: `_iter_sse_event_data()` parses SSE from arbitrary byte chunks and yields `(event, data)` tuples.
- Provider-specific text extraction lives in `src/core/ai_providers/*` via `extract_stream_content()`.
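As a worked illustration of that buffering rule, here is a minimal standalone sketch (the helper name `parse_sse_bytes` is hypothetical and separate from this repo's `_iter_sse_event_data()`):

```python
def parse_sse_bytes(chunks):
    """Yield (event, data) tuples from an iterable of arbitrary byte chunks."""
    buffer = b""
    event, data_lines = None, []
    for chunk in chunks:
        buffer += chunk
        # Parse only complete lines; a frame ends at a blank line.
        while (i := buffer.find(b"\n")) >= 0:
            line = buffer[:i].rstrip(b"\r").decode("utf-8", "replace")
            buffer = buffer[i + 1:]
            if line == "":
                if data_lines:
                    yield event, "\n".join(data_lines)
                event, data_lines = None, []
            elif line.startswith("event:"):
                event = line[6:].lstrip()
            elif line.startswith("data:"):
                data_lines.append(line[5:].lstrip())

# A chunk boundary may fall mid-line; the parser still reassembles the frame.
frames = list(parse_sse_bytes([b"event: ping\nda", b"ta: {}\n\n"]))
# frames == [("ping", "{}")]
```

The real implementation additionally skips `:` comment lines and flushes an unterminated trailing buffer.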
## OpenAI (Chat Completions API)

- Endpoint: `POST https://api.openai.com/v1/chat/completions`
- Headers: `Authorization: Bearer <api_key>`, `Content-Type: application/json`
- Request: `model`, `messages`, `stream: true`
  - Standard models: `max_tokens`
  - Reasoning models (as used by this app for `o1/o3/o4*`): `max_completion_tokens`
  - Optional: `stream_options: { include_usage: true }`
- Response: `text/event-stream`
  - Each SSE frame is `data: { ...json... }`
  - Stream terminates with `data: [DONE]`
- Incremental text extraction used by this app: `choices[0].delta.content`
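That extraction path can be sketched as a small helper (hypothetical name, not this repo's `extract_stream_content()` strategy):

```python
import json

def extract_openai_delta(sse_data: str) -> str:
    """Return the incremental text from one Chat Completions SSE frame."""
    if sse_data == "[DONE]":  # terminal sentinel carries no JSON payload
        return ""
    chunk = json.loads(sse_data)
    choices = chunk.get("choices") or []
    if not choices:  # e.g. a usage-only frame when include_usage is set
        return ""
    return choices[0].get("delta", {}).get("content") or ""

frame = '{"choices":[{"delta":{"content":"Hel"},"index":0}]}'
assert extract_openai_delta(frame) == "Hel"
assert extract_openai_delta("[DONE]") == ""
```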
## Anthropic (Claude Messages API)

- Endpoint: `POST https://api.anthropic.com/v1/messages`
- Headers: `x-api-key: <api_key>`, `anthropic-version: <date>` (required)
- Request: `model`, `max_tokens`, `stream: true`, `messages`; optional `system`, `temperature`, `tools`
- Response: `text/event-stream` with explicit `event:` names.
  - Typical event sequence: `message_start` → `content_block_start` → `content_block_delta` → … → `message_stop`
  - Content deltas are sent as JSON objects with a top-level `type` field, e.g. `{"type":"content_block_delta","delta":{"type":"text_delta","text":"..."}}`
- Incremental text extraction used by this app: when `type == "content_block_delta"`, use `delta.text` (ignoring non-text deltas).
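The Claude extraction rule above can likewise be sketched as a standalone helper (hypothetical name):

```python
import json

def extract_claude_text(sse_data: str) -> str:
    """Return incremental text from one Messages API SSE frame, else ''."""
    payload = json.loads(sse_data)
    if payload.get("type") != "content_block_delta":
        return ""  # message_start, ping, message_stop, etc. carry no text
    delta = payload.get("delta", {})
    # Ignore non-text deltas such as input_json_delta (tool use).
    if delta.get("type") != "text_delta":
        return ""
    return delta.get("text", "")

frame = '{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"}}'
assert extract_claude_text(frame) == "Hi"
assert extract_claude_text('{"type":"message_start"}') == ""
```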
## Gemini (Google AI for Developers — Generative Language API v1beta)

- Endpoint: `POST https://generativelanguage.googleapis.com/v1beta/models/{model}:streamGenerateContent?alt=sse`
  - `alt=sse` is required to receive SSE responses.
- Auth: `x-goog-api-key: <api_key>` header (or `key=<api_key>` query parameter)
- Request: `contents` (role + parts), `generationConfig` (`temperature`, `topP`, `maxOutputTokens`, etc.)
- Response: `text/event-stream` with `data: { ...json... }` frames (no `[DONE]` sentinel; the stream ends when the server closes the connection).
- Incremental text extraction used by this app: `candidates[0].content.parts[].text` (ignoring parts marked `thought: true`).
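And the Gemini extraction path, including the `thought` filter, as a hedged sketch (hypothetical helper name):

```python
import json

def extract_gemini_text(sse_data: str) -> str:
    """Concatenate text parts from one streamGenerateContent SSE frame."""
    payload = json.loads(sse_data)
    candidates = payload.get("candidates") or []
    if not candidates:
        return ""
    parts = candidates[0].get("content", {}).get("parts") or []
    # Skip "thought" parts emitted by thinking-capable models.
    return "".join(p.get("text", "") for p in parts if not p.get("thought"))

frame = '{"candidates":[{"content":{"parts":[{"text":"Hello"},{"thought":true,"text":"..."}]}}]}'
assert extract_gemini_text(frame) == "Hello"
```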
## References (official)

- OpenAI — Chat Completions API: https://platform.openai.com/docs/api-reference/chat/create-chat-completion
- OpenAI — Streaming responses (Responses API guide): https://platform.openai.com/docs/guides/streaming-responses
- Anthropic — Streaming: https://docs.anthropic.com/en/docs/build-with-claude/streaming
- Google AI for Developers — `models.streamGenerateContent` (SSE example): https://ai.google.dev/api/rest/v1beta/models/streamGenerateContent
Lines changed: 5 additions & 5 deletions
@@ -1,6 +1,6 @@
 ID,Title,Description,Acceptance,Test_Method,Tools,Dev_Status,Review1_Status,Regression_Status,Files,Dependencies,Notes
-D1,Refactor Ghost Text preview to non-destructive (fix streaming overwrite/caret),"Replace default document-mutating Ghost Text preview with a non-destructive overlay (DeepIntegratedGhostText). Add editor paint overlay rendering and make accept/cancel stable. Ensure streaming chunk rendering never modifies document text or moves caret; clear preview on doc switch/save. Add pytest-qt coverage for overwrite/caret chaos regressions.","Streaming/AI preview does not change editor.toPlainText() until user accepts; caret stays stable during streaming; Tab accept inserts completion once at correct position; Esc cancels and stops updates; stale chunks ignored; tests pass.",".tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_streaming_ghost_no_overwrite.py",none,DONE,DONE,TODO,"src/gui/editor/text_editor.py | src/gui/editor/deep_integrated_ghost_text.py | src/gui/editor/optimal_ghost_text.py | src/gui/editor/smart_completion_manager.py | tests/test_streaming_ghost_no_overwrite.py",none,"user_report: streaming overwrites existing text + caret chaos | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_streaming_ghost_no_overwrite.py PASS | done_at:2026-02-10"
-D2,Scratch autosave/restore + flush on close,"Persist unbound scratch editor content to a recovery file on autosave and on app close; restore it on startup when no project is restored. Also flush pending autosave for project-backed editors on app close so quick exits don’t lose content. Add tests + manual restart checklist.", "Typing in scratch tab persists after restart; closing app quickly still saves latest edits (scratch + project docs); tests pass (or limited validation recorded with risk).",".tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_scratch_autosave_restore.py",none,DONE,DONE,TODO,"src/gui/editor/editor_panel.py | src/gui/editor/text_editor.py | src/gui/main_window_parts/state.py | src/main.py | src/core/config.py | tests/test_scratch_autosave_restore.py",none,"manual_checklist: 1) Start app, type in scratch tab, close immediately. 2) Reopen; expect content restored. 3) Open project, edit scene, close immediately; reopen project; expect content present. | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_scratch_autosave_restore.py PASS | done_at:2026-02-10"
-D3,Bottom status bar separator/overlap cleanup,"Reduce stacked separator lines and clipping in EnhancedStatusBar. Use a single separator strategy (either borders or VLines) with consistent heights/margins and remove duplicate/overlapping lines. Update status bar layout test accordingly.","Bottom bar shows a single clean row; no stacked/thick separator lines; labels are not clipped; test passes.",".tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_status_bar_layout.py",none,DONE,DONE,TODO,"src/gui/status/status_bar.py | tests/test_status_bar_layout.py",none,"user_report: bottom bar lines stacked/overlapping | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_status_bar_layout.py PASS | done_at:2026-02-10"
-D4,"Max tokens UI supports >=1,000,000","Ensure the max-tokens input in the AI config center accepts very large values (>=1,000,000) without clamping/truncating as soon as the first digit is typed; keep config save/load consistent. Add a pytest-qt regression test.","Max tokens can be set to 1,000,000+ and persists after reopening the AI config center; regression test passes.",".tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_ai_config_max_tokens_range.py",none,DONE,DONE,TODO,"src/gui/ai/unified_ai_config_dialog.py | tests/test_ai_config_max_tokens_range.py",none,"user_report: max_tokens max=3999 and typing '4' clamps; verify and prevent regression | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_ai_config_max_tokens_range.py PASS | done_at:2026-02-10"
-D5,Verify provider streaming specs + docs/tests,"Use official docs (OpenAI/Anthropic/Gemini) to verify streaming endpoints/headers/params and SSE parsing requirements; adjust provider strategies and AIClient stream parsing if needed; add a short doc summary and tests for OpenAI+Claude streaming fixtures.","Provider streaming endpoints/params match official docs as of 2026-02-10; OpenAI/Claude streaming extraction tests pass; doc added.",".tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_provider_streaming_openai_claude.py",manual,TODO,TODO,TODO,"src/core/ai_client.py | src/core/ai_providers/openai.py | src/core/ai_providers/claude.py | src/core/ai_providers/gemini.py | docs/ai/streaming_providers.md | tests/test_provider_streaming_openai_claude.py",none,"requires web research for up-to-date streaming specs; keep tests offline via fixtures"
+D1,Refactor Ghost Text preview to non-destructive (fix streaming overwrite/caret),Replace default document-mutating Ghost Text preview with a non-destructive overlay (DeepIntegratedGhostText). Add editor paint overlay rendering and make accept/cancel stable. Ensure streaming chunk rendering never modifies document text or moves caret; clear preview on doc switch/save. Add pytest-qt coverage for overwrite/caret chaos regressions.,Streaming/AI preview does not change editor.toPlainText() until user accepts; caret stays stable during streaming; Tab accept inserts completion once at correct position; Esc cancels and stops updates; stale chunks ignored; tests pass.,.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_streaming_ghost_no_overwrite.py,none,DONE,DONE,TODO,src/gui/editor/text_editor.py | src/gui/editor/deep_integrated_ghost_text.py | src/gui/editor/optimal_ghost_text.py | src/gui/editor/smart_completion_manager.py | tests/test_streaming_ghost_no_overwrite.py,none,user_report: streaming overwrites existing text + caret chaos | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_streaming_ghost_no_overwrite.py PASS | done_at:2026-02-10
+D2,Scratch autosave/restore + flush on close,Persist unbound scratch editor content to a recovery file on autosave and on app close; restore it on startup when no project is restored. Also flush pending autosave for project-backed editors on app close so quick exits don’t lose content. Add tests + manual restart checklist.," ""Typing in scratch tab persists after restart; closing app quickly still saves latest edits (scratch + project docs); tests pass (or limited validation recorded with risk).""",.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_scratch_autosave_restore.py,none,DONE,DONE,TODO,src/gui/editor/editor_panel.py | src/gui/editor/text_editor.py | src/gui/main_window_parts/state.py | src/main.py | src/core/config.py | tests/test_scratch_autosave_restore.py,none,"manual_checklist: 1) Start app, type in scratch tab, close immediately. 2) Reopen; expect content restored. 3) Open project, edit scene, close immediately; reopen project; expect content present. | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_scratch_autosave_restore.py PASS | done_at:2026-02-10"
+D3,Bottom status bar separator/overlap cleanup,Reduce stacked separator lines and clipping in EnhancedStatusBar. Use a single separator strategy (either borders or VLines) with consistent heights/margins and remove duplicate/overlapping lines. Update status bar layout test accordingly.,Bottom bar shows a single clean row; no stacked/thick separator lines; labels are not clipped; test passes.,.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_status_bar_layout.py,none,DONE,DONE,TODO,src/gui/status/status_bar.py | tests/test_status_bar_layout.py,none,user_report: bottom bar lines stacked/overlapping | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_status_bar_layout.py PASS | done_at:2026-02-10
+D4,"Max tokens UI supports >=1,000,000","Ensure the max-tokens input in the AI config center accepts very large values (>=1,000,000) without clamping/truncating as soon as the first digit is typed; keep config save/load consistent. Add a pytest-qt regression test.","Max tokens can be set to 1,000,000+ and persists after reopening the AI config center; regression test passes.",.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_ai_config_max_tokens_range.py,none,DONE,DONE,TODO,src/gui/ai/unified_ai_config_dialog.py | tests/test_ai_config_max_tokens_range.py,none,user_report: max_tokens max=3999 and typing '4' clamps; verify and prevent regression | test:.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_ai_config_max_tokens_range.py PASS | done_at:2026-02-10
+D5,Verify provider streaming specs + docs/tests,Use official docs (OpenAI/Anthropic/Gemini) to verify streaming endpoints/headers/params and SSE parsing requirements; adjust provider strategies and AIClient stream parsing if needed; add a short doc summary and tests for OpenAI+Claude streaming fixtures.,Provider streaming endpoints/params match official docs as of 2026-02-10; OpenAI/Claude streaming extraction tests pass; doc added.,.tmp\\ane0305-venv-311\\Scripts\\python.exe -m pytest -q tests/test_provider_streaming_openai_claude.py,manual,DONE,DONE,TODO,src/core/ai_client.py | src/core/ai_providers/openai.py | src/core/ai_providers/claude.py | src/core/ai_providers/gemini.py | docs/ai/streaming_providers.md | tests/test_provider_streaming_openai_claude.py,none,requires web research for up-to-date streaming specs; keep tests offline via fixtures | test:.tmp\ane0305-venv-311\Scripts\python.exe -m pytest -q tests/test_provider_streaming_openai_claude.py PASS | doc:docs/ai/streaming_providers.md | done_at:2026-02-10

src/core/ai_client.py

Lines changed: 78 additions & 40 deletions
@@ -214,6 +214,68 @@ def _get_stream_endpoint_url(self) -> str:
             return self._provider_strategy.get_endpoint_url()
         except Exception as e:
             raise AIClientError(str(e))
+
+    async def _iter_sse_event_data(self, byte_iter) -> AsyncGenerator[tuple[str | None, str], None]:
+        """Parse SSE stream and yield (event, data) tuples.
+
+        Notes:
+        - aiohttp may yield arbitrary byte chunks, not line-delimited data.
+        - This parser tolerates chunk boundaries splitting SSE lines and multiple SSE events per chunk.
+        - For non-SSE servers that return raw JSON lines, it will yield them as (None, line).
+        """
+        buffer = b""
+        current_event: str | None = None
+        data_lines: List[str] = []
+
+        async for chunk in byte_iter:
+            if not chunk:
+                continue
+            if not isinstance(chunk, (bytes, bytearray)):
+                continue
+
+            buffer += bytes(chunk)
+            while True:
+                newline_index = buffer.find(b"\n")
+                if newline_index < 0:
+                    break
+                line_bytes = buffer[:newline_index]
+                buffer = buffer[newline_index + 1 :]
+
+                if line_bytes.endswith(b"\r"):
+                    line_bytes = line_bytes[:-1]
+
+                line = line_bytes.decode("utf-8", errors="replace")
+                if line == "":
+                    if data_lines:
+                        yield current_event, "\n".join(data_lines)
+                    current_event = None
+                    data_lines = []
+                    continue
+
+                if line.startswith(":"):
+                    continue
+                if line.startswith("event:"):
+                    current_event = line[6:].lstrip() or None
+                    continue
+                if line.startswith("data:"):
+                    data_lines.append(line[5:].lstrip())
+                    continue
+
+                # Fallback: some providers/proxies may return JSON lines without SSE framing.
+                stripped = line.strip()
+                if stripped.startswith("{") or stripped.startswith("["):
+                    yield None, stripped
+
+        # Flush any remaining buffered content.
+        if buffer:
+            tail = buffer.decode("utf-8", errors="replace").strip()
+            if tail.startswith("data:"):
+                data_lines.append(tail[5:].lstrip())
+            elif tail.startswith("{") or tail.startswith("["):
+                yield None, tail
+
+        if data_lines:
+            yield current_event, "\n".join(data_lines)
 
     def _build_messages(self, prompt: Union[str, List[MultimodalMessage]], system_prompt: Optional[str] = None) -> List[Dict[str, Any]]:
         """Build the message list - supports multimodal content"""
@@ -747,31 +809,17 @@ async def complete_multimodal_stream(self, messages: List[MultimodalMessage], sy
                 headers=headers,
                 json=data,
                 timeout=aiohttp.ClientTimeout(total=self.config.timeout)
-                ) as response:
+            ) as response:
                 if response.status != 200:
                     error_text = await response.text()
                     self._raise_http_error(response.status, error_text)
 
                 self.logger.debug("Start receiving multimodal stream data")
 
-                async for line in response.content:
-                    if not line:
-                        continue
-
-                    line_str = line.decode("utf-8", errors="replace").strip()
-                    if not line_str:
-                        continue
-
-                    # Handle Server-Sent Events framing
-                    if line_str.startswith("event:"):
-                        continue
-
-                    data_str: str | None = None
-                    if line_str.startswith("data:"):
-                        data_str = line_str[5:].lstrip()
-                    elif line_str.startswith("{") or line_str.startswith("["):
-                        data_str = line_str
-
+                async for event_name, data_str in self._iter_sse_event_data(
+                    response.content.iter_any()
+                ):
+                    data_str = (data_str or "").strip()
                     if not data_str:
                         continue
                     if data_str == "[DONE]":
@@ -781,7 +829,9 @@ async def complete_multimodal_stream(self, messages: List[MultimodalMessage], sy
                     try:
                         chunk_data = json.loads(data_str)
                     except json.JSONDecodeError as e:
-                        self.logger.warning(f"Failed to parse multimodal stream data: {e}, data: {data_str}")
+                        self.logger.warning(
+                            f"Failed to parse multimodal stream data: {e}, event: {event_name}, data: {data_str}"
+                        )
                         continue
 
                     content = self._extract_stream_content(chunk_data)
@@ -926,31 +976,17 @@ async def complete_stream(self, prompt: str, system_prompt: Optional[str] = None
                 headers=headers,
                 json=data,
                 timeout=aiohttp.ClientTimeout(total=self.config.timeout)
-                ) as response:
+            ) as response:
                 if response.status != 200:
                     error_text = await response.text()
                     self._raise_http_error(response.status, error_text)
 
                 self.logger.debug("Start receiving stream data")
 
-                async for line in response.content:
-                    if not line:
-                        continue
-
-                    line_str = line.decode("utf-8", errors="replace").strip()
-                    if not line_str:
-                        continue
-
-                    # Handle Server-Sent Events framing
-                    if line_str.startswith("event:"):
-                        continue
-
-                    data_str: str | None = None
-                    if line_str.startswith("data:"):
-                        data_str = line_str[5:].lstrip()
-                    elif line_str.startswith("{") or line_str.startswith("["):
-                        data_str = line_str
-
+                async for event_name, data_str in self._iter_sse_event_data(
+                    response.content.iter_any()
+                ):
+                    data_str = (data_str or "").strip()
                     if not data_str:
                         continue
                     if data_str == "[DONE]":
@@ -960,7 +996,9 @@ async def complete_stream(self, prompt: str, system_prompt: Optional[str] = None
                     try:
                         chunk_data = json.loads(data_str)
                     except json.JSONDecodeError as e:
-                        self.logger.warning(f"Failed to parse stream data: {e}, data: {data_str}")
+                        self.logger.warning(
+                            f"Failed to parse stream data: {e}, event: {event_name}, data: {data_str}"
+                        )
                         continue
 
                     content = self._extract_stream_content(chunk_data)
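Both rewritten loops consume `(event, data)` pairs from the new parser. An offline fixture test of the kind this task calls for might replay a transcript split at hostile byte boundaries; a simplified sketch follows, where `iter_sse` is a stand-alone stand-in for the `AIClient` method (the real one also tracks `event:` names and multi-line `data:`):

```python
import asyncio
import json

async def fake_stream():
    # An OpenAI-style transcript split mid-line, as a proxy might deliver it.
    for chunk in (b'data: {"choices":[{"delta":{"con',
                  b'tent":"Hi"}}]}\n\ndata: [DONE]\n\n'):
        yield chunk

async def iter_sse(byte_iter):
    # Minimal stand-in: buffer bytes, yield (event, data) per complete data line.
    buffer = b""
    async for chunk in byte_iter:
        buffer += chunk
        while (i := buffer.find(b"\n")) >= 0:
            line, buffer = buffer[:i].decode("utf-8", "replace"), buffer[i + 1:]
            if line.startswith("data:"):
                yield None, line[5:].lstrip()

async def main():
    texts = []
    async for _event, data in iter_sse(fake_stream()):
        if data == "[DONE]":
            break
        texts.append(json.loads(data)["choices"][0]["delta"]["content"])
    return "".join(texts)

print(asyncio.run(main()))  # Hi
```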

0 commit comments
