Skip to content

Commit 53e1fac

Browse files
committed
feat(anthropic): enable interleaved thinking for agentic workflows
Add support for Claude to think between tool calls in agentic sessions:

- Add anthropic-beta header (interleaved-thinking-2025-05-14) for Claude models
- Track thinking block transitions in streaming accumulator
- Handle standalone signature deltas in streaming pipeline
- Close text blocks before starting new thinking blocks (enables interleaving)
- Log interleaved thinking summary at stream completion

Enables Claude Code to display thinking between tool executions.
1 parent 3fca535 commit 53e1fac

File tree

3 files changed

+89
-5
lines changed

3 files changed

+89
-5
lines changed

src/proxy_app/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,13 @@ async def anthropic_messages(
10151015
# Initialize logger if enabled
10161016
logger = DetailedLogger() if ENABLE_REQUEST_LOGGING else None
10171017

1018+
# Log if client is requesting interleaved thinking (informational)
1019+
anthropic_beta = request.headers.get("anthropic-beta", "")
1020+
if "interleaved-thinking" in anthropic_beta:
1021+
logging.getLogger("rotator_library").debug(
1022+
f"[Anthropic API] Client requested interleaved thinking: {anthropic_beta}"
1023+
)
1024+
10181025
try:
10191026
# Log the request to console
10201027
log_request_to_console(

src/rotator_library/anthropic_compat/streaming.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,26 @@ async def anthropic_streaming_wrapper(
163163
# Always capture signature if available (may come in later deltas)
164164
if thought_sig_from_delta and not thinking_signature:
165165
thinking_signature = thought_sig_from_delta
166+
logger.debug(
167+
f"[SIGNATURE] Streaming wrapper captured signature: "
168+
f"{thought_sig_from_delta[:50]}..."
169+
)
166170

167171
if reasoning_content:
168-
import logging
169-
logging.getLogger("rotator_library").debug(
170-
f"[Anthropic Stream] Sending thinking ({len(reasoning_content)} chars), sig={bool(thinking_signature)}"
172+
# Enhanced DEBUG logging for interleaved thinking verification
173+
context = "initial" if current_block_index == 0 else "interleaved"
174+
prev_block = "none"
175+
if thinking_block_started:
176+
prev_block = "thinking"
177+
elif content_block_started:
178+
prev_block = "text"
179+
elif tool_calls_by_index:
180+
prev_block = "tool_use"
181+
182+
logger.debug(
183+
f"[INTERLEAVED] Anthropic stream: block #{current_block_index}, "
184+
f"context={context}, prev={prev_block}, "
185+
f"chars={len(reasoning_content)}, sig={bool(thinking_signature)}"
171186
)
172187
if not thinking_block_started:
173188
# Close any open text block before starting a new thinking block
@@ -199,6 +214,10 @@ async def anthropic_streaming_wrapper(
199214
if content:
200215
# If we were in a thinking block, close it first
201216
if thinking_block_started and not content_block_started:
217+
logger.debug(
218+
f"[INTERLEAVED] Closing thinking block (idx={current_block_index}) for text, "
219+
f"has_sig={bool(thinking_signature)}"
220+
)
202221
# Send signature_delta if we have a signature
203222
if thinking_signature:
204223
sig_delta = {
@@ -238,6 +257,9 @@ async def anthropic_streaming_wrapper(
238257
if tc_index not in tool_calls_by_index:
239258
# Close previous thinking block if open
240259
if thinking_block_started:
260+
logger.debug(
261+
f"[INTERLEAVED] Closing thinking block (idx={current_block_index}) for tool_use"
262+
)
241263
# Send signature_delta if we have a signature
242264
if thinking_signature:
243265
sig_delta = {

src/rotator_library/providers/antigravity_provider.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3741,20 +3741,36 @@ def _gemini_to_openai_chunk(
37413741
if has_sig and is_thought and accumulator is not None:
37423742
accumulator["thought_signature"] = part["thoughtSignature"]
37433743

3744-
# Skip standalone signature parts
3744+
# Handle standalone signature parts - send them as a delta with just the signature
3745+
# This is critical for Claude Code to receive the signature for thinking blocks
37453746
if has_sig and not has_func and (not has_text or not part.get("text")):
3746-
continue
3747+
# Don't skip! Instead, we'll send a minimal delta with the signature below
3748+
# The signature will be included via the accumulator
3749+
pass # Continue to build delta below
37473750

37483751
if has_text:
37493752
text = part["text"]
37503753
if is_thought:
37513754
reasoning_content += text
37523755
if accumulator is not None:
37533756
accumulator["reasoning_content"] += text
3757+
# Track interleaved thinking for DEBUG logging
3758+
last_type = accumulator.get("last_content_type")
3759+
if last_type and last_type != "thinking":
3760+
accumulator["thinking_block_count"] = (
3761+
accumulator.get("thinking_block_count", 0) + 1
3762+
)
3763+
lib_logger.debug(
3764+
f"[INTERLEAVED] Thinking block "
3765+
f"#{accumulator['thinking_block_count']}: "
3766+
f"after={last_type}, chars={len(text)}"
3767+
)
3768+
accumulator["last_content_type"] = "thinking"
37543769
else:
37553770
text_content += text
37563771
if accumulator is not None:
37573772
accumulator["text_content"] += text
3773+
accumulator["last_content_type"] = "text"
37583774

37593775
if has_func:
37603776
# Get tool_schemas from accumulator for schema-aware parsing
@@ -3769,6 +3785,9 @@ def _gemini_to_openai_chunk(
37693785

37703786
tool_calls.append(tool_call)
37713787
tool_idx += 1
3788+
# Track tool call for interleaved thinking detection
3789+
if accumulator is not None:
3790+
accumulator["last_content_type"] = "tool_call"
37723791

37733792
# Build delta
37743793
delta = {}
@@ -3777,8 +3796,19 @@ def _gemini_to_openai_chunk(
37773796
if reasoning_content:
37783797
delta["reasoning_content"] = reasoning_content
37793798
# Include thought_signature if available (from accumulator)
3799+
# The signature arrives at the END of thinking, so we include it
3800+
# with EVERY reasoning delta once captured - streaming wrapper
3801+
# will capture it and use it when closing the thinking block
37803802
if accumulator and accumulator.get("thought_signature"):
37813803
delta["thought_signature"] = accumulator["thought_signature"]
3804+
# Send signature-only delta when signature arrives without content
3805+
# This ensures the streaming wrapper receives the signature
3806+
elif accumulator and accumulator.get("thought_signature"):
3807+
# Check if we just captured a new signature (standalone signature part)
3808+
sig = accumulator.get("thought_signature")
3809+
if sig and not text_content and not tool_calls:
3810+
delta["thought_signature"] = sig
3811+
delta["role"] = "assistant"
37823812
if tool_calls:
37833813
delta["tool_calls"] = tool_calls
37843814
delta["role"] = "assistant"
@@ -4284,6 +4314,20 @@ async def acompletion(
42844314
**ANTIGRAVITY_HEADERS,
42854315
}
42864316

4317+
# Add interleaved thinking header for Claude thinking models
4318+
# This enables thinking between tool calls for agentic workflows
4319+
if self._is_claude(model) and reasoning_effort and reasoning_effort != "disable":
4320+
interleaved_header = "interleaved-thinking-2025-05-14"
4321+
existing_beta = headers.get("anthropic-beta", "")
4322+
if existing_beta:
4323+
if interleaved_header not in existing_beta:
4324+
headers["anthropic-beta"] = f"{existing_beta},{interleaved_header}"
4325+
else:
4326+
headers["anthropic-beta"] = interleaved_header
4327+
lib_logger.debug(
4328+
f"[Antigravity] Added interleaved thinking header for {model}"
4329+
)
4330+
42874331
# Track malformed call retries (separate from empty response retries)
42884332
malformed_retry_count = 0
42894333
# Keep a mutable reference to gemini_contents for retry injection
@@ -4620,6 +4664,9 @@ async def _handle_streaming(
46204664
"tool_schemas": tool_schemas, # For schema-aware JSON string parsing
46214665
"malformed_call": None, # Track MALFORMED_FUNCTION_CALL if detected
46224666
"response_id": None, # Track original response ID for synthetic chunks
4667+
# Interleaved thinking tracking for DEBUG logging
4668+
"thinking_block_count": 0, # Count of thinking block transitions
4669+
"last_content_type": None, # Track: "thinking", "text", "tool_call"
46234670
}
46244671

46254672
async with client.stream(
@@ -4706,6 +4753,14 @@ async def _handle_streaming(
47064753
final_chunk["usage"] = accumulator["last_usage"]
47074754
yield litellm.ModelResponse(**final_chunk)
47084755

4756+
# Log interleaved thinking summary at stream completion
4757+
thinking_block_count = accumulator.get("thinking_block_count", 0)
4758+
if thinking_block_count > 0:
4759+
lib_logger.info(
4760+
f"[Antigravity] Stream completed with {thinking_block_count} "
4761+
f"interleaved thinking block(s) for {model}"
4762+
)
4763+
47094764
# Cache Claude thinking after stream completes
47104765
if (
47114766
self._is_claude(model)

0 commit comments

Comments
 (0)