Skip to content

Commit 41dd069

Browse files
authored
Ignore leading whitespace when streaming text, fixing run_stream + Ollama + Qwen3 (#2294)
1 parent 4104aca commit 41dd069

File tree

16 files changed

+116
-73
lines changed

16 files changed

+116
-73
lines changed

pydantic_ai_slim/pydantic_ai/_parts_manager.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from collections.abc import Hashable
1717
from dataclasses import dataclass, field, replace
18-
from typing import Any, Literal, Union, overload
18+
from typing import Any, Union
1919

2020
from pydantic_ai._thinking_part import END_THINK_TAG, START_THINK_TAG
2121
from pydantic_ai.exceptions import UnexpectedModelBehavior
@@ -67,23 +67,6 @@ def get_parts(self) -> list[ModelResponsePart]:
6767
"""
6868
return [p for p in self._parts if not isinstance(p, ToolCallPartDelta)]
6969

70-
@overload
71-
def handle_text_delta(
72-
self,
73-
*,
74-
vendor_part_id: VendorId | None,
75-
content: str,
76-
) -> ModelResponseStreamEvent: ...
77-
78-
@overload
79-
def handle_text_delta(
80-
self,
81-
*,
82-
vendor_part_id: VendorId,
83-
content: str,
84-
extract_think_tags: Literal[True],
85-
) -> ModelResponseStreamEvent | None: ...
86-
8770
def handle_text_delta(
8871
self,
8972
*,
@@ -105,7 +88,9 @@ def handle_text_delta(
10588
extract_think_tags: Whether to extract `<think>` tags from the text content and handle them as thinking parts.
10689
10790
Returns:
108-
A `PartStartEvent` if a new part was created, or a `PartDeltaEvent` if an existing part was updated.
91+
- A `PartStartEvent` if a new part was created.
92+
- A `PartDeltaEvent` if an existing part was updated.
93+
- `None` if no new event is emitted (e.g., the first text part was all whitespace).
10994
11095
Raises:
11196
UnexpectedModelBehavior: If attempting to apply text content to a part that is not a TextPart.
@@ -144,6 +129,12 @@ def handle_text_delta(
144129
return self.handle_thinking_delta(vendor_part_id=vendor_part_id, content='')
145130

146131
if existing_text_part_and_index is None:
132+
# If the first text delta is all whitespace, don't emit a new part yet.
133+
# This is a workaround for models that emit `<think>\n</think>\n\n` ahead of tool calls (e.g. Ollama + Qwen3),
134+
# which we don't want to end up treating as a final result.
135+
if content.isspace():
136+
return None
137+
147138
# There is no existing text part that should be updated, so create a new one
148139
new_part_index = len(self._parts)
149140
part = TextPart(content=content)

pydantic_ai_slim/pydantic_ai/messages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ class ThinkingPart:
690690

691691
def has_content(self) -> bool:
692692
"""Return `True` if the thinking content is non-empty."""
693-
return bool(self.content) # pragma: no cover
693+
return bool(self.content)
694694

695695
__repr__ = _utils.dataclasses_no_defaults_repr
696696

pydantic_ai_slim/pydantic_ai/models/anthropic.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ class AnthropicStreamedResponse(StreamedResponse):
470470
_response: AsyncIterable[BetaRawMessageStreamEvent]
471471
_timestamp: datetime
472472

473-
async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
473+
async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]: # noqa: C901
474474
current_block: BetaContentBlock | None = None
475475

476476
async for event in self._response:
@@ -479,7 +479,11 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
479479
if isinstance(event, BetaRawContentBlockStartEvent):
480480
current_block = event.content_block
481481
if isinstance(current_block, BetaTextBlock) and current_block.text:
482-
yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=current_block.text)
482+
maybe_event = self._parts_manager.handle_text_delta(
483+
vendor_part_id='content', content=current_block.text
484+
)
485+
if maybe_event is not None: # pragma: no branch
486+
yield maybe_event
483487
elif isinstance(current_block, BetaThinkingBlock):
484488
yield self._parts_manager.handle_thinking_delta(
485489
vendor_part_id='thinking',
@@ -498,7 +502,11 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
498502

499503
elif isinstance(event, BetaRawContentBlockDeltaEvent):
500504
if isinstance(event.delta, BetaTextDelta):
501-
yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=event.delta.text)
505+
maybe_event = self._parts_manager.handle_text_delta(
506+
vendor_part_id='content', content=event.delta.text
507+
)
508+
if maybe_event is not None: # pragma: no branch
509+
yield maybe_event
502510
elif isinstance(event.delta, BetaThinkingDelta):
503511
yield self._parts_manager.handle_thinking_delta(
504512
vendor_part_id='thinking', content=event.delta.thinking

pydantic_ai_slim/pydantic_ai/models/bedrock.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ class BedrockStreamedResponse(StreamedResponse):
572572
_event_stream: EventStream[ConverseStreamOutputTypeDef]
573573
_timestamp: datetime = field(default_factory=_utils.now_utc)
574574

575-
async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
575+
async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]: # noqa: C901
576576
"""Return an async iterator of [`ModelResponseStreamEvent`][pydantic_ai.messages.ModelResponseStreamEvent]s.
577577
578578
This method should be implemented by subclasses to translate the vendor-specific stream of events into
@@ -618,7 +618,9 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
618618
UserWarning,
619619
)
620620
if 'text' in delta:
621-
yield self._parts_manager.handle_text_delta(vendor_part_id=index, content=delta['text'])
621+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id=index, content=delta['text'])
622+
if maybe_event is not None:
623+
yield maybe_event
622624
if 'toolUse' in delta:
623625
tool_use = delta['toolUse']
624626
maybe_event = self._parts_manager.handle_tool_call_delta(

pydantic_ai_slim/pydantic_ai/models/function.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,9 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
264264
if isinstance(item, str):
265265
response_tokens = _estimate_string_tokens(item)
266266
self._usage += usage.Usage(response_tokens=response_tokens, total_tokens=response_tokens)
267-
yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=item)
267+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id='content', content=item)
268+
if maybe_event is not None: # pragma: no branch
269+
yield maybe_event
268270
elif isinstance(item, dict) and item:
269271
for dtc_index, delta in item.items():
270272
if isinstance(delta, DeltaThinkingPart):
@@ -286,7 +288,7 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
286288
args=delta.json_args,
287289
tool_call_id=delta.tool_call_id,
288290
)
289-
if maybe_event is not None:
291+
if maybe_event is not None: # pragma: no branch
290292
yield maybe_event
291293
else:
292294
assert_never(delta)

pydantic_ai_slim/pydantic_ai/models/gemini.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,11 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
438438
if 'text' in gemini_part:
439439
# Using vendor_part_id=None means we can produce multiple text parts if their deltas are sprinkled
440440
# amongst the tool call deltas
441-
yield self._parts_manager.handle_text_delta(vendor_part_id=None, content=gemini_part['text'])
441+
maybe_event = self._parts_manager.handle_text_delta(
442+
vendor_part_id=None, content=gemini_part['text']
443+
)
444+
if maybe_event is not None: # pragma: no branch
445+
yield maybe_event
442446

443447
elif 'function_call' in gemini_part:
444448
# Here, we assume all function_call parts are complete and don't have deltas.

pydantic_ai_slim/pydantic_ai/models/google.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,9 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
458458
if part.thought:
459459
yield self._parts_manager.handle_thinking_delta(vendor_part_id='thinking', content=part.text)
460460
else:
461-
yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=part.text)
461+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id='content', content=part.text)
462+
if maybe_event is not None: # pragma: no branch
463+
yield maybe_event
462464
elif part.function_call:
463465
maybe_event = self._parts_manager.handle_tool_call_delta(
464466
vendor_part_id=uuid4(),

pydantic_ai_slim/pydantic_ai/models/mistral.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,9 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
601601
tool_call_id=maybe_tool_call_part.tool_call_id,
602602
)
603603
else:
604-
yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=text)
604+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id='content', content=text)
605+
if maybe_event is not None: # pragma: no branch
606+
yield maybe_event
605607

606608
# Handle the explicit tool calls
607609
for index, dtc in enumerate(choice.delta.tool_calls or []):

pydantic_ai_slim/pydantic_ai/models/openai.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,11 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
11441144
)
11451145

11461146
elif isinstance(chunk, responses.ResponseTextDeltaEvent):
1147-
yield self._parts_manager.handle_text_delta(vendor_part_id=chunk.content_index, content=chunk.delta)
1147+
maybe_event = self._parts_manager.handle_text_delta(
1148+
vendor_part_id=chunk.content_index, content=chunk.delta
1149+
)
1150+
if maybe_event is not None: # pragma: no branch
1151+
yield maybe_event
11481152

11491153
elif isinstance(chunk, responses.ResponseTextDoneEvent):
11501154
pass # there's nothing we need to do here

pydantic_ai_slim/pydantic_ai/models/test.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,14 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
269269
mid = len(text) // 2
270270
words = [text[:mid], text[mid:]]
271271
self._usage += _get_string_usage('')
272-
yield self._parts_manager.handle_text_delta(vendor_part_id=i, content='')
272+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id=i, content='')
273+
if maybe_event is not None: # pragma: no branch
274+
yield maybe_event
273275
for word in words:
274276
self._usage += _get_string_usage(word)
275-
yield self._parts_manager.handle_text_delta(vendor_part_id=i, content=word)
277+
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id=i, content=word)
278+
if maybe_event is not None: # pragma: no branch
279+
yield maybe_event
276280
elif isinstance(part, ToolCallPart):
277281
yield self._parts_manager.handle_tool_call_part(
278282
vendor_part_id=i, tool_name=part.tool_name, args=part.args, tool_call_id=part.tool_call_id

0 commit comments

Comments
 (0)