Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### New features

* `.stream()` and `.stream_async()` now support a `data_model` parameter for structured data extraction while streaming. (#262)
* `ChatAnthropic()` now uses native structured outputs API for supported models (claude-sonnet-4-5, claude-opus-4-1, claude-opus-4-5, claude-haiku-4-5), enabling streaming with `data_model`. Older models fall back to the tool-based approach. (#263)

## [0.15.0] - 2026-01-06

Expand Down
161 changes: 120 additions & 41 deletions chatlas/_provider_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,34 @@
from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn
from ._utils import split_http_client_kwargs

# Models that support the new structured outputs API (output_format parameter)
# https://platform.claude.com/docs/en/build-with-claude/structured-outputs
STRUCTURED_OUTPUT_MODELS = {
"claude-sonnet-4-5",
"claude-opus-4-1",
"claude-opus-4-5",
"claude-haiku-4-5",
}

STRUCTURED_OUTPUTS_BETA = "structured-outputs-2025-11-13"


def use_beta_structured_output(model: str, data_model: Optional[type[BaseModel]]) -> bool:
"""
Check if we should use the beta structured outputs API.

Returns True if data_model is provided and the model supports the new API.
"""
if data_model is None:
return False
# Handle dated model versions like "claude-sonnet-4-5-20250514"
# by checking if any supported model is a prefix
for supported in STRUCTURED_OUTPUT_MODELS:
if model.startswith(supported):
return True
return False


if TYPE_CHECKING:
from anthropic.types import (
Message,
Expand Down Expand Up @@ -353,8 +381,14 @@ def chat_perform(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional["SubmitInputArgs"] = None,
):
kwargs = self._chat_perform_args(stream, turns, tools, data_model, kwargs)
return self._client.messages.create(**kwargs) # type: ignore
api_kwargs = self._chat_perform_args(stream, turns, tools, data_model, kwargs)
if use_beta_structured_output(self.model, data_model):
# Beta API has slightly different type signatures but is runtime compatible
return self._client.beta.messages.create(
betas=[STRUCTURED_OUTPUTS_BETA],
**api_kwargs, # type: ignore[arg-type]
)
return self._client.messages.create(**api_kwargs) # type: ignore

@overload
async def chat_perform_async(
Expand Down Expand Up @@ -387,8 +421,14 @@ async def chat_perform_async(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional["SubmitInputArgs"] = None,
):
kwargs = self._chat_perform_args(stream, turns, tools, data_model, kwargs)
return await self._async_client.messages.create(**kwargs) # type: ignore
api_kwargs = self._chat_perform_args(stream, turns, tools, data_model, kwargs)
if use_beta_structured_output(self.model, data_model):
# Beta API has slightly different type signatures but is runtime compatible
return await self._async_client.beta.messages.create(
betas=[STRUCTURED_OUTPUTS_BETA],
**api_kwargs, # type: ignore[arg-type]
)
return await self._async_client.messages.create(**api_kwargs) # type: ignore

def _chat_perform_args(
self,
Expand All @@ -398,42 +438,24 @@ def _chat_perform_args(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional["SubmitInputArgs"] = None,
) -> "SubmitInputArgs":
"""Build kwargs for the Anthropic messages API."""
tool_schemas = [self._anthropic_tool_schema(tool) for tool in tools.values()]

# If data extraction is requested, add a "mock" tool with parameters inferred from the data model
use_beta = use_beta_structured_output(self.model, data_model)
data_model_tool: Tool | None = None
if data_model is not None:

def _structured_tool_call(**kwargs: Any):
"""Extract structured data"""
pass

data_model_tool = Tool.from_func(_structured_tool_call)

data_model_schema = basemodel_to_param_schema(data_model)

# Extract $defs from the nested schema and place at top level
# JSON Schema $ref pointers like "#/$defs/..." need $defs at the root
defs = data_model_schema.pop("$defs", None)

params: dict[str, Any] = {
"type": "object",
"properties": {
"data": data_model_schema,
},
}
if defs:
params["$defs"] = defs

data_model_tool.schema["function"]["parameters"] = params

if data_model is not None and not use_beta:
# Fall back to the old tool-based approach for older models
data_model_tool = self.create_data_model_tool(data_model)
tool_schemas.append(self._anthropic_tool_schema(data_model_tool))

if stream:
stream = False
warnings.warn(
"Anthropic does not support structured data extraction in streaming mode.",
stacklevel=2,
"Anthropic does not support structured data extraction in "
"streaming mode for older models. Consider using a newer model "
f"like {', '.join(sorted(STRUCTURED_OUTPUT_MODELS))}.",
stacklevel=4,
)

kwargs_full: "SubmitInputArgs" = {
Expand All @@ -445,7 +467,17 @@ def _structured_tool_call(**kwargs: Any):
**(kwargs or {}),
}

if use_beta and data_model is not None:
# Use the new output_format parameter for structured outputs
from anthropic import transform_schema

kwargs_full["output_format"] = { # type: ignore[typeddict-unknown-key]
"type": "json_schema",
"schema": transform_schema(data_model),
}

if data_model_tool:
# Old approach: force tool use
kwargs_full["tool_choice"] = {
"type": "tool",
"name": data_model_tool.name,
Expand All @@ -463,6 +495,34 @@ def _structured_tool_call(**kwargs: Any):

return kwargs_full

def create_data_model_tool(self, data_model: type[BaseModel]) -> Tool:
"""Create a fake tool for structured data extraction (old approach)."""

def _structured_tool_call(**kwargs: Any):
"""Extract structured data"""
pass

data_model_tool = Tool.from_func(_structured_tool_call)

data_model_schema = basemodel_to_param_schema(data_model)

# Extract $defs from the nested schema and place at top level
# JSON Schema $ref pointers like "#/$defs/..." need $defs at the root
defs = data_model_schema.pop("$defs", None)

params: dict[str, Any] = {
"type": "object",
"properties": {
"data": data_model_schema,
},
}
if defs:
params["$defs"] = defs

data_model_tool.schema["function"]["parameters"] = params

return data_model_tool

def stream_text(self, chunk) -> Optional[str]:
if chunk.type == "content_block_delta":
if chunk.delta.type == "text_delta":
Expand Down Expand Up @@ -576,7 +636,7 @@ def _token_count_args(
) -> dict[str, Any]:
turn = user_turn(*args)

kwargs = self._chat_perform_args(
api_kwargs = self._chat_perform_args(
stream=False,
turns=[turn],
tools=tools,
Expand All @@ -591,7 +651,7 @@ def _token_count_args(
"tool_choice",
]

return {arg: kwargs[arg] for arg in args_to_keep if arg in kwargs}
return {arg: api_kwargs[arg] for arg in args_to_keep if arg in api_kwargs}

def translate_model_params(self, params: StandardModelParams) -> "SubmitInputArgs":
res: "SubmitInputArgs" = {}
Expand Down Expand Up @@ -753,11 +813,26 @@ def _anthropic_tool_schema(tool: "Tool | ToolBuiltIn") -> "ToolUnionParam":

def _as_turn(self, completion: Message, has_data_model=False) -> AssistantTurn:
contents = []

# Detect which structured output approach was used:
# - Old approach: has a _structured_tool_call tool_use block
# - New approach: has_data_model=True but no _structured_tool_call (JSON in text)
uses_old_tool_approach = has_data_model and any(
c.type == "tool_use" and c.name == "_structured_tool_call"
for c in completion.content
)
uses_new_output_format = has_data_model and not uses_old_tool_approach

for content in completion.content:
if content.type == "text":
contents.append(ContentText(text=content.text))
if uses_new_output_format:
# New API: JSON response is in text content
contents.append(ContentJson(value=orjson.loads(content.text)))
else:
contents.append(ContentText(text=content.text))
elif content.type == "tool_use":
if has_data_model and content.name == "_structured_tool_call":
if uses_old_tool_approach and content.name == "_structured_tool_call":
# Old API: extract from tool input
if not isinstance(content.input, dict):
raise ValueError(
"Expected data extraction tool to return a dictionary."
Expand Down Expand Up @@ -874,26 +949,30 @@ def batch_submit(
requests: list["BatchRequest"] = []

for i, turns in enumerate(conversations):
kwargs = self._chat_perform_args(
api_kwargs = self._chat_perform_args(
stream=False,
turns=turns,
tools={},
data_model=data_model,
)

params: "MessageCreateParamsNonStreaming" = {
"messages": kwargs.get("messages", {}),
"messages": api_kwargs.get("messages", {}),
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"max_tokens": api_kwargs.get("max_tokens", 4096),
}

# If data_model, tools/tool_choice should be present
tools = kwargs.get("tools")
tool_choice = kwargs.get("tool_choice")
# If data_model, tools/tool_choice should be present (old API)
# or output_format (new API)
tools = api_kwargs.get("tools")
tool_choice = api_kwargs.get("tool_choice")
output_format = api_kwargs.get("output_format")
if tools and not isinstance(tools, NotGiven):
params["tools"] = tools
if tool_choice and not isinstance(tool_choice, NotGiven):
params["tool_choice"] = tool_choice
if output_format and not isinstance(output_format, NotGiven):
params["output_format"] = output_format # type: ignore[typeddict-unknown-key]

requests.append({"custom_id": f"request-{i}", "params": params})

Expand Down
Loading