From 5954495d6043bdad528b20689c211e03462de0a4 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 15:07:54 +0200 Subject: [PATCH 01/10] Add support for Converse and ConverseStream actions. --- lib/ex_aws/bedrock.ex | 81 +++++++++++++++++++++++++++++ test/ex_aws/bedrock_stream_test.exs | 59 +++++++++++++++++++++ test/ex_aws/bedrock_test.exs | 69 ++++++++++++++++++++++++ 3 files changed, 209 insertions(+) diff --git a/lib/ex_aws/bedrock.ex b/lib/ex_aws/bedrock.ex index 3df9424..8f96df2 100644 --- a/lib/ex_aws/bedrock.ex +++ b/lib/ex_aws/bedrock.ex @@ -118,6 +118,87 @@ defmodule ExAws.Bedrock do %{post | stream_builder: &EventStream.stream_objects!(post, nil, &1)} end + @doc """ + Sends messages to the specified Amazon Bedrock model using the Converse API. + + `Converse` provides a consistent interface that works with all models that support messages, + allowing you to write code once and use it with different models. This is the recommended + API for chat models like Claude 3.x. + + The request should include a map containing a `messages` list, and can optionally include + additional fields like `system`, `inferenceConfig`, `toolConfig`, etc. + + ## Example + + request_body = %{ + "messages" => [ + %{"role" => "user", "content" => [%{"text" => "Hello, how are you?"}]} + ], + "system" => [%{"text" => "You are a helpful assistant"}], + "inferenceConfig" => %{ + "maxTokens" => 1000, + "temperature" => 0.7 + } + } + + request = ExAws.Bedrock.converse("anthropic.claude-3-sonnet-20240229-v1", request_body) + {:ok, response} = ExAws.Bedrock.request(request) + output_text = get_in(response, ["output", "message", "content", Access.at(0), "text"]) + + [AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) + """ + @spec converse(String.t(), map | struct) :: ExAws.Operation.JSON.t() + def converse(model_id, body) when is_binary(model_id) do + %ExAws.Operation.JSON{ + data: body, + headers: @json_request_headers, + http_method: :post, + path: "/model/#{model_id}/converse", + service: :"bedrock-runtime" + } + end + + @doc """ + Sends messages to the specified Amazon Bedrock model using the ConverseStream API. + + Similar to `converse/2`, but returns a streamed response that allows you to receive + chunks of the model's response in real time. This is ideal for displaying responses + incrementally as they're generated. + + ## Example + + request_body = %{ + "messages" => [ + %{"role" => "user", "content" => [%{"text" => "Write a short story about a robot."}]} + ] + } + + stream = ( + ExAws.Bedrock.converse_stream("anthropic.claude-3-sonnet-20240229-v1", request_body) + |> ExAws.Bedrock.stream!() + |> Stream.map(fn chunk -> + # Process each chunk of the response + IO.write(get_in(chunk, [:body, "chunk", "bytes"]) || "") + end) + |> Enum.to_list() + ) + + [AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) + """ + @spec converse_stream(String.t(), map | struct) :: ExAws.Operation.JSON.t() + def converse_stream(model_id, body) when is_binary(model_id) do + post = + %ExAws.Operation.JSON{ + data: body, + headers: @json_request_headers, + http_method: :post, + path: "/model/#{model_id}/converse-stream", + service: :"bedrock-runtime" + } + + %{post | stream_builder: &EventStream.stream_objects!(post, nil, &1)} + end + @doc """ List of Amazon Bedrock foundation models that you can use. diff --git a/test/ex_aws/bedrock_stream_test.exs b/test/ex_aws/bedrock_stream_test.exs index baf6f6d..63293db 100644 --- a/test/ex_aws/bedrock_stream_test.exs +++ b/test/ex_aws/bedrock_stream_test.exs @@ -46,4 +46,63 @@ defmodule ExAws.Bedrock.InvokeModelStreamTest do {:ok, [model_id: model_id, request: request]} end end + + describe "converse_stream/2 text" do + @tag :aws + test "against AWS", %{request: request} do + response = + request + |> stream!() + |> Enum.to_list() + + assert [%{} | _] = response + end + + test "content type is JSON", %{request: request} do + assert %JSON{headers: headers} = request + assert {_, "application/json"} = List.keyfind(headers, "Content-Type", 0) + end + + test "http post", %{request: request} do + assert %{http_method: :post} = request + end + + test "path", %{model_id: model_id, request: request} do + expected_path = "/model/#{model_id}/converse-stream" + assert %JSON{path: ^expected_path} = request + end + + test "service", %{request: request} do + assert %JSON{service: :"bedrock-runtime"} = request + end + + test "has stream_builder", %{request: request} do + assert %{stream_builder: stream_builder} = request + assert is_function(stream_builder) + end + + setup do + model_id = "anthropic.claude-3-sonnet-20240229-v1" + + request_body = %{ + "messages" => [ + %{ + "role" => "user", + "content" => [ + %{ + "text" => "Write a short paragraph about functional programming." + } + ] + } + ], + "inferenceConfig" => %{ + "maxTokens" => 500, + "temperature" => 0.7 + } + } + + request = Bedrock.converse_stream(model_id, request_body) + {:ok, [model_id: model_id, request: request]} + end + end end diff --git a/test/ex_aws/bedrock_test.exs b/test/ex_aws/bedrock_test.exs index 184d929..6ffd1fb 100644 --- a/test/ex_aws/bedrock_test.exs +++ b/test/ex_aws/bedrock_test.exs @@ -188,4 +188,73 @@ defmodule ExAws.BedrockTest do {:ok, [request: request]} end end + + describe "converse/2" do + test "content type is JSON", %{request: request} do + assert %JSON{headers: headers} = request + assert {_, "application/json"} = List.keyfind(headers, "Content-Type", 0) + end + + test "http post", %{request: request} do + assert %JSON{http_method: :post} = request + end + + test "path", %{request: request} do + assert %JSON{path: "/model/anthropic.claude-3-sonnet-20240229-v1/converse"} = request + end + + test "service", %{request: request} do + assert %JSON{service: :"bedrock-runtime"} = request + end + + setup do + model_id = "anthropic.claude-3-sonnet-20240229-v1" + + request_body = %{ + "messages" => [ + %{"role" => "user", "content" => [%{"text" => "Hello"}]} + ] + } + + request = Bedrock.converse(model_id, request_body) + {:ok, [request: request, model_id: model_id]} + end + end + + describe "converse_stream/2" do + test "content type is JSON", %{request: request} do + assert %JSON{headers: headers} = request + assert {_, "application/json"} = List.keyfind(headers, "Content-Type", 0) + end + + test "http post", %{request: request} do + assert %JSON{http_method: :post} = request + end + + test "path", %{request: request} do + assert %JSON{path: "/model/anthropic.claude-3-sonnet-20240229-v1/converse-stream"} = request + end + + test "service", %{request: request} do + assert %JSON{service: :"bedrock-runtime"} = request + end + + test "has stream_builder", %{request: request} do + assert %JSON{stream_builder: stream_builder} = request + assert is_function(stream_builder) + end + + setup do + model_id = "anthropic.claude-3-sonnet-20240229-v1" + + request_body = %{ + "messages" => [ + %{"role" => "user", "content" => [%{"text" => "Hello"}]} + ] + } + + request = Bedrock.converse_stream(model_id, request_body) + {:ok, [request: request, model_id: model_id]} + end + end end From 939910f3f8a2e4dff942bd4a6cd7dda3c16645d9 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 17:07:37 +0200 Subject: [PATCH 02/10] Adds support for using crc32 checksums. --- lib/ex_aws/bedrock/event_stream.ex | 43 +++++++++++---- test/ex_aws/bedrock/event_stream_test.exs | 66 ++++++++++++++++++++++- 2 files changed, 98 insertions(+), 11 deletions(-) diff --git a/lib/ex_aws/bedrock/event_stream.ex b/lib/ex_aws/bedrock/event_stream.ex index f8c93d9..80e80df 100644 --- a/lib/ex_aws/bedrock/event_stream.ex +++ b/lib/ex_aws/bedrock/event_stream.ex @@ -153,8 +153,8 @@ defmodule ExAws.Bedrock.EventStream do << message_total_length::unsigned-32, headers_length::unsigned-32, - _prelude_checksum::unsigned-32, - _headers::binary-size(headers_length), + prelude_checksum::unsigned-32, + headers::binary-size(headers_length), rest::binary >> = data ) @@ -165,16 +165,19 @@ defmodule ExAws.Bedrock.EventStream do if byte_size(rest) >= body_length + @checksum_size do << body::binary-size(body_length), - _message_checksum::unsigned-32, + message_checksum::unsigned-32, next_data::binary >> = rest - case process_chunk(body) do - {:ok, chunk} -> - {:ok, chunk, next_data} + prelude = <> - {:error, reason} -> - {:error, reason, next_data} + with :ok <- verify_prelude_checksum(prelude, prelude_checksum), + :ok <- + verify_message_checksum(prelude, prelude_checksum, headers, body, message_checksum), + {:ok, chunk} <- process_chunk(body) do + {:ok, chunk, next_data} + else + {:error, reason} -> {:error, reason, next_data} end else :incomplete @@ -182,15 +185,35 @@ defmodule ExAws.Bedrock.EventStream do end defp parse_chunk(data) when byte_size(data) < @prelude_length do - # Not enough data to read prelude :incomplete end defp parse_chunk(_data) do - # Invalid chunk {:error, :invalid_chunk, <<>>} end + defp verify_prelude_checksum(prelude, checksum) do + if crc32(prelude) == checksum do + :ok + else + {:error, :invalid_prelude_checksum} + end + end + + defp verify_message_checksum(prelude, prelude_checksum, headers, body, checksum) do + message = prelude <> <> <> headers <> body + + if crc32(message) == checksum do + :ok + else + {:error, :invalid_message_checksum} + end + end + + defp crc32(data) do + :erlang.crc32(data) + end + defp process_chunk(body) do with {:ok, %{"bytes" => bytes}} <- Jason.decode(body), {:ok, json} <- Base.decode64(bytes), diff --git a/test/ex_aws/bedrock/event_stream_test.exs b/test/ex_aws/bedrock/event_stream_test.exs index 3a1b775..39a0fd0 100644 --- a/test/ex_aws/bedrock/event_stream_test.exs +++ b/test/ex_aws/bedrock/event_stream_test.exs @@ -3,6 +3,11 @@ defmodule ExAws.Bedrock.EventStreamTest do alias ExAws.Bedrock.EventStream + @uint32_size 4 + @checksum_size 4 + @prelude_length @uint32_size * 3 + @message_overhead @prelude_length + @checksum_size + describe "decode_chunk/1" do test "should decode single chunk response", %{chunk: chunk} do assert [{:chunk, %{"outputText" => output_text}}] = EventStream.decode_chunk(chunk) @@ -15,7 +20,22 @@ defmodule ExAws.Bedrock.EventStreamTest do end end + test "should handle invalid prelude checksum", %{chunk_with_invalid_prelude_checksum: chunk} do + assert [{:bad_chunk, ^chunk, :invalid_prelude_checksum}] = + EventStream.decode_chunk(chunk) + end + + test "should handle invalid message checksum", %{chunk_with_invalid_message_checksum: chunk} do + assert [{:bad_chunk, ^chunk, :invalid_message_checksum}] = + EventStream.decode_chunk(chunk) + end + + test "should handle incomplete chunk", %{incomplete_chunk: chunk} do + assert [{:bad_chunk, ^chunk, :invalid_chunk}] = EventStream.decode_chunk(chunk) + end + setup_all do + # Claude 3.5 Sonnet Multi-Chunk response multipart_chunk = <<0, 0, 1, 209, 0, 0, 0, 75, 251, 229, 194, 61, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 5, 99, 104, 117, 110, 107, 13, 58, 99, 111, 110, 116, 101, 110, 116, @@ -91,6 +111,50 @@ defmodule ExAws.Bedrock.EventStreamTest do 88, 82, 108, 98, 109, 78, 53, 73, 106, 111, 120, 78, 106, 103, 49, 102, 88, 48, 61, 34, 125, 72, 100, 27, 122>> - %{chunk: chunk, multipart_chunk: multipart_chunk} + # Create a chunk with an invalid prelude checksum + chunk_with_invalid_prelude_checksum = alter_prelude_checksum(chunk) + + # Create a chunk with an invalid message checksum + chunk_with_invalid_message_checksum = alter_message_checksum(chunk) + + # Create an incomplete chunk by truncating the valid chunk + incomplete_chunk = binary_part(chunk, 0, byte_size(chunk) - 10) + + %{ + chunk: chunk, + multipart_chunk: multipart_chunk, + chunk_with_invalid_prelude_checksum: chunk_with_invalid_prelude_checksum, + chunk_with_invalid_message_checksum: chunk_with_invalid_message_checksum, + incomplete_chunk: incomplete_chunk + } + end + + defp alter_prelude_checksum(chunk) do + # Modify the prelude checksum to be invalid + <> = chunk + + invalid_prelude_checksum = 0 + + <> + end + + defp alter_message_checksum(chunk) do + # Parse the chunk to extract message parts + <> = chunk + + message_length = message_total_length - @message_overhead + body_length = message_length - headers_length + + <> = rest + + invalid_message_checksum = 0 + + <> end end From fb839a3604a098d3c27bb8c07aad80ccee511c5f Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 17:10:03 +0200 Subject: [PATCH 03/10] Bump ex_aws and credo versions. --- mix.exs | 4 ++-- mix.lock | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/mix.exs b/mix.exs index bf16e2c..913307a 100644 --- a/mix.exs +++ b/mix.exs @@ -43,9 +43,9 @@ defmodule ExAws.Bedrock.MixProject do defp deps do [ - {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:credo, "~> 1.7.12", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false}, - {:ex_aws, ">= 2.5.1"}, + {:ex_aws, ">= 2.5.9"}, {:ex_doc, ">= 0.0.0", only: :dev}, {:hackney, ">= 0.0.0", only: [:dev, :test]}, {:jason, ">= 0.1.0", only: [:dev, :test]} diff --git a/mix.lock b/mix.lock index 847df86..db569f4 100644 --- a/mix.lock +++ b/mix.lock @@ -1,26 +1,26 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, - "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, + "certifi": {:hex, :certifi, "2.14.0", "ed3bef654e69cde5e6c022df8070a579a79e8ba2368a00acf3d75b82d9aceeed", [:rebar3], [], "hexpm", "ea59d87ef89da429b8e905264fdec3419f84f2215bb3d81e07a18aac919026c3"}, + "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_aws": {:hex, :ex_aws, "2.5.1", "7418917974ea42e9e84b25e88b9f3d21a861d5f953ad453e212f48e593d8d39f", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1b95431f70c446fa1871f0eb9b183043c5a625f75f9948a42d25f43ae2eff12b"}, + "ex_aws": {:hex, :ex_aws, "2.5.9", "8e2455172f0e5cbe2f56dd68de514f0dae6bb26d6b6e2f435a06434cf9dbb412", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbdb6ffb0e6c6368de05ed8641fe1376298ba23354674428e5b153a541f23359"}, "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, - "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, - "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, + "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, + "hackney": {:hex, :hackney, "1.23.0", "55cc09077112bcb4a69e54be46ed9bc55537763a96cd4a80a221663a7eafd767", [:rebar3], [{:certifi, "~> 2.14.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "6cd1c04cd15c81e5a493f167b226a15f0938a84fc8f0736ebe4ddcab65c0b44e"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } From 673dced4c0f2d90ab3da51e736b618af5da2a347 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 20:33:59 +0200 Subject: [PATCH 04/10] Add .env to .gitignore. --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index a7e08ca..42a686f 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ ex_aws_bedrock-*.tar # Temporary files, for example, from tests. /tmp/ + +.env From 66de11c9e3d89b1977a00c85aae364daf0eff4e6 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 21:07:22 +0200 Subject: [PATCH 05/10] Correctly parse stream from Converse API Action. --- lib/ex_aws/bedrock/event_stream.ex | 20 ++++++++++++-------- test/ex_aws/bedrock_stream_test.exs | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/ex_aws/bedrock/event_stream.ex b/lib/ex_aws/bedrock/event_stream.ex index 80e80df..d6e73a4 100644 --- a/lib/ex_aws/bedrock/event_stream.ex +++ b/lib/ex_aws/bedrock/event_stream.ex @@ -215,15 +215,19 @@ defmodule ExAws.Bedrock.EventStream do end defp process_chunk(body) do - with {:ok, %{"bytes" => bytes}} <- Jason.decode(body), - {:ok, json} <- Base.decode64(bytes), - {:ok, payload} <- Jason.decode(json) do - {:ok, payload} - else - {:error, error} -> - {:error, error} + case Jason.decode(body) do + # Format for invoke_model_with_response_stream + {:ok, %{"bytes" => bytes}} -> + with {:ok, json} <- Base.decode64(bytes), + {:ok, payload} <- Jason.decode(json) do + {:ok, payload} + end + + # Format for converse_stream - direct JSON without base64 encoding + {:ok, payload} -> + {:ok, payload} - error -> + {:error, error} -> {:error, error} end end diff --git a/test/ex_aws/bedrock_stream_test.exs b/test/ex_aws/bedrock_stream_test.exs index 63293db..88b3dac 100644 --- a/test/ex_aws/bedrock_stream_test.exs +++ b/test/ex_aws/bedrock_stream_test.exs @@ -55,7 +55,7 @@ defmodule ExAws.Bedrock.InvokeModelStreamTest do |> stream!() |> Enum.to_list() - assert [%{} | _] = response + assert [{:chunk, _} | _] = response end test "content type is JSON", %{request: request} do From fe3df9a54b4a81e7688e08c41e3d66b703ae4edf Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 21:16:15 +0200 Subject: [PATCH 06/10] Adapt request structure in tests and docs. --- lib/ex_aws/bedrock.ex | 21 +++++++++++---------- test/ex_aws/bedrock_stream_test.exs | 14 ++++++-------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/lib/ex_aws/bedrock.ex b/lib/ex_aws/bedrock.ex index 8f96df2..482e979 100644 --- a/lib/ex_aws/bedrock.ex +++ b/lib/ex_aws/bedrock.ex @@ -135,13 +135,11 @@ defmodule ExAws.Bedrock do %{"role" => "user", "content" => [%{"text" => "Hello, how are you?"}]} ], "system" => [%{"text" => "You are a helpful assistant"}], - "inferenceConfig" => %{ - "maxTokens" => 1000, - "temperature" => 0.7 - } + "max_tokens" => 500, + "anthropic_version" => "bedrock-2023-05-31" } - request = ExAws.Bedrock.converse("anthropic.claude-3-sonnet-20240229-v1", request_body) + request = ExAws.Bedrock.converse("us.anthropic.claude-3-7-sonnet-20250219-v1:0", request_body) {:ok, response} = ExAws.Bedrock.request(request) output_text = get_in(response, ["output", "message", "content", Access.at(0), "text"]) @@ -169,16 +167,19 @@ defmodule ExAws.Bedrock do request_body = %{ "messages" => [ - %{"role" => "user", "content" => [%{"text" => "Write a short story about a robot."}]} - ] + %{"role" => "user", "content" => [%{"text" => "Hello, how are you?"}]} + ], + "system" => [%{"text" => "You are a helpful assistant"}], + "max_tokens" => 500, + "anthropic_version" => "bedrock-2023-05-31" } stream = ( - ExAws.Bedrock.converse_stream("anthropic.claude-3-sonnet-20240229-v1", request_body) + ExAws.Bedrock.converse_stream(model_id, request_body) |> ExAws.Bedrock.stream!() - |> Stream.map(fn chunk -> + |> Stream.map(fn {:chunk, chunk} -> # Process each chunk of the response - IO.write(get_in(chunk, [:body, "chunk", "bytes"]) || "") + IO.write(get_in(chunk, ["delta", "text"]) || "") end) |> Enum.to_list() ) diff --git a/test/ex_aws/bedrock_stream_test.exs b/test/ex_aws/bedrock_stream_test.exs index 88b3dac..d4572e6 100644 --- a/test/ex_aws/bedrock_stream_test.exs +++ b/test/ex_aws/bedrock_stream_test.exs @@ -82,23 +82,21 @@ defmodule ExAws.Bedrock.InvokeModelStreamTest do end setup do - model_id = "anthropic.claude-3-sonnet-20240229-v1" + model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0" + anthropic_version = "bedrock-2023-05-31" + prompt = "Write a short story about a dog" request_body = %{ "messages" => [ %{ "role" => "user", "content" => [ - %{ - "text" => "Write a short paragraph about functional programming." - } + %{"text" => prompt, "type" => "text"} ] } ], - "inferenceConfig" => %{ - "maxTokens" => 500, - "temperature" => 0.7 - } + "max_tokens" => 500, + "anthropic_version" => anthropic_version } request = Bedrock.converse_stream(model_id, request_body) From 57ad6b54b22b3d946cddb8f97867fc7016922ca6 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Mon, 5 May 2025 21:19:53 +0200 Subject: [PATCH 07/10] Fix timeout in "test against AWS". --- test/ex_aws/bedrock_stream_test.exs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/ex_aws/bedrock_stream_test.exs b/test/ex_aws/bedrock_stream_test.exs index d4572e6..bd24b2c 100644 --- a/test/ex_aws/bedrock_stream_test.exs +++ b/test/ex_aws/bedrock_stream_test.exs @@ -13,7 +13,7 @@ defmodule ExAws.Bedrock.InvokeModelStreamTest do |> stream!() |> Enum.to_list() - assert [{:chunk, %{"completionReason" => nil, "index" => 0}} | _] = response + assert [{:chunk, %{"completionReason" => "FINISH", "index" => 0}} | _] = response assert {:chunk, %{"completionReason" => "FINISH"}} = List.last(response) end @@ -36,12 +36,10 @@ defmodule ExAws.Bedrock.InvokeModelStreamTest do end setup do - prompt = ~s[Write me an article in the style of serious technical about - the Elixir programming language and how it's combination of the Erlang BEAM - and functional programming is a revolution in creating reliable software] + prompt = ~s[Write a short and friendly way to say hello to José Valim. 10 words max.] model_id = "amazon.titan-tg1-large" - inference_parameters = TextModel.build(prompt, max_token_count: 4000, temperature: 0.6) + inference_parameters = TextModel.build(prompt, max_token_count: 400, temperature: 0.6) request = Bedrock.invoke_model_with_response_stream(model_id, inference_parameters) {:ok, [model_id: model_id, request: request]} end From fe6805e94cbc057d5ca0e5979dc8e02c2f8c04a9 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Tue, 13 May 2025 13:03:21 +0200 Subject: [PATCH 08/10] Improve parsing of ConverseStream response to match python library. --- lib/ex_aws/bedrock/event_stream.ex | 57 +++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/lib/ex_aws/bedrock/event_stream.ex b/lib/ex_aws/bedrock/event_stream.ex index d6e73a4..303db3a 100644 --- a/lib/ex_aws/bedrock/event_stream.ex +++ b/lib/ex_aws/bedrock/event_stream.ex @@ -223,7 +223,11 @@ defmodule ExAws.Bedrock.EventStream do {:ok, payload} end - # Format for converse_stream - direct JSON without base64 encoding + # Process converse_stream events - simple direct transformation + {:ok, payload} when is_map(payload) -> + process_converse_chunk(payload) + + # Format for other direct JSON without base64 encoding {:ok, payload} -> {:ok, payload} @@ -231,4 +235,55 @@ defmodule ExAws.Bedrock.EventStream do {:error, error} end end + + @doc false + @spec process_converse_chunk(map()) :: {:ok, map()} + defp process_converse_chunk(payload) do + # First remove the "p" field which is internal metadata + payload = Map.drop(payload, ["p"]) + + # Map the AWS event format to the expected output structure + processed_payload = + case payload do + # Message start event + %{"role" => "assistant"} -> + %{"messageStart" => %{"role" => "assistant"}} + + # Content block delta with text + %{"contentBlockIndex" => index, "delta" => %{"text" => text}} -> + %{"contentBlockDelta" => %{"delta" => %{"text" => text}, "contentBlockIndex" => index}} + + # Content block delta with tool use + %{"contentBlockIndex" => index, "delta" => %{"toolUse" => tool_use}} -> + %{ + "contentBlockDelta" => %{ + "delta" => %{"toolUse" => tool_use}, + "contentBlockIndex" => index + } + } + + # Content block start + %{"contentBlockIndex" => index, "start" => start} -> + %{"contentBlockStart" => %{"start" => start, "contentBlockIndex" => index}} + + # Content block stop + %{"contentBlockIndex" => index} + when not is_map_key(payload, "delta") and not is_map_key(payload, "start") -> + %{"contentBlockStop" => %{"contentBlockIndex" => index}} + + # Message stop + %{"stopReason" => reason} -> + %{"messageStop" => %{"stopReason" => reason}} + + # Metadata + %{"usage" => usage, "metrics" => metrics} -> + %{"metadata" => %{"usage" => usage, "metrics" => metrics}} + + # Other events pass through unchanged + _ -> + payload + end + + {:ok, processed_payload} + end end From a481a02ac730f4f98105c43dff82d0b5a95d6082 Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Tue, 13 May 2025 13:03:48 +0200 Subject: [PATCH 09/10] Add more verbose examples to docs for converse_stream/2 --- lib/ex_aws/bedrock.ex | 69 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/lib/ex_aws/bedrock.ex b/lib/ex_aws/bedrock.ex index 482e979..cb667c8 100644 --- a/lib/ex_aws/bedrock.ex +++ b/lib/ex_aws/bedrock.ex @@ -163,7 +163,7 @@ defmodule ExAws.Bedrock do chunks of the model's response in real time. This is ideal for displaying responses incrementally as they're generated. - ## Example + ## Example - Basic Usage request_body = %{ "messages" => [ @@ -184,6 +184,73 @@ defmodule ExAws.Bedrock do |> Enum.to_list() ) + ## Example - Using Tools + + system_prompt = "Always use the tool top_song." + + model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0" + anthropic_version = "bedrock-2023-05-31" + prompt = "Find the most popular song for me on the station WZPZ" + + request_body = %{ + anthropic_version: anthropic_version, + max_tokens: 500, + temperature: 0.5, + top_p: 0.9, + system: [%{text: system_prompt, type: "text"}], + messages: [ + %{role: "user", content: [ + %{text: prompt, type: "text"} + ]} + ], + toolConfig: %{ + tools: [ + %{ + toolSpec: %{ + name: "top_song", + description: "Get the most popular song played on a radio station.", + inputSchema: %{ + json: %{ + type: "object", + properties: %{ + sign: %{ + type: "string", + description: "The call sign for the radio station for which you want the most popular song. Example calls signs are WZPZ and WKRP." + } + }, + required: [ + "sign" + ] + } + } + } + } + ] + } + } + + new_stream = ExAws.Bedrock.converse_stream(model_id, request_body) + stream = ExAws.Bedrock.stream!(new_stream) + for event <- stream do + IO.puts(inspect(event)) + end + + # Example output: + # {:chunk, %{"messageStart" => %{"role" => "assistant"}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => "I'll"}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " help"}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " you find the most"}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " popular song on"}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " the radio station"}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " WZPZ."}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 0, "delta" => %{"text" => " Let"}}}} + # {:chunk, %{"contentBlockStop" => %{"contentBlockIndex" => 0}}} + # {:chunk, %{"contentBlockStart" => %{"contentBlockIndex" => 1, "start" => %{"toolUse" => %{"name" => "top_song", "toolUseId" => "tooluse_m5eQCV9vRCmgvHj6yF8zHQ"}}}}} + # {:chunk, %{"contentBlockDelta" => %{"contentBlockIndex" => 1, "delta" => %{"toolUse" => %{"input" => "{\"sign\": \"WZPZ\"}"}}}}} + # {:chunk, %{"contentBlockStop" => %{"contentBlockIndex" => 1}}} + # {:chunk, %{"messageStop" => %{"stopReason" => "tool_use"}}} + # {:chunk, %{"metadata" => %{"metrics" => %{"latencyMs" => 1940}, "usage" => %{"inputTokens" => 436, "outputTokens" => 66, "totalTokens" => 502}}}} + [AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) """ @spec converse_stream(String.t(), map | struct) :: ExAws.Operation.JSON.t() From 957b0ee6b0a3a200709a845e214e4cec89bbbe6e Mon Sep 17 00:00:00 2001 From: Ole Spaarmann Date: Thu, 31 Jul 2025 18:02:55 +0200 Subject: [PATCH 10/10] Respect timeout from http_opts from ex_aws configuration. --- lib/ex_aws/bedrock/event_stream.ex | 46 ++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/lib/ex_aws/bedrock/event_stream.ex b/lib/ex_aws/bedrock/event_stream.ex index 303db3a..b14559c 100644 --- a/lib/ex_aws/bedrock/event_stream.ex +++ b/lib/ex_aws/bedrock/event_stream.ex @@ -36,7 +36,7 @@ defmodule ExAws.Bedrock.EventStream do [AWS API Docs](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ResponseStream.html) """ - def stream_objects!(%{service: service, data: data} = post_operation, _opts, config) do + def stream_objects!(%{service: service, data: data} = post_operation, opts, config) do encoded_data = Jason.encode!(data) url = build_request_url(post_operation, config) config = Map.put(config, :service_override, :bedrock) @@ -51,8 +51,11 @@ defmodule ExAws.Bedrock.EventStream do encoded_data ) + # Extract HTTP options and build hackney options with timeout configurations + hackney_options = build_hackney_options(config, opts) + request_fun = fn [] -> - {:ok, ref} = :hackney.post(url, full_headers, encoded_data, @hackney_options) + {:ok, ref} = :hackney.post(url, full_headers, encoded_data, hackney_options) receive do {:hackney_response, ^ref, {:status, 200, _reason}} -> @@ -286,4 +289,43 @@ defmodule ExAws.Bedrock.EventStream do {:ok, processed_payload} end + + # Build hackney options by merging base options with HTTP timeout configurations + defp build_hackney_options(config, opts) do + # Start with base options + base_options = @hackney_options + + # Extract HTTP options from ExAws config + http_opts = get_http_opts(config, opts) + + # Extract timeout-related options and convert to hackney format + timeout_options = extract_timeout_options(http_opts) + + # Merge all options, with timeout_options taking precedence + base_options ++ timeout_options + end + + # Get HTTP options from config and opts, with opts taking precedence + defp get_http_opts(config, opts) do + config_http_opts = Map.get(config, :http_opts, []) + + opts_http_opts = + case opts do + nil -> [] + opts when is_list(opts) -> Keyword.get(opts, :http_opts, []) + _ -> [] + end + + # Merge config options with opts, opts taking precedence + Keyword.merge(config_http_opts, opts_http_opts) + end + + # Extract timeout options that hackney understands + defp extract_timeout_options(http_opts) do + timeout_keys = [:connect_timeout, :recv_timeout, :timeout] + + timeout_keys + |> Enum.filter(fn key -> Keyword.has_key?(http_opts, key) end) + |> Enum.map(fn key -> {key, Keyword.get(http_opts, key)} end) + end end