Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ ex_aws_bedrock-*.tar

# Temporary files, for example, from tests.
/tmp/

.env
149 changes: 149 additions & 0 deletions lib/ex_aws/bedrock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,155 @@ defmodule ExAws.Bedrock do
%{post | stream_builder: &EventStream.stream_objects!(post, nil, &1)}
end

@doc """
Sends messages to the specified Amazon Bedrock model using the Converse API.

`Converse` provides a consistent interface that works with all models that support messages,
allowing you to write code once and use it with different models. This is the recommended
API for chat models like Claude 3.x.

The request should include a map containing a `messages` list, and can optionally include
additional fields like `system`, `inferenceConfig`, `toolConfig`, etc.

## Example

request_body = %{
"messages" => [
%{"role" => "user", "content" => [%{"text" => "Hello, how are you?"}]}
],
"system" => [%{"text" => "You are a helpful assistant"}],
"max_tokens" => 500,
"anthropic_version" => "bedrock-2023-05-31"
}

request = ExAws.Bedrock.converse("us.anthropic.claude-3-7-sonnet-20250219-v1:0", request_body)
{:ok, response} = ExAws.Bedrock.request(request)
output_text = get_in(response, ["output", "message", "content", Access.at(0), "text"])

[AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html)
"""
@spec converse(String.t(), map | struct) :: ExAws.Operation.JSON.t()
def converse(model_id, body) when is_binary(model_id) do
%ExAws.Operation.JSON{
data: body,
headers: @json_request_headers,
http_method: :post,
path: "/model/#{model_id}/converse",
service: :"bedrock-runtime"
}
end

@doc """
Sends messages to the specified Amazon Bedrock model using the ConverseStream API.

Similar to `converse/2`, but returns a streamed response that allows you to receive
chunks of the model's response in real time. This is ideal for displaying responses
incrementally as they're generated.

## Example - Basic Usage

request_body = %{
"messages" => [
%{"role" => "user", "content" => [%{"text" => "Hello, how are you?"}]}
],
"system" => [%{"text" => "You are a helpful assistant"}],
"max_tokens" => 500,
"anthropic_version" => "bedrock-2023-05-31"
}

stream = (
ExAws.Bedrock.converse_stream(model_id, request_body)
|> ExAws.Bedrock.stream!()
|> Stream.map(fn {:chunk, chunk} ->
# Process each chunk of the response
IO.write(get_in(chunk, ["delta", "text"]) || "")
end)
|> Enum.to_list()
)

## Example - Using Tools

system_prompt = "Always use the tool top_song."

model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
anthropic_version = "bedrock-2023-05-31"
prompt = "Find the most popular song for me on the station WZPZ"

request_body = %{
anthropic_version: anthropic_version,
max_tokens: 500,
temperature: 0.5,
top_p: 0.9,
system: [%{text: system_prompt, type: "text"}],
messages: [
%{role: "user", content: [
%{text: prompt, type: "text"}
]}
],
toolConfig: %{
tools: [
%{
toolSpec: %{
name: "top_song",
description: "Get the most popular song played on a radio station.",
inputSchema: %{
json: %{
type: "object",
properties: %{
sign: %{
type: "string",
description: "The call sign for the radio station for which you want the most popular song. Example calls signs are WZPZ and WKRP."
}
},
required: [
"sign"
]
}
}
}
}
]
}
}

new_stream = ExAws.Bedrock.converse_stream(model_id, request_body)
stream = ExAws.Bedrock.stream!(new_stream)
for event <- stream do
IO.puts(inspect(event))
end

# Example output:
# {:chunk, %{"messageStart" => %{"role" => "assistant"}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => "I'll"}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " help"}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " you find the most"}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " popular song on"}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " the radio station"}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " WZPZ."}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " Let"}}}}
# {:chunk, %{"contentBlockStop" => %{"contentBlockIndex" => 0}}}
# {:chunk, %{"contentBlockStart" => %{"contentBlockIndex" => 1, "start" => %{"toolUse" => %{"name" => "top_song", "toolUseId" => "tooluse_m5eQCV9vRCmgvHj6yF8zHQ"}}}}}
# {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 1, "delta" => %{"toolUse" => %{"input" => "{\"sign\": \"WZPZ\"}"}}}}}
# {:chunk, %{"contentBlockStop" => %{"contentBlockIndex" => 1}}}
# {:chunk, %{"messageStop" => %{"stopReason" => "tool_use"}}}
# {:chunk, %{"metadata" => %{"metrics" => %{"latencyMs" => 1940}, "usage" => %{"inputTokens" => 436, "outputTokens" => 66, "totalTokens" => 502}}}}

[AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html)
"""
@spec converse_stream(String.t(), map | struct) :: ExAws.Operation.JSON.t()
def converse_stream(model_id, body) when is_binary(model_id) do
post =
%ExAws.Operation.JSON{
data: body,
headers: @json_request_headers,
http_method: :post,
path: "/model/#{model_id}/converse-stream",
service: :"bedrock-runtime"
}

%{post | stream_builder: &EventStream.stream_objects!(post, nil, &1)}
end

@doc """
List of Amazon Bedrock foundation models that you can use.

Expand Down
164 changes: 144 additions & 20 deletions lib/ex_aws/bedrock/event_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule ExAws.Bedrock.EventStream do

[AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ResponseStream.html)
"""
def stream_objects!(%{service: service, data: data} = post_operation, _opts, config) do
def stream_objects!(%{service: service, data: data} = post_operation, opts, config) do
encoded_data = Jason.encode!(data)
url = build_request_url(post_operation, config)
config = Map.put(config, :service_override, :bedrock)
Expand All @@ -51,8 +51,11 @@ defmodule ExAws.Bedrock.EventStream do
encoded_data
)

# Extract HTTP options and build hackney options with timeout configurations
hackney_options = build_hackney_options(config, opts)

request_fun = fn [] ->
{:ok, ref} = :hackney.post(url, full_headers, encoded_data, @hackney_options)
{:ok, ref} = :hackney.post(url, full_headers, encoded_data, hackney_options)

receive do
{:hackney_response, ^ref, {:status, 200, _reason}} ->
Expand Down Expand Up @@ -153,8 +156,8 @@ defmodule ExAws.Bedrock.EventStream do
<<
message_total_length::unsigned-32,
headers_length::unsigned-32,
_prelude_checksum::unsigned-32,
_headers::binary-size(headers_length),
prelude_checksum::unsigned-32,
headers::binary-size(headers_length),
rest::binary
>> = data
)
Expand All @@ -165,43 +168,164 @@ defmodule ExAws.Bedrock.EventStream do
if byte_size(rest) >= body_length + @checksum_size do
<<
body::binary-size(body_length),
_message_checksum::unsigned-32,
message_checksum::unsigned-32,
next_data::binary
>> = rest

case process_chunk(body) do
{:ok, chunk} ->
{:ok, chunk, next_data}
prelude = <<message_total_length::unsigned-32, headers_length::unsigned-32>>

{:error, reason} ->
{:error, reason, next_data}
with :ok <- verify_prelude_checksum(prelude, prelude_checksum),
:ok <-
verify_message_checksum(prelude, prelude_checksum, headers, body, message_checksum),
{:ok, chunk} <- process_chunk(body) do
{:ok, chunk, next_data}
else
{:error, reason} -> {:error, reason, next_data}
end
else
:incomplete
end
end

defp parse_chunk(data) when byte_size(data) < @prelude_length do
# Not enough data to read prelude
:incomplete
end

defp parse_chunk(_data) do
# Invalid chunk
{:error, :invalid_chunk, <<>>}
end

defp process_chunk(body) do
with {:ok, %{"bytes" => bytes}} <- Jason.decode(body),
{:ok, json} <- Base.decode64(bytes),
{:ok, payload} <- Jason.decode(json) do
{:ok, payload}
defp verify_prelude_checksum(prelude, checksum) do
if crc32(prelude) == checksum do
:ok
else
{:error, error} ->
{:error, error}
{:error, :invalid_prelude_checksum}
end
end

defp verify_message_checksum(prelude, prelude_checksum, headers, body, checksum) do
message = prelude <> <<prelude_checksum::unsigned-32>> <> headers <> body

error ->
if crc32(message) == checksum do
:ok
else
{:error, :invalid_message_checksum}
end
end

defp crc32(data) do
:erlang.crc32(data)
end

defp process_chunk(body) do
case Jason.decode(body) do
# Format for invoke_model_with_response_stream
{:ok, %{"bytes" => bytes}} ->
with {:ok, json} <- Base.decode64(bytes),
{:ok, payload} <- Jason.decode(json) do
{:ok, payload}
end

# Process converse_stream events - simple direct transformation
{:ok, payload} when is_map(payload) ->
process_converse_chunk(payload)

# Format for other direct JSON without base64 encoding
{:ok, payload} ->
{:ok, payload}

{:error, error} ->
{:error, error}
end
end

@doc false
@spec process_converse_chunk(map()) :: {:ok, map()}
defp process_converse_chunk(payload) do
# First remove the "p" field which is internal metadata
payload = Map.drop(payload, ["p"])

# Map the AWS event format to the expected output structure
processed_payload =
case payload do
# Message start event
%{"role" => "assistant"} ->
%{"messageStart" => %{"role" => "assistant"}}

# Content block delta with text
%{"contentBlockIndex" => index, "delta" => %{"text" => text}} ->
%{"contentBlockDelta" => %{"delta" => %{"text" => text}, "contentBlockIndex" => index}}

# Content block delta with tool use
%{"contentBlockIndex" => index, "delta" => %{"toolUse" => tool_use}} ->
%{
"contentBlockDelta" => %{
"delta" => %{"toolUse" => tool_use},
"contentBlockIndex" => index
}
}

# Content block start
%{"contentBlockIndex" => index, "start" => start} ->
%{"contentBlockStart" => %{"start" => start, "contentBlockIndex" => index}}

# Content block stop
%{"contentBlockIndex" => index}
when not is_map_key(payload, "delta") and not is_map_key(payload, "start") ->
%{"contentBlockStop" => %{"contentBlockIndex" => index}}

# Message stop
%{"stopReason" => reason} ->
%{"messageStop" => %{"stopReason" => reason}}

# Metadata
%{"usage" => usage, "metrics" => metrics} ->
%{"metadata" => %{"usage" => usage, "metrics" => metrics}}

# Other events pass through unchanged
_ ->
payload
end

{:ok, processed_payload}
end

# Build hackney options by merging base options with HTTP timeout configurations
defp build_hackney_options(config, opts) do
# Start with base options
base_options = @hackney_options

# Extract HTTP options from ExAws config
http_opts = get_http_opts(config, opts)

# Extract timeout-related options and convert to hackney format
timeout_options = extract_timeout_options(http_opts)

# Merge all options, with timeout_options taking precedence
base_options ++ timeout_options
end

# Get HTTP options from config and opts, with opts taking precedence
defp get_http_opts(config, opts) do
config_http_opts = Map.get(config, :http_opts, [])

opts_http_opts =
case opts do
nil -> []
opts when is_list(opts) -> Keyword.get(opts, :http_opts, [])
_ -> []
end

# Merge config options with opts, opts taking precedence
Keyword.merge(config_http_opts, opts_http_opts)
end

# Extract timeout options that hackney understands
defp extract_timeout_options(http_opts) do
timeout_keys = [:connect_timeout, :recv_timeout, :timeout]

timeout_keys
|> Enum.filter(fn key -> Keyword.has_key?(http_opts, key) end)
|> Enum.map(fn key -> {key, Keyword.get(http_opts, key)} end)
end
end
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ defmodule ExAws.Bedrock.MixProject do

defp deps do
[
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:credo, "~> 1.7.12", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false},
{:ex_aws, ">= 2.5.1"},
{:ex_aws, ">= 2.5.9"},
{:ex_doc, ">= 0.0.0", only: :dev},
{:hackney, ">= 0.0.0", only: [:dev, :test]},
{:jason, ">= 0.1.0", only: [:dev, :test]}
Expand Down
Loading
Loading