6 changes: 6 additions & 0 deletions .changeset/ready-hands-share.md
@@ -0,0 +1,6 @@
---
"@roo-code/types": minor
"roo-cline": minor
---

Add Anthropic Batch API support with 50% cost savings toggle. Users can now enable async batch processing for Anthropic API requests, reducing costs by 50% with a simple settings toggle.
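
For illustration, a hypothetical snippet showing the new flag alongside existing Anthropic provider settings. Only anthropicUseBatchApi comes from this PR; the surrounding field names (apiProvider, apiModelId, apiKey) and the exported ProviderSettings type are assumptions based on the repo's provider-settings conventions.

import type { ProviderSettings } from "@roo-code/types"

// Hypothetical example: only `anthropicUseBatchApi` is introduced by this PR;
// the other fields are assumed from existing provider-settings conventions.
const settings: ProviderSettings = {
	apiProvider: "anthropic",
	apiModelId: "claude-3-5-sonnet-20241022",
	apiKey: process.env.ANTHROPIC_API_KEY,
	anthropicUseBatchApi: true, // opt in to async batch processing for the 50% cost saving
}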
1 change: 1 addition & 0 deletions packages/types/src/provider-settings.ts
@@ -193,6 +193,7 @@ const anthropicSchema = apiModelIdProviderModelSchema.extend({
anthropicBaseUrl: z.string().optional(),
anthropicUseAuthToken: z.boolean().optional(),
anthropicBeta1MContext: z.boolean().optional(), // Enable 'context-1m-2025-08-07' beta for 1M context window.
anthropicUseBatchApi: z.boolean().optional(), // Enable batch API for 50% cost savings (async processing)
})

const claudeCodeSchema = apiModelIdProviderModelSchema.extend({
327 changes: 237 additions & 90 deletions src/api/providers/anthropic.ts
@@ -17,7 +17,11 @@ import { getModelParams } from "../transform/model-params"

import { BaseProvider } from "./base-provider"
import type { SingleCompletionHandler, ApiHandlerCreateMessageMetadata } from "../index"
import { calculateApiCostAnthropic } from "../../shared/cost"
import { calculateApiCostAnthropic, applyBatchApiDiscount } from "../../shared/cost"

// Batch API polling configuration
const BATCH_POLL_INTERVAL_MS = 5000 // Poll every 5 seconds
const BATCH_MAX_POLL_TIME_MS = 600000 // Max 10 minutes polling

The timeout constant is set to 10 minutes (600,000ms), but the PR description states "max 5 minutes timeout". This discrepancy between code and documentation could confuse users about the actual timeout behavior. Either update the constant to match the documented 5 minutes (300000) or update the PR description to reflect the 10-minute timeout.

Author

Changed this in the PR comment, although I don't think it's that big of an issue really.


export class AnthropicHandler extends BaseProvider implements SingleCompletionHandler {
private options: ApiHandlerOptions
@@ -36,12 +40,67 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
})
}

/**
* Models that support prompt caching
*/
private supportsPromptCaching(modelId: string): boolean {
return [
"claude-sonnet-4-5",
"claude-sonnet-4-20250514",
"claude-opus-4-1-20250805",
"claude-opus-4-20250514",
"claude-3-7-sonnet-20250219",
"claude-3-5-sonnet-20241022",
"claude-3-5-haiku-20241022",
"claude-3-opus-20240229",
"claude-3-haiku-20240307",
].includes(modelId)
}

/**
* Applies cache control to messages for prompt caching
*/
private applyCacheBreakpoints(
messages: Anthropic.Messages.MessageParam[],
cacheControl: CacheControlEphemeral,
): Anthropic.Messages.MessageParam[] {
const userMsgIndices = messages.reduce(
(acc, msg, index) => (msg.role === "user" ? [...acc, index] : acc),
[] as number[],
)

const lastUserMsgIndex = userMsgIndices[userMsgIndices.length - 1] ?? -1
const secondLastMsgUserIndex = userMsgIndices[userMsgIndices.length - 2] ?? -1

return messages.map((message, index) => {
if (index === lastUserMsgIndex || index === secondLastMsgUserIndex) {
return {
...message,
content:
typeof message.content === "string"
? [{ type: "text" as const, text: message.content, cache_control: cacheControl }]
: message.content.map((content, contentIndex) =>
contentIndex === message.content.length - 1
? { ...content, cache_control: cacheControl }
: content,
),
}
}
return message
})
}

async *createMessage(
systemPrompt: string,
messages: Anthropic.Messages.MessageParam[],
metadata?: ApiHandlerCreateMessageMetadata,
): ApiStream {
let stream: AnthropicStream<Anthropic.Messages.RawMessageStreamEvent>
// Use batch API if enabled (50% cost savings, async processing)
if (this.options.anthropicUseBatchApi) {
yield* this.createBatchMessage(systemPrompt, messages, metadata)
return
}

const cacheControl: CacheControlEphemeral = { type: "ephemeral" }
let { id: modelId, betas = [], maxTokens, temperature, reasoning: thinking } = this.getModel()

@@ -53,98 +112,42 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
betas.push("context-1m-2025-08-07")
}

switch (modelId) {
case "claude-sonnet-4-5":
case "claude-sonnet-4-20250514":
case "claude-opus-4-1-20250805":
case "claude-opus-4-20250514":
case "claude-3-7-sonnet-20250219":
case "claude-3-5-sonnet-20241022":
case "claude-3-5-haiku-20241022":
case "claude-3-opus-20240229":
case "claude-haiku-4-5-20251001":
case "claude-3-haiku-20240307": {
/**
* The latest message will be the new user message, one before
* will be the assistant message from a previous request, and
* the user message before that will be a previously cached user
* message. So we need to mark the latest user message as
* ephemeral to cache it for the next request, and mark the
* second to last user message as ephemeral to let the server
* know the last message to retrieve from the cache for the
* current request.
*/
const userMsgIndices = messages.reduce(
(acc, msg, index) => (msg.role === "user" ? [...acc, index] : acc),
[] as number[],
)

const lastUserMsgIndex = userMsgIndices[userMsgIndices.length - 1] ?? -1
const secondLastMsgUserIndex = userMsgIndices[userMsgIndices.length - 2] ?? -1

stream = await this.client.messages.create(
{
model: modelId,
max_tokens: maxTokens ?? ANTHROPIC_DEFAULT_MAX_TOKENS,
temperature,
thinking,
// Setting cache breakpoint for system prompt so new tasks can reuse it.
system: [{ text: systemPrompt, type: "text", cache_control: cacheControl }],
messages: messages.map((message, index) => {
if (index === lastUserMsgIndex || index === secondLastMsgUserIndex) {
return {
...message,
content:
typeof message.content === "string"
? [{ type: "text", text: message.content, cache_control: cacheControl }]
: message.content.map((content, contentIndex) =>
contentIndex === message.content.length - 1
? { ...content, cache_control: cacheControl }
: content,
),
}
}
return message
}),
stream: true,
},
(() => {
// prompt caching: https://x.com/alexalbert__/status/1823751995901272068
// https://github.com/anthropics/anthropic-sdk-typescript?tab=readme-ov-file#default-headers
// https://github.com/anthropics/anthropic-sdk-typescript/commit/c920b77fc67bd839bfeb6716ceab9d7c9bbe7393

// Then check for models that support prompt caching
switch (modelId) {
case "claude-sonnet-4-5":
case "claude-sonnet-4-20250514":
case "claude-opus-4-1-20250805":
case "claude-opus-4-20250514":
case "claude-3-7-sonnet-20250219":
case "claude-3-5-sonnet-20241022":
case "claude-3-5-haiku-20241022":
case "claude-3-opus-20240229":
case "claude-haiku-4-5-20251001":
case "claude-3-haiku-20240307":
betas.push("prompt-caching-2024-07-31")
return { headers: { "anthropic-beta": betas.join(",") } }
default:
return undefined
}
})(),
)
break
}
default: {
stream = (await this.client.messages.create({
let stream: AnthropicStream<Anthropic.Messages.RawMessageStreamEvent>

if (this.supportsPromptCaching(modelId)) {
/**
* The latest message will be the new user message, one before
* will be the assistant message from a previous request, and
* the user message before that will be a previously cached user
* message. So we need to mark the latest user message as
* ephemeral to cache it for the next request, and mark the
* second to last user message as ephemeral to let the server
* know the last message to retrieve from the cache for the
* current request.
*/
betas.push("prompt-caching-2024-07-31")

stream = await this.client.messages.create(
{
model: modelId,
max_tokens: maxTokens ?? ANTHROPIC_DEFAULT_MAX_TOKENS,
temperature,
system: [{ text: systemPrompt, type: "text" }],
messages,
thinking,
system: [{ text: systemPrompt, type: "text", cache_control: cacheControl }],
messages: this.applyCacheBreakpoints(messages, cacheControl),
stream: true,
})) as any
break
}
},
{ headers: { "anthropic-beta": betas.join(",") } },
)
} else {
stream = (await this.client.messages.create({
model: modelId,
max_tokens: maxTokens ?? ANTHROPIC_DEFAULT_MAX_TOKENS,
temperature,
system: [{ text: systemPrompt, type: "text" }],
messages,
stream: true,
})) as any
}

let inputTokens = 0
@@ -266,6 +269,11 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
}
}

// Apply 50% discount for Batch API (applies after 1M context pricing if both enabled)
if (this.options.anthropicUseBatchApi) {
info = applyBatchApiDiscount(info)
}
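
The body of applyBatchApiDiscount lives in src/shared/cost.ts and is not part of this diff. The sketch below is one plausible shape for it, assuming ModelInfo carries per-million-token inputPrice, outputPrice, cacheWritesPrice, and cacheReadsPrice fields; treat both the field names and the plain halving as assumptions.

import type { ModelInfo } from "@roo-code/types"

// Hypothetical sketch: halve every per-token price on the model info.
// The real helper in src/shared/cost.ts may handle fields differently.
export function applyBatchApiDiscount(info: ModelInfo): ModelInfo {
	const halve = (price?: number) => (price === undefined ? undefined : price * 0.5)
	return {
		...info,
		inputPrice: halve(info.inputPrice),
		outputPrice: halve(info.outputPrice),
		cacheWritesPrice: halve(info.cacheWritesPrice),
		cacheReadsPrice: halve(info.cacheReadsPrice),
	}
}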

const params = getModelParams({
format: "anthropic",
modelId: id,
@@ -301,6 +309,145 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
return content?.type === "text" ? content.text : ""
}

/**
* Creates a message using the Batch API for 50% cost savings.
* This method handles the async batch job lifecycle: create, poll, and retrieve results.
*/
private async *createBatchMessage(
Comment on lines +312 to +316

The new Batch API functionality lacks test coverage. Given the complexity of the batch processing lifecycle (polling, timeout handling, result retrieval) and the potential for errors at each stage, this code should have comprehensive tests. Consider adding tests that cover:

  • Successful batch processing with prompt caching enabled
  • Successful batch processing without prompt caching
  • Batch timeout scenarios
  • Batch expiration/cancellation scenarios
  • Error handling in batch results
  • Verification that the 50% cost discount is applied correctly
  • Verification that beta headers are included when needed

The existing anthropic.spec.ts provides a good pattern to follow for mocking the SDK's batch API methods.

systemPrompt: string,
messages: Anthropic.Messages.MessageParam[],
metadata?: ApiHandlerCreateMessageMetadata,
): ApiStream {
const cacheControl: CacheControlEphemeral = { type: "ephemeral" }
let { id: modelId, betas = [], maxTokens, temperature, reasoning: thinking } = this.getModel()

// Add 1M context beta flag if enabled for Claude Sonnet 4 and 4.5
if (
(modelId === "claude-sonnet-4-20250514" || modelId === "claude-sonnet-4-5") &&
this.options.anthropicBeta1MContext
) {
betas.push("context-1m-2025-08-07")
}
Comment on lines +324 to +330

The prompt caching beta header is missing when creating batch requests with prompt caching support. When supportsPromptCaching(modelId) returns true, the "prompt-caching-2024-07-31" beta should be added to the betas array (similar to line 128 in the streaming path). Without this header, prompt caching won't work correctly in batch mode even though cache breakpoints are being added to the messages.

Suggested change
// Add 1M context beta flag if enabled for Claude Sonnet 4 and 4.5
if (
(modelId === "claude-sonnet-4-20250514" || modelId === "claude-sonnet-4-5") &&
this.options.anthropicBeta1MContext
) {
betas.push("context-1m-2025-08-07")
}
// Add 1M context beta flag if enabled for Claude Sonnet 4 and 4.5
if (
(modelId === "claude-sonnet-4-20250514" || modelId === "claude-sonnet-4-5") &&
this.options.anthropicBeta1MContext
) {
betas.push("context-1m-2025-08-07")
}
// Add prompt caching beta if model supports it
if (this.supportsPromptCaching(modelId)) {
betas.push("prompt-caching-2024-07-31")
}


// Add prompt caching beta if model supports it
if (this.supportsPromptCaching(modelId)) {
betas.push("prompt-caching-2024-07-31")
}

// Notify user about batch processing
yield {
type: "text",
text: "⏳ **Using Batch API (50% cost savings)** - Processing request asynchronously, this may take a moment...\n\n",
}

// Prepare request with cache breakpoints if supported
const processedMessages = this.supportsPromptCaching(modelId)
? this.applyCacheBreakpoints(messages, cacheControl)
: messages

const batchRequest: Anthropic.Messages.MessageCreateParamsNonStreaming = {
model: modelId,
max_tokens: maxTokens ?? ANTHROPIC_DEFAULT_MAX_TOKENS,
temperature,
thinking,
system: this.supportsPromptCaching(modelId)
? [{ text: systemPrompt, type: "text", cache_control: cacheControl }]
: [{ text: systemPrompt, type: "text" }],
messages: processedMessages,
}

// Create batch job with beta headers if needed
const batchOptions = betas.length > 0 ? { headers: { "anthropic-beta": betas.join(",") } } : undefined
const batch = await this.client.messages.batches.create(
{
requests: [
{
// Using Date.now() is sufficient since we only send one request per batch
// If we support multiple requests per batch in the future, consider using crypto.randomUUID()
custom_id: `req_${Date.now()}`,

The custom_id uses only Date.now() which could theoretically cause collisions if multiple batch requests are initiated in the same millisecond (e.g., in high-concurrency scenarios or automated testing). While unlikely in typical usage, a more robust approach would include additional entropy to guarantee uniqueness.

Suggested change
custom_id: `req_${Date.now()}`,
custom_id: `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,

params: batchRequest,
},
],
},
batchOptions,
)

// Poll for batch completion (silently)
const startTime = Date.now()
let completedBatch: Anthropic.Beta.Messages.Batches.BetaMessageBatch | null = null

while (Date.now() - startTime < BATCH_MAX_POLL_TIME_MS) {
const status = await this.client.messages.batches.retrieve(batch.id)

if (status.processing_status === "ended") {
completedBatch = status
break
}

// Only fail on truly failed states; continue polling for all valid transitional states
// Note: SDK types may not include all possible states, so we check the actual string value
const statusStr = status.processing_status as string
if (statusStr === "errored" || statusStr === "expired" || statusStr === "canceled") {
throw new Error(`Batch processing failed with status: ${status.processing_status}`)
}

// Wait before next poll
await new Promise((resolve) => setTimeout(resolve, BATCH_POLL_INTERVAL_MS))
}

if (!completedBatch) {
throw new Error("Batch processing timeout exceeded")
}

// Retrieve results
const results = await this.client.messages.batches.results(batch.id)

// Process results
for await (const result of results) {
if (result.result.type === "succeeded") {
const message = result.result.message

// Yield content blocks
for (const content of message.content) {
if (content.type === "text") {
yield { type: "text", text: content.text }
} else if (content.type === "thinking") {
yield { type: "reasoning", text: content.thinking }
}
}

// Yield usage information
const usage = message.usage
yield {
type: "usage",
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheWriteTokens: usage.cache_creation_input_tokens || undefined,
cacheReadTokens: usage.cache_read_input_tokens || undefined,
}

// Calculate and yield cost
yield {
type: "usage",
inputTokens: 0,
outputTokens: 0,
totalCost: calculateApiCostAnthropic(
this.getModel().info,
usage.input_tokens || 0,
usage.output_tokens || 0,
usage.cache_creation_input_tokens || 0,
usage.cache_read_input_tokens || 0,
),
}
} else if (result.result.type === "errored") {
const error = result.result.error
// ErrorResponse only has 'type' field in SDK types, but may have 'message' at runtime
const errorDetails = JSON.stringify(error)
throw new Error(`Batch request failed: ${error.type} - ${errorDetails}`)
}
}
}
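
On the review comment above about missing test coverage: below is a minimal vitest sketch of how the batch path could be exercised, loosely following the mocking shape of the existing anthropic.spec.ts. The constructor option names (apiKey, apiModelId) and the mock layout are assumptions inferred from repo conventions, not taken from this diff.

import { describe, it, expect, vi } from "vitest"

const mockBatches = vi.hoisted(() => ({
	create: vi.fn(),
	retrieve: vi.fn(),
	results: vi.fn(),
}))

vi.mock("@anthropic-ai/sdk", () => ({
	Anthropic: vi.fn().mockImplementation(() => ({
		messages: { batches: mockBatches },
	})),
}))

import { AnthropicHandler } from "../anthropic"

describe("AnthropicHandler with anthropicUseBatchApi", () => {
	it("yields text and usage from a completed batch", async () => {
		mockBatches.create.mockResolvedValue({ id: "batch_1", processing_status: "in_progress" })
		mockBatches.retrieve.mockResolvedValue({ id: "batch_1", processing_status: "ended" })
		// results() resolves to an async iterable of per-request results.
		mockBatches.results.mockResolvedValue(
			(async function* () {
				yield {
					custom_id: "req_1",
					result: {
						type: "succeeded",
						message: {
							content: [{ type: "text", text: "hello" }],
							usage: { input_tokens: 10, output_tokens: 5 },
						},
					},
				}
			})(),
		)

		const handler = new AnthropicHandler({
			apiKey: "test-key",
			apiModelId: "claude-3-5-haiku-20241022",
			anthropicUseBatchApi: true,
		})

		const chunks: any[] = []
		for await (const chunk of handler.createMessage("system prompt", [{ role: "user", content: "hi" }])) {
			chunks.push(chunk)
		}

		expect(mockBatches.create).toHaveBeenCalledTimes(1)
		expect(chunks.some((c) => c.type === "text" && c.text.includes("hello"))).toBe(true)
		expect(chunks.some((c) => c.type === "usage" && c.inputTokens === 10)).toBe(true)
	})
})

A timeout case could reuse the same mocks with retrieve pinned to "in_progress" and vitest's fake timers (advancing both the timers and the clock) stepping past BATCH_MAX_POLL_TIME_MS.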

/**
* Counts tokens for the given content using Anthropic's API
*