diff --git a/functions/pipes/anthropic/main.py b/functions/pipes/anthropic/main.py
index 44b8e8e..e2edfdd 100644
--- a/functions/pipes/anthropic/main.py
+++ b/functions/pipes/anthropic/main.py
@@ -1,11 +1,12 @@
"""
-title: Anthropic Manifold Pipe
-authors: justinh-rahb, christian-taillon, jfbloom22
-author_url: https://github.com/justinh-rahb
+title: Anthropic Manifold Pipe with Extended Thinking and Cache Control
+authors: justinh-rahb, christian-taillon, jfbloom22, Mark Kazakov, Vincent, NIK-NUB, Snav (cache control)
+author_url: https://github.com/jfbloom22
funding_url: https://github.com/open-webui
-version: 0.3.0
+version: 0.5.0
required_open_webui_version: 0.3.17
license: MIT
+description: An advanced manifold pipe for interacting with Anthropic's Claude models, featuring extended thinking support, cache control, beta features, and sophisticated model handling for Claude 4.5.
"""
import os
@@ -18,18 +19,46 @@
class Pipe:
+ CACHE_TTL = "1h"
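+    # NOTE: ephemeral cache entries default to a 5m TTL; a 1h TTL may require
+    # Anthropic's extended cache TTL beta (see the BETA_FEATURES valve).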
+
class Valves(BaseModel):
- ANTHROPIC_API_KEY: str = Field(default="")
+ ANTHROPIC_API_KEY: str = Field(default="", description="Anthropic API Key")
+ CLAUDE_USE_TEMPERATURE: bool = Field(
+ default=True,
+ description="For Claude 4.x models (Opus 4, Sonnet 4.5, Haiku 3.5+): Use temperature (True) or top_p (False). Claude 4.x models only support one parameter.",
+ )
+ BETA_FEATURES: str = Field(
+ default="",
+ description="Enable Anthropic Beta Features. e.g.: context-management-2025-06-27",
+ )
+ ENABLE_THINKING: bool = Field(
+ default=True,
+ description="Enable Claude's extended thinking capabilities (Claude 4.5 Sonnet with thinking model only)",
+ )
+ THINKING_BUDGET: int = Field(
+ default=16000,
+ description="Maximum number of tokens Claude can use for thinking (min: 1024, max: 32000)",
+ )
+ DISPLAY_THINKING: bool = Field(
+ default=True, description="Display Claude's thinking process in the chat"
+ )
def __init__(self):
self.type = "manifold"
self.id = "anthropic"
self.name = "anthropic/"
self.valves = self.Valves(
- **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
+ **{
+ "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
+ "CLAUDE_USE_TEMPERATURE": True, # Use temperature for Claude 4.x models
+ "BETA_FEATURES": "",
+ "ENABLE_THINKING": True,
+ "THINKING_BUDGET": 16000,
+ "DISPLAY_THINKING": True,
+ }
)
self.MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image
-
+
# Model cache
self._model_cache: Optional[List[Dict[str, str]]] = None
self._model_cache_time: float = 0
@@ -106,6 +135,105 @@ def get_anthropic_models(self) -> List[Dict[str, str]]:
"""
return self.get_anthropic_models_from_api()
+ def _attach_cache_control(self, block: dict):
+ """Attach cache control to a content block."""
+ if not isinstance(block, dict):
+ return block
+
+ # Skip block types that cannot be cached directly per Anthropic docs
+ if block.get("type") in {"thinking", "redacted_thinking"}:
+ return block
+
+ if not block.get("type"):
+ block["type"] = "text"
+ if "text" not in block:
+ block["text"] = ""
+
+ cache_control = dict(block.get("cache_control", {}))
+ cache_control["type"] = "ephemeral"
+ cache_control["ttl"] = self.CACHE_TTL
+ block["cache_control"] = cache_control
+ return block
+
+ def _normalize_content_blocks(self, raw_content):
+ """Normalize content into proper block format."""
+ blocks = []
+
+ if isinstance(raw_content, list):
+ items = raw_content
+ else:
+ items = [raw_content]
+
+ for item in items:
+ if isinstance(item, dict) and item.get("type"):
+ blocks.append(dict(item))
+ elif isinstance(item, dict) and "content" in item:
+ # Handle message-style dicts that still wrap content
+ blocks.extend(self._normalize_content_blocks(item["content"]))
+ elif item is not None:
+ blocks.append({"type": "text", "text": str(item)})
+
+ return blocks
+
+ def _prepare_system_blocks(self, system_message):
+ """Prepare system message with cache control."""
+ if not system_message:
+ return None
+
+ # Open WebUI may hand us a raw message dict, list of blocks, or plain text
+ content = (
+ system_message.get("content")
+ if isinstance(system_message, dict) and "content" in system_message
+ else system_message
+ )
+
+ normalized_blocks = self._normalize_content_blocks(content)
+ cached_blocks = [
+ self._attach_cache_control(block) for block in normalized_blocks
+ ]
+
+ return cached_blocks if cached_blocks else None
+
+ def _apply_cache_control_to_last_message(self, messages):
+ """Apply cache control to the last user message."""
+ if not messages:
+ return
+
+ last_message = messages[-1]
+ if last_message.get("role") != "user":
+ return
+
+ for block in reversed(last_message.get("content", [])):
+ if isinstance(block, dict) and block.get("type") not in {
+ "thinking",
+ "redacted_thinking",
+ }:
+ self._attach_cache_control(block)
+ break
+
+ def _is_claude_4x_model(self, model_name: str) -> bool:
+ """
+ Determine if a model is a Claude 4.x generation model that has temperature/top_p constraints.
+ Uses a more future-proof approach than simple prefix matching.
+
+ Args:
+ model_name: The model name to check
+
+ Returns:
+ True if this is a Claude 4.x model with constraints
+ """
+ import re
+
+        # Pattern to match Claude 4.x models with optional sub-version (-1, -5)
+        # and date suffixes, e.g.: claude-opus-4, claude-opus-4-1-20250805,
+        # claude-sonnet-4-5, claude-sonnet-4-5-20250929
+        pattern = r"^claude-(opus|sonnet|haiku)-4(?:-\d+)?(?:-\d{8})?$"
+
+        # Claude 3.5 Haiku variants (e.g. claude-3-5-haiku-latest) share the
+        # same single-parameter restriction, so treat them as constrained too
+        haiku_pattern = r"^claude-3-5-haiku"
+
+        return bool(
+            re.match(pattern, model_name) or re.match(haiku_pattern, model_name)
+        )
+
def pipes(self) -> List[dict]:
return self.get_anthropic_models()
@@ -150,7 +278,7 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
system_message, messages = pop_system_message(body["messages"])
processed_messages = []
- total_image_size = 0
+ total_image_size = 0.0
for message in messages:
processed_content = []
@@ -172,6 +300,20 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
raise ValueError(
"Total size of images exceeds 100 MB limit"
)
+ elif item["type"] == "thinking" and "signature" in item:
+ # Include thinking blocks if present in the message
+ processed_content.append(
+ {
+ "type": "thinking",
+ "thinking": item["thinking"],
+ "signature": item["signature"],
+ }
+ )
+ elif item["type"] == "redacted_thinking" and "data" in item:
+ # Include redacted thinking blocks if present
+ processed_content.append(
+ {"type": "redacted_thinking", "data": item["data"]}
+ )
else:
processed_content = [
{"type": "text", "text": message.get("content", "")}
@@ -181,23 +323,71 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
{"role": message["role"], "content": processed_content}
)
+ system_blocks = self._prepare_system_blocks(system_message)
+ self._apply_cache_control_to_last_message(processed_messages)
+
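+    # Strip the manifold prefix, e.g. "anthropic.claude-sonnet-4-5-think"
+    # -> "claude-sonnet-4-5-think" (Open WebUI prefixes models with the pipe id)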
+ model_name = body["model"][body["model"].find(".") + 1 :]
+
+ # Check if this is a thinking model
+ is_thinking_model = model_name.endswith("-think")
+
+    # Strip the trailing "-think" suffix for the actual API call
+    api_model_name = (
+        model_name.removesuffix("-think") if is_thinking_model else model_name
+    )
+
+ # Determine if thinking will be enabled
+ will_enable_thinking = (
+ self.valves.ENABLE_THINKING
+ and is_thinking_model
+ and "claude-sonnet-4-5" in model_name
+ )
+
payload = {
- "model": body["model"][body["model"].find(".") + 1 :],
+ "model": api_model_name,
"messages": processed_messages,
"max_tokens": body.get("max_tokens", 4096),
- "temperature": body.get("temperature", 0.8),
- "top_k": body.get("top_k", 40),
- "top_p": body.get("top_p", 0.9),
"stop_sequences": body.get("stop", []),
- **({"system": str(system_message)} if system_message else {}),
"stream": body.get("stream", False),
}
+ if system_blocks:
+ payload["system"] = system_blocks
+
+ # Only add top_k if thinking is NOT enabled
+ if not will_enable_thinking:
+ payload["top_k"] = body.get("top_k", 40)
+
+ # Add extended thinking for Claude 4.5 Sonnet with thinking
+ if will_enable_thinking:
+ # Ensure thinking budget is within reasonable limits (1024-32000 tokens)
+ thinking_budget = max(1024, min(32000, self.valves.THINKING_BUDGET))
+ payload["thinking"] = {"type": "enabled", "budget_tokens": thinking_budget}
+
+ # Handle temperature/top_p settings based on model generation
+ # Claude 4.x models only support either temperature OR top_p, not both
+ is_claude_4x_model = self._is_claude_4x_model(api_model_name)
+
+ if is_claude_4x_model:
+ if is_thinking_model:
+ # For thinking model, always use temperature = 1.0
+ payload["temperature"] = 1.0
+ elif self.valves.CLAUDE_USE_TEMPERATURE:
+ payload["temperature"] = body.get("temperature", 0.8)
+ else:
+ payload["top_p"] = body.get("top_p", 0.9)
+ else:
+ # Other Claude models support both temperature and top_p
+ payload["temperature"] = body.get("temperature", 0.8)
+ payload["top_p"] = body.get("top_p", 0.9)
+
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
+ if self.valves.BETA_FEATURES:
+ headers["anthropic-beta"] = self.valves.BETA_FEATURES
url = "https://api.anthropic.com/v1/messages"
@@ -214,14 +404,26 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
return f"Error: {e}"
def stream_response(self, url, headers, payload):
+ """Handle streaming response with the OpenWebUI thinking tags."""
try:
with requests.post(
url, headers=headers, json=payload, stream=True, timeout=(3.05, 60)
) as response:
if response.status_code != 200:
- raise Exception(
- f"HTTP Error {response.status_code}: {response.text}"
- )
+ error_text = response.text
+ try:
+ error_json = response.json()
+ if "error" in error_json:
+ error_text = error_json["error"].get("message", error_text)
+                    except Exception:
+ pass
+ raise Exception(f"HTTP Error {response.status_code}: {error_text}")
+
+ thinking_content = ""
+ is_thinking_block = False
+ is_text_block = False
+ has_yielded_thinking = False
+ has_yielded_think_tag = False
for line in response.iter_lines():
if line:
@@ -229,13 +431,124 @@ def stream_response(self, url, headers, payload):
if line.startswith("data: "):
try:
data = json.loads(line[6:])
+
+ # Handle content block starts
if data["type"] == "content_block_start":
- yield data["content_block"]["text"]
+ block_type = data["content_block"].get("type", "")
+
+ # Handle thinking block start
+ if block_type == "thinking":
+ is_thinking_block = True
+ # Emit thinking start tag immediately
+ if (
+ not has_yielded_think_tag
+ and self.valves.DISPLAY_THINKING
+ ):
+                                yield "<think>\n"
+                                has_yielded_think_tag = True
+
+ # Handle transition to text block
+ elif block_type == "text":
+                        # If a thinking tag is still open, close it before text
+                        if has_yielded_think_tag and not has_yielded_thinking:
+                            yield "\n</think>\n\n"
+                            has_yielded_thinking = True
+
+ is_thinking_block = False
+ is_text_block = True
+
+ # For text blocks, yield the initial text if any
+ if (
+ "text" in data["content_block"]
+ and data["content_block"]["text"]
+ ):
+ yield data["content_block"]["text"]
+
+ # Handle redacted thinking block
+ elif (
+ block_type == "redacted_thinking"
+ and self.valves.DISPLAY_THINKING
+ ):
+ if not has_yielded_think_tag:
+                            yield "<think>\n"
+ has_yielded_think_tag = True
+ yield "[Redacted thinking content]"
+
+ # Handle block deltas
elif data["type"] == "content_block_delta":
- yield data["delta"]["text"]
+ delta = data["delta"]
+
+ # Stream thinking deltas with the thinking tag
+ if (
+ delta["type"] == "thinking_delta"
+ and is_thinking_block
+ and self.valves.DISPLAY_THINKING
+ ):
+ thinking_content += delta["thinking"]
+ yield delta["thinking"]
+
+ # Stream text deltas normally
+ elif (
+ delta["type"] == "text_delta" and is_text_block
+ ):
+ yield delta["text"]
+
+ # Handle block stops
+ elif data["type"] == "content_block_stop":
+ if is_thinking_block:
+ is_thinking_block = False
+ # Close thinking tag at the end of thinking block
+ if (
+ has_yielded_think_tag
+ and not has_yielded_thinking
+ ):
+                                yield "\n</think>\n\n"
+                                has_yielded_thinking = True
+ elif is_text_block:
+ is_text_block = False
+
+ # Handle message stop
elif data["type"] == "message_stop":
+ # Make sure thinking tag is closed if needed
+ if (
+ has_yielded_think_tag
+ and not has_yielded_thinking
+ ):
+                            yield "\n</think>\n\n"
break
+
+ # Handle single message (non-streaming style response in stream)
elif data["type"] == "message":
+ has_thinking = False
+
+ # First check if there's thinking content
+ for content in data.get("content", []):
+ if (
+ content["type"] == "thinking"
+ or content["type"] == "redacted_thinking"
+ ) and self.valves.DISPLAY_THINKING:
+ has_thinking = True
+ break
+
+                            # If there's thinking, emit it first inside think tags
+                            if has_thinking:
+                                yield "<think>\n"
+
+                                for content in data.get("content", []):
+                                    if (
+                                        content["type"] == "thinking"
+                                        and self.valves.DISPLAY_THINKING
+                                    ):
+                                        yield content["thinking"]
+                                    elif (
+                                        content["type"] == "redacted_thinking"
+                                        and self.valves.DISPLAY_THINKING
+                                    ):
+                                        yield "[Redacted thinking content]"
+
+                                yield "\n</think>\n\n"
+
+ # Then yield all text blocks
for content in data.get("content", []):
if content["type"] == "text":
yield content["text"]
@@ -257,17 +570,51 @@ def stream_response(self, url, headers, payload):
yield f"Error: {e}"
def non_stream_response(self, url, headers, payload):
+ """Handle non-streaming response from Anthropic API, including thinking blocks."""
try:
response = requests.post(
url, headers=headers, json=payload, timeout=(3.05, 60)
)
if response.status_code != 200:
- raise Exception(f"HTTP Error {response.status_code}: {response.text}")
+ error_text = response.text
+ try:
+ error_json = response.json()
+ if "error" in error_json:
+ error_text = error_json["error"].get("message", error_text)
+            except Exception:
+ pass
+ raise Exception(f"HTTP Error {response.status_code}: {error_text}")
res = response.json()
- return (
- res["content"][0]["text"] if "content" in res and res["content"] else ""
- )
+
+ if "content" not in res or not res["content"]:
+ return ""
+
+ has_thinking = False
+ thinking_content = ""
+ text_content = ""
+
+ # First organize content by type
+ for content_block in res["content"]:
+ if content_block["type"] == "thinking" and self.valves.DISPLAY_THINKING:
+ has_thinking = True
+ thinking_content += content_block["thinking"]
+ elif (
+ content_block["type"] == "redacted_thinking"
+ and self.valves.DISPLAY_THINKING
+ ):
+ has_thinking = True
+ thinking_content += "[Redacted thinking content]"
+ elif content_block["type"] == "text":
+ text_content += content_block["text"]
+
+ # Then construct the response with the tags
+ result = ""
+ if has_thinking:
+            result += f"<think>\n{thinking_content}\n</think>\n\n"
+
+ result += text_content
+ return result
except requests.exceptions.RequestException as e:
print(f"Failed non-stream request: {e}")
return f"Error: {e}"