diff --git a/deps/rabbitmq_aws/Makefile b/deps/rabbitmq_aws/Makefile
index 7ba1f949b3dd..debfa2b8d59c 100644
--- a/deps/rabbitmq_aws/Makefile
+++ b/deps/rabbitmq_aws/Makefile
@@ -6,6 +6,7 @@ define PROJECT_ENV
[]
endef
+DEPS = gun
LOCAL_DEPS = crypto inets ssl xmerl public_key
BUILD_DEPS = rabbit_common
# We do not depend on rabbit therefore can't run the broker.
diff --git a/deps/rabbitmq_aws/include/rabbitmq_aws.hrl b/deps/rabbitmq_aws/include/rabbitmq_aws.hrl
index ab16d9ed49f4..8031d6fb9cb6 100644
--- a/deps/rabbitmq_aws/include/rabbitmq_aws.hrl
+++ b/deps/rabbitmq_aws/include/rabbitmq_aws.hrl
@@ -68,7 +68,9 @@
security_token :: security_token() | undefined,
region :: region() | undefined,
imdsv2_token:: imdsv2token() | undefined,
- error :: atom() | string() | undefined}).
+ error :: atom() | string() | undefined,
+ gun_connections = #{} :: #{string() => pid()} % host -> gun_pid mapping
+ }).
-type state() :: #state{}.
-type scheme() :: atom().
diff --git a/deps/rabbitmq_aws/src/rabbitmq_aws.erl b/deps/rabbitmq_aws/src/rabbitmq_aws.erl
index 5a45a597d851..8df0f0fe3b12 100644
--- a/deps/rabbitmq_aws/src/rabbitmq_aws.erl
+++ b/deps/rabbitmq_aws/src/rabbitmq_aws.erl
@@ -9,15 +9,19 @@
-behavior(gen_server).
%% API exports
--export([get/2, get/3,
+-export([get/2, get/3, get/4,
+ put/4, put/5,
post/4,
refresh_credentials/0,
request/5, request/6, request/7,
set_credentials/2,
has_credentials/0,
+ parse_uri/1,
set_region/1,
ensure_imdsv2_token_valid/0,
- api_get_request/2]).
+ api_get_request/2,
+ close_connection/3,
+ status_text/1]).
%% gen-server exports
-export([start_link/0,
@@ -58,7 +62,10 @@ get(Service, Path) ->
%% format.
%% @end
get(Service, Path, Headers) ->
- request(Service, get, Path, "", Headers).
+ request(Service, get, Path, "", Headers, []).
+
+get(Service, Path, Headers, Options) ->
+ request(Service, get, Path, "", Headers, Options).
-spec post(Service :: string(),
@@ -73,12 +80,29 @@ post(Service, Path, Body, Headers) ->
request(Service, post, Path, Body, Headers).
+-spec put(Service :: string(),
+ Path :: path(),
+ Body :: body(),
+ Headers :: headers()) -> result().
+%% @doc Perform a HTTP Post request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+put(Service, Path, Body, Headers) ->
+ put(Service, Path, Body, Headers, []).
+
+put(Service, Path, Body, Headers, Options) ->
+ request(Service, put, Path, Body, Headers, Options).
+
+
-spec refresh_credentials() -> ok | error.
%% @doc Manually refresh the credentials from the environment, filesystem or EC2 Instance Metadata Service.
%% @end
refresh_credentials() ->
- gen_server:call(rabbitmq_aws, refresh_credentials).
+ gen_server:call(rabbitmq_aws, refresh_credentials).
+close_connection(Service, Path, Options) ->
+ gen_server:cast(?MODULE, {close_connection, Service, Path, Options}).
-spec refresh_credentials(state()) -> ok | error.
%% @doc Manually refresh the credentials from the environment, filesystem or EC2 Instance Metadata Service.
@@ -177,12 +201,15 @@ start_link() ->
-spec init(list()) -> {ok, state()}.
init([]) ->
- {ok, #state{}}.
-
-
-terminate(_, _) ->
- ok.
+ {ok, _} = application:ensure_all_started(gun),
+ {ok, #state{}}.
+terminate(_, State) ->
+ %% Close all Gun connections
+ maps:fold(fun(_Host, ConnPid, _Acc) ->
+ gun:close(ConnPid)
+ end, ok, State#state.gun_connections),
+ ok.
code_change(_, _, State) ->
{ok, State}.
@@ -190,8 +217,11 @@ code_change(_, _, State) ->
handle_call(Msg, _From, State) ->
handle_msg(Msg, State).
+
+handle_cast({close_connection, Service, Path, Options}, State) ->
+ {noreply, close_connection(Service, Path, Options, State)};
handle_cast(_Request, State) ->
- {noreply, State}.
+ {noreply, State}.
handle_info(_Info, State) ->
@@ -219,11 +249,15 @@ handle_msg({set_credentials, AccessKey, SecretAccessKey}, State) ->
error = undefined}};
handle_msg({set_credentials, NewState}, State) ->
- {reply, ok, State#state{access_key = NewState#state.access_key,
- secret_access_key = NewState#state.secret_access_key,
- security_token = NewState#state.security_token,
- expiration = NewState#state.expiration,
- error = NewState#state.error}};
+ spawn(fun() -> maps:fold(fun(_Host, ConnPid, _Acc) ->
+ gun:close(ConnPid)
+ end, ok, State#state.gun_connections) end),
+ {reply, ok, State#state{access_key = NewState#state.access_key,
+ secret_access_key = NewState#state.secret_access_key,
+ security_token = NewState#state.security_token,
+ expiration = NewState#state.expiration,
+ error = NewState#state.error,
+ gun_connections = #{}}}; % Potentially new credentials, so clear the connection pool?
handle_msg({set_region, Region}, State) ->
{reply, ok, State#state{region = Region}};
@@ -278,6 +312,8 @@ endpoint_tld(_Other) ->
%% @end
format_response({ok, {{_Version, 200, _Message}, Headers, Body}}) ->
{ok, {Headers, maybe_decode_body(get_content_type(Headers), Body)}};
+format_response({ok, {{_Version, 206, _Message}, Headers, Body}}) ->
+ {ok, {Headers, maybe_decode_body(get_content_type(Headers), Body)}};
format_response({ok, {{_Version, StatusCode, Message}, Headers, Body}}) when StatusCode >= 400 ->
{error, Message, {Headers, maybe_decode_body(get_content_type(Headers), Body)}};
format_response({error, Reason}) ->
@@ -288,12 +324,12 @@ format_response({error, Reason}) ->
%% {Type, Subtype}.
%% @end
get_content_type(Headers) ->
- Value = case proplists:get_value("content-type", Headers, undefined) of
+ Value = case proplists:get_value(<<"content-type">>, Headers, undefined) of
undefined ->
- proplists:get_value("Content-Type", Headers, "text/xml");
+ proplists:get_value(<<"Content-Type">>, Headers, "text/xml");
Other -> Other
end,
- parse_content_type(Value).
+ parse_content_type(Value).
-spec has_credentials() -> boolean().
has_credentials() ->
@@ -324,7 +360,7 @@ expired_credentials(Expiration) ->
%% - Credentials file
%% - EC2 Instance Metadata Service
%% @end
-load_credentials(#state{region = Region}) ->
+load_credentials(#state{region = Region, gun_connections = GunConnections}) ->
case rabbitmq_aws_config:credentials() of
{ok, AccessKey, SecretAccessKey, Expiration, SecurityToken} ->
{ok, #state{region = Region,
@@ -333,7 +369,8 @@ load_credentials(#state{region = Region}) ->
secret_access_key = SecretAccessKey,
expiration = Expiration,
security_token = SecurityToken,
- imdsv2_token = undefined}};
+ imdsv2_token = undefined,
+ gun_connections = GunConnections}};
{error, Reason} ->
?LOG_ERROR("Could not load AWS credentials from environment variables, AWS_CONFIG_FILE, AWS_SHARED_CREDENTIALS_FILE or EC2 metadata endpoint: ~tp. Will depend on config settings to be set~n", [Reason]),
{error, #state{region = Region,
@@ -342,7 +379,8 @@ load_credentials(#state{region = Region}) ->
secret_access_key = undefined,
expiration = undefined,
security_token = undefined,
- imdsv2_token = undefined}}
+ imdsv2_token = undefined,
+ gun_connections = GunConnections}}
end.
@@ -357,6 +395,8 @@ local_time() ->
-spec maybe_decode_body(ContentType :: {nonempty_string(), nonempty_string()}, Body :: body()) -> list() | body().
%% @doc Attempt to decode the response body by its MIME
%% @end
+maybe_decode_body(_, <<>>) ->
+ <<>>;
maybe_decode_body({"application", "x-amz-json-1.0"}, Body) ->
rabbitmq_aws_json:decode(Body);
maybe_decode_body({"application", "json"}, Body) ->
@@ -370,6 +410,8 @@ maybe_decode_body(_ContentType, Body) ->
-spec parse_content_type(ContentType :: string()) -> {Type :: string(), Subtype :: string()}.
%% @doc parse a content type string returning a tuple of type/subtype
%% @end
+parse_content_type(ContentType) when is_binary(ContentType)->
+ parse_content_type(binary_to_list(ContentType));
parse_content_type(ContentType) ->
Parts = string:tokens(ContentType, ";"),
[Type, Subtype] = string:tokens(lists:nth(1, Parts), "/"),
@@ -383,7 +425,7 @@ parse_content_type(ContentType) ->
%% @doc Make the API request and return the formatted response.
%% @end
perform_request(State, Service, Method, Headers, Path, Body, Options, Host) ->
- perform_request_has_creds(has_credentials(State), State, Service, Method,
+ perform_request_has_creds(has_credentials(State), State, Service, Method,
Headers, Path, Body, Options, Host).
@@ -397,7 +439,7 @@ perform_request(State, Service, Method, Headers, Path, Body, Options, Host) ->
%% otherwise return an error result.
%% @end
perform_request_has_creds(true, State, Service, Method, Headers, Path, Body, Options, Host) ->
- perform_request_creds_expired(expired_credentials(State#state.expiration), State,
+ perform_request_creds_expired(expired_credentials(State#state.expiration), State,
Service, Method, Headers, Path, Body, Options, Host);
perform_request_has_creds(false, State, _, _, _, _, _, _, _) ->
perform_request_creds_error(State).
@@ -413,7 +455,7 @@ perform_request_has_creds(false, State, _, _, _, _, _, _, _) ->
%% credentials before performing the request.
%% @end
perform_request_creds_expired(false, State, Service, Method, Headers, Path, Body, Options, Host) ->
- perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host);
+ perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host);
perform_request_creds_expired(true, State, _, _, _, _, _, _, _) ->
perform_request_creds_error(State#state{error = "Credentials expired!"}).
@@ -426,27 +468,26 @@ perform_request_creds_expired(true, State, _, _, _, _, _, _, _) ->
%% expired, perform the request and return the response.
%% @end
perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host) ->
- URI = endpoint(State, Host, Service, Path),
- SignedHeaders = sign_headers(State, Service, Method, URI, Headers, Body),
- ContentType = proplists:get_value("content-type", SignedHeaders, undefined),
- perform_request_with_creds(State, Method, URI, SignedHeaders, ContentType, Body, Options).
+ URI = endpoint(State, Host, Service, Path),
+ SignedHeaders = sign_headers(State, Service, Method, URI, Headers, Body),
+ perform_request_with_creds(State, Method, URI, SignedHeaders, Body, Options).
-spec perform_request_with_creds(State :: state(), Method :: method(), URI :: string(),
- Headers :: headers(), ContentType :: string() | undefined,
+ Headers :: headers(),
Body :: body(), Options :: http_options())
-> {Result :: result(), NewState :: state()}.
%% @doc Once it is validated that there are credentials to try and that they have not
%% expired, perform the request and return the response.
%% @end
-perform_request_with_creds(State, Method, URI, Headers, undefined, "", Options0) ->
+perform_request_with_creds(State, Method, URI, Headers, "", Options0) ->
Options1 = ensure_timeout(Options0),
- Response = httpc:request(Method, {URI, Headers}, Options1, []),
- {format_response(Response), State};
-perform_request_with_creds(State, Method, URI, Headers, ContentType, Body, Options0) ->
+ {Response, NewState} = gun_request(State, Method, URI, Headers, <<>>, Options1),
+ {format_response(Response), NewState};
+perform_request_with_creds(State, Method, URI, Headers, Body, Options0) ->
Options1 = ensure_timeout(Options0),
- Response = httpc:request(Method, {URI, Headers, ContentType, Body}, Options1, []),
- {format_response(Response), State}.
+ {Response, NewState} = gun_request(State, Method, URI, Headers, Body, Options1),
+ {format_response(Response), NewState}.
-spec perform_request_creds_error(State :: state()) ->
@@ -567,3 +608,158 @@ api_get_request_with_retries(Service, Path, Retries, WaitTimeBetweenRetries) ->
timer:sleep(WaitTimeBetweenRetries),
api_get_request_with_retries(Service, Path, Retries - 1, WaitTimeBetweenRetries)
end.
+
+%% Gun HTTP client functions
+gun_request(State, Method, URI, Headers, Body, Options) ->
+ HeadersBin = lists:map(
+ fun({Key, Value}) ->
+ {list_to_binary(Key), list_to_binary(Value)}
+ end, Headers),
+ {Host, Port, Path} = parse_uri(URI),
+ {ConnPid, NewState} = get_or_create_gun_connection(State, Host, Port, Path, Options),
+ Timeout = proplists:get_value(timeout, Options, ?DEFAULT_HTTP_TIMEOUT),
+ try
+ StreamRef = do_gun_request(ConnPid, Method, Path, HeadersBin, Body),
+ case gun:await(ConnPid, StreamRef, Timeout) of
+ {response, fin, Status, RespHeaders} ->
+ Response = {ok, {{http_version, Status, status_text(Status)}, RespHeaders, <<>>}},
+ {Response, NewState};
+ {response, nofin, Status, RespHeaders} ->
+ {ok, RespBody} = gun:await_body(ConnPid, StreamRef, Timeout),
+ Response = {ok, {{http_version, Status, status_text(Status)}, RespHeaders, RespBody}},
+ {Response, NewState};
+ {error, Reason} ->
+ {{error, Reason}, NewState}
+ end
+ catch
+ _:Error ->
+ % Connection failed, remove from pool and return error
+ HostKey = get_connection_key(Host, Port, Path, Options),
+ NewConnections = maps:remove(HostKey, NewState#state.gun_connections),
+ gun:close(ConnPid),
+ {{error, Error}, NewState#state{gun_connections = NewConnections}}
+ end.
+
+do_gun_request(ConnPid, get, Path, Headers, _Body) ->
+ gun:get(ConnPid, Path, Headers);
+do_gun_request(ConnPid, post, Path, Headers, Body) ->
+ gun:post(ConnPid, Path, Headers, Body, #{});
+do_gun_request(ConnPid, put, Path, Headers, Body) ->
+ gun:put(ConnPid, Path, Headers, Body, #{});
+do_gun_request(ConnPid, head, Path, Headers, _Body) ->
+ gun:head(ConnPid, Path, Headers, #{});
+do_gun_request(ConnPid, delete, Path, Headers, _Body) ->
+ gun:delete(ConnPid, Path, Headers, #{});
+do_gun_request(ConnPid, patch, Path, Headers, Body) ->
+ gun:patch(ConnPid, Path, Headers, Body, #{});
+do_gun_request(ConnPid, options, Path, Headers, _Body) ->
+ gun:options(ConnPid, Path, Headers, #{}).
+
+get_or_create_gun_connection(State, Host, Port, Path, Options) ->
+ HostKey = get_connection_key(Host, Port, Path, Options),
+ case maps:get(HostKey, State#state.gun_connections, undefined) of
+ undefined ->
+ create_gun_connection(State, Host, Port, HostKey, Options);
+ ConnPid ->
+ case is_process_alive(ConnPid) andalso gun:info(ConnPid) =/= undefined of
+ true ->
+ {ConnPid, State};
+ false ->
+ % Connection is dead, create new one
+ gun:close(ConnPid),
+ create_gun_connection(State, Host, Port, HostKey, Options)
+ end
+ end.
+
+get_connection_key(Host, Port, Path, Options) ->
+ case proplists:get_value(connection_key_type, Options, host) of
+ host ->
+ Host ++ ":" ++ integer_to_list(Port);
+ path ->
+ Host ++ ":" ++ integer_to_list(Port) ++ Path;
+ {path_custom, Extra} ->
+ Host ++ ":" ++ integer_to_list(Port) ++ Path ++ ":" ++ Extra;
+ _ ->
+ Host ++ ":" ++ integer_to_list(Port)
+ end.
+
+create_gun_connection(State, Host, Port, HostKey, Options) ->
+ % Map HTTP version to Gun protocols, always include http as fallback
+ HttpVersion = proplists:get_value(version, Options, "HTTP/1.1"),
+ Protocols = case HttpVersion of
+ "HTTP/2" -> [http2, http];
+ "HTTP/2.0" -> [http2, http];
+ "HTTP/1.1" -> [http];
+ "HTTP/1.0" -> [http];
+ _ -> [http2, http] % Default: try HTTP/2, fallback to HTTP/1.1
+ end,
+ ConnectTimeout = proplists:get_value(connect_timeout, Options, 5000),
+ Opts = #{
+ transport => if Port == 443 -> tls; true -> tcp end,
+ protocols => Protocols,
+ connect_timeout => ConnectTimeout
+ },
+ case gun:open(Host, Port, Opts) of
+ {ok, ConnPid} ->
+ case gun:await_up(ConnPid, ConnectTimeout) of
+ {ok, _Protocol} ->
+ NewConnections = maps:put(HostKey, ConnPid, State#state.gun_connections),
+ NewState = State#state{gun_connections = NewConnections},
+ {ConnPid, NewState};
+ {error, Reason} ->
+ gun:close(ConnPid),
+ error({gun_connection_failed, Reason})
+ end;
+ {error, Reason} ->
+ error({gun_open_failed, Reason})
+ end.
+
+close_connection(Service, Path, Options, State) ->
+ URI = endpoint(State, undefined, Service, Path),
+ {Host, Port, Path} = parse_uri(URI),
+ HostKey = get_connection_key(Host, Port, Path, Options),
+ case maps:get(HostKey, State#state.gun_connections, undefined) of
+ undefined ->
+ State;
+ ConnPid ->
+ gun:close(ConnPid),
+ NewConnections = maps:remove(HostKey, State#state.gun_connections),
+ State#state{gun_connections = NewConnections}
+ end.
+
+
+parse_uri(URI) ->
+ case string:split(URI, "://", leading) of
+ [Scheme, Rest] ->
+ case string:split(Rest, "/", leading) of
+ [HostPort] ->
+ {Host, Port} = parse_host_port(HostPort, Scheme),
+ {Host, Port, "/"};
+ [HostPort, Path] ->
+ {Host, Port} = parse_host_port(HostPort, Scheme),
+ {Host, Port, "/" ++ Path}
+ end
+ end.
+
+parse_host_port(HostPort, Scheme) ->
+ DefaultPort = case Scheme of
+ "https" -> 443;
+ "http" -> 80;
+ _ -> 443 % Fallback to HTTPS
+ end,
+ case string:split(HostPort, ":", trailing) of
+ [Host] ->
+ {Host, DefaultPort};
+ [Host, PortStr] ->
+ {Host, list_to_integer(PortStr)}
+ end.
+
+status_text(200) -> "OK";
+status_text(206) -> "Partial Content";
+status_text(400) -> "Bad Request";
+status_text(401) -> "Unauthorized";
+status_text(403) -> "Forbidden";
+status_text(404) -> "Not Found";
+status_text(416) -> "Range Not Satisfiable";
+status_text(500) -> "Internal Server Error";
+status_text(Code) -> integer_to_list(Code).
diff --git a/deps/rabbitmq_aws/src/rabbitmq_aws_config.erl b/deps/rabbitmq_aws/src/rabbitmq_aws_config.erl
index b9c722e8f1b8..119492198e19 100644
--- a/deps/rabbitmq_aws/src/rabbitmq_aws_config.erl
+++ b/deps/rabbitmq_aws/src/rabbitmq_aws_config.erl
@@ -615,8 +615,10 @@ maybe_get_role_from_instance_metadata() ->
%% Instance Metadata service, returning the Region if successful.
%% end.
parse_az_response({error, _}) -> {error, undefined};
-parse_az_response({ok, {{_, 200, _}, _, Body}})
- -> {ok, region_from_availability_zone(Body)};
+parse_az_response({ok, {{_, 200, _}, _, Body}}) when is_binary(Body) ->
+ {ok, region_from_availability_zone(binary_to_list(Body))};
+parse_az_response({ok, {{_, 200, _}, _, Body}}) ->
+ {ok, region_from_availability_zone(Body)};
parse_az_response({ok, {{_, _, _}, _, _}}) -> {error, undefined}.
@@ -626,7 +628,8 @@ parse_az_response({ok, {{_, _, _}, _, _}}) -> {error, undefined}.
%% body value is the string to process.
%% end.
parse_body_response({error, _}) -> {error, undefined};
-parse_body_response({ok, {{_, 200, _}, _, Body}}) -> {ok, Body};
+parse_body_response({ok, {{_, 200, _}, _, Body}}) when is_binary(Body) -> {ok, binary_to_list(Body)};
+parse_body_response({ok, {{_, 200, _}, _, Body}}) when is_list(Body) -> {ok, Body};
parse_body_response({ok, {{_, 401, _}, _, _}}) ->
?LOG_ERROR(get_instruction_on_instance_metadata_error("Unauthorized instance metadata service request.")),
{error, undefined};
@@ -655,9 +658,35 @@ parse_credentials_response({ok, {{_, 200, _}, _, Body}}) ->
%% @doc Wrap httpc:get/4 to simplify Instance Metadata service v2 requests
%% @end
perform_http_get_instance_metadata(URL) ->
- ?LOG_DEBUG("Querying instance metadata service: ~tp", [URL]),
- httpc:request(get, {URL, instance_metadata_request_headers()},
- [{timeout, ?DEFAULT_HTTP_TIMEOUT}], []).
+ ?LOG_DEBUG("Querying instance metadata service: ~tp", [URL]),
+ % Parse metadata service URL
+ {Host, Port, Path} = rabbitmq_aws:parse_uri(URL),
+ % Simple Gun connection for metadata service
+ Opts = #{transport => tcp, protocols => [http]}, % HTTP only, no TLS
+ case gun:open(Host, Port, Opts) of
+ {ok, ConnPid} ->
+ case gun:await_up(ConnPid, 5000) of
+ {ok, _Protocol} ->
+ Headers = instance_metadata_request_headers(),
+ StreamRef = gun:get(ConnPid, Path, Headers),
+ Result = case gun:await(ConnPid, StreamRef, ?DEFAULT_HTTP_TIMEOUT) of
+ {response, fin, Status, RespHeaders} ->
+ {ok, {{http_version, Status, rabbitmq_aws:status_text(Status)}, RespHeaders, <<>>}};
+ {response, nofin, Status, RespHeaders} ->
+ {ok, Body} = gun:await_body(ConnPid, StreamRef, ?DEFAULT_HTTP_TIMEOUT),
+ {ok, {{http_version, Status, rabbitmq_aws:status_text(Status)}, RespHeaders, Body}};
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ gun:close(ConnPid),
+ Result;
+ {error, Reason} ->
+ gun:close(ConnPid),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
-spec get_instruction_on_instance_metadata_error(string()) -> string().
%% @doc Return error message on failures related to EC2 Instance Metadata Service with a reference to AWS document.
@@ -717,23 +746,56 @@ region_from_availability_zone(Value) ->
%% @doc Attempt to obtain EC2 IMDSv2 token.
%% @end
load_imdsv2_token() ->
- TokenUrl = imdsv2_token_url(),
- ?LOG_INFO("Attempting to obtain EC2 IMDSv2 token from ~tp ...", [TokenUrl]),
- case httpc:request(put, {TokenUrl, [{?METADATA_TOKEN_TTL_HEADER, integer_to_list(?METADATA_TOKEN_TTL_SECONDS)}]},
- [{timeout, ?DEFAULT_HTTP_TIMEOUT}], []) of
- {ok, {{_, 200, _}, _, Value}} ->
- ?LOG_DEBUG("Successfully obtained EC2 IMDSv2 token."),
- Value;
- {error, {{_, 400, _}, _, _}} ->
- ?LOG_WARNING("Failed to obtain EC2 IMDSv2 token: Missing or Invalid Parameters – The PUT request is not valid."),
- undefined;
- Other ->
- ?LOG_WARNING(
- get_instruction_on_instance_metadata_error("Failed to obtain EC2 IMDSv2 token: ~tp. "
- "Falling back to EC2 IMDSv1 for now. It is recommended to use EC2 IMDSv2."), [Other]),
- undefined
- end.
-
+ TokenUrl = imdsv2_token_url(),
+ ?LOG_INFO("Attempting to obtain EC2 IMDSv2 token from ~tp ...", [TokenUrl]),
+ % Parse metadata service URL
+ {Host, Port, Path} = rabbitmq_aws:parse_uri(TokenUrl),
+ % Simple Gun connection for metadata service
+ Opts = #{transport => tcp, protocols => [http]}, % HTTP only, no TLS
+ case gun:open(Host, Port, Opts) of
+ {ok, ConnPid} ->
+ case gun:await_up(ConnPid, 5000) of
+ {ok, _Protocol} ->
+ % PUT request with IMDSv2 token TTL header
+ Headers = [{?METADATA_TOKEN_TTL_HEADER, integer_to_list(?METADATA_TOKEN_TTL_SECONDS)}],
+ StreamRef = gun:put(ConnPid, Path, Headers, <<>>),
+ Result = case gun:await(ConnPid, StreamRef, ?DEFAULT_HTTP_TIMEOUT) of
+ {response, fin, 200, _RespHeaders} ->
+ ?LOG_DEBUG("Successfully obtained EC2 IMDSv2 token."),
+ <<>>; % Empty body for fin response
+ {response, nofin, 200, _RespHeaders} ->
+ {ok, Body} = gun:await_body(ConnPid, StreamRef, ?DEFAULT_HTTP_TIMEOUT),
+ ?LOG_DEBUG("Successfully obtained EC2 IMDSv2 token."),
+ binary_to_list(Body);
+ {response, _, 400, _RespHeaders} ->
+ ?LOG_WARNING("Failed to obtain EC2 IMDSv2 token: Missing or Invalid Parameters – The PUT request is not valid."),
+ undefined;
+ {error, Reason} ->
+ ?LOG_WARNING(
+ get_instruction_on_instance_metadata_error("Failed to obtain EC2 IMDSv2 token: ~tp. "
+ "Falling back to EC2 IMDSv1 for now. It is recommended to use EC2 IMDSv2."), [Reason]),
+ undefined;
+ Other ->
+ ?LOG_WARNING(
+ get_instruction_on_instance_metadata_error("Failed to obtain EC2 IMDSv2 token: ~tp. "
+ "Falling back to EC2 IMDSv1 for now. It is recommended to use EC2 IMDSv2."), [Other]),
+ undefined
+ end,
+ gun:close(ConnPid),
+ Result;
+ {error, Reason} ->
+ gun:close(ConnPid),
+ ?LOG_WARNING(
+ get_instruction_on_instance_metadata_error("Failed to connect for EC2 IMDSv2 token: ~tp. "
+ "Falling back to EC2 IMDSv1 for now. It is recommended to use EC2 IMDSv2."), [Reason]),
+ undefined
+ end;
+ {error, Reason} ->
+ ?LOG_WARNING(
+ get_instruction_on_instance_metadata_error("Failed to open connection for EC2 IMDSv2 token: ~tp. "
+ "Falling back to EC2 IMDSv1 for now. It is recommended to use EC2 IMDSv2."), [Reason]),
+ undefined
+ end.
-spec instance_metadata_request_headers() -> headers().
%% @doc Return headers used for instance metadata service requests.
diff --git a/deps/rabbitmq_aws/src/rabbitmq_aws_xml.erl b/deps/rabbitmq_aws/src/rabbitmq_aws_xml.erl
index fc3be5c642a8..4787ea82f270 100644
--- a/deps/rabbitmq_aws/src/rabbitmq_aws_xml.erl
+++ b/deps/rabbitmq_aws/src/rabbitmq_aws_xml.erl
@@ -11,6 +11,8 @@
-include_lib("xmerl/include/xmerl.hrl").
-spec parse(Value :: string() | binary()) -> list().
+parse(Value) when is_binary(Value) ->
+ parse(binary_to_list(Value));
parse(Value) ->
{Element, _} = xmerl_scan:string(Value),
parse_node(Element).
diff --git a/deps/rabbitmq_aws/test/rabbitmq_aws_config_tests.erl b/deps/rabbitmq_aws/test/rabbitmq_aws_config_tests.erl
index c8329f280c07..a61a381bcc98 100644
--- a/deps/rabbitmq_aws/test/rabbitmq_aws_config_tests.erl
+++ b/deps/rabbitmq_aws/test/rabbitmq_aws_config_tests.erl
@@ -99,10 +99,10 @@ credentials_test_() ->
{
foreach,
fun () ->
- meck:new(httpc),
- meck:new(rabbitmq_aws),
+ meck:new(gun, []),
+ meck:new(rabbitmq_aws, [passthrough]),
reset_environment(),
- [httpc, rabbitmq_aws]
+ [gun, rabbitmq_aws]
end,
fun meck:unload/1,
[
@@ -176,9 +176,16 @@ credentials_test_() ->
end},
{"from instance metadata service", fun() ->
CredsBody = "{\n \"Code\" : \"Success\",\n \"LastUpdated\" : \"2016-03-31T21:51:49Z\",\n \"Type\" : \"AWS-HMAC\",\n \"AccessKeyId\" : \"ASIAIMAFAKEACCESSKEY\",\n \"SecretAccessKey\" : \"2+t64tZZVaz0yp0x1G23ZRYn+FAKEyVALUEs/4qh\",\n \"Token\" : \"FAKE//////////wEAK/TOKEN/VALUE=\",\n \"Expiration\" : \"2016-04-01T04:13:28Z\"\n}",
- meck:sequence(httpc, request, 4,
- [{ok, {{protocol, 200, message}, headers, "Bob"}},
- {ok, {{protocol, 200, message}, headers, CredsBody}}]),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:sequence(gun, get, 3, [stream_ref1, stream_ref2]),
+ meck:sequence(gun, await, 3,
+ [{response, nofin, 200, headers},
+ {response, nofin, 200, headers}]),
+ meck:sequence(gun, await_body, 3,
+ [{ok, <<"Bob">>},
+ {ok, list_to_binary(CredsBody)}]),
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
Expectation = {ok, "ASIAIMAFAKEACCESSKEY", "2+t64tZZVaz0yp0x1G23ZRYn+FAKEyVALUEs/4qh",
{{2016,4,1},{4,13,28}}, "FAKE//////////wEAK/TOKEN/VALUE="},
@@ -187,30 +194,46 @@ credentials_test_() ->
},
{"with instance metadata service role error", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:expect(httpc, request, 4, {error, timeout}),
+ meck:expect(gun, open, fun(_,_,_) -> {error, timeout} end),
?assertEqual({error, undefined}, rabbitmq_aws_config:credentials())
end
},
{"with instance metadata service role http error", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:expect(httpc, request, 4,
- {ok, {{protocol, 500, message}, headers, "Internal Server Error"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 500, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"Internal Server Error">>} end),
?assertEqual({error, undefined}, rabbitmq_aws_config:credentials())
end
},
{"with instance metadata service credentials error", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:sequence(httpc, request, 4,
- [{ok, {{protocol, 200, message}, headers, "Bob"}},
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:sequence(gun, get, 3, [stream_ref1, stream_ref2]),
+ meck:sequence(gun, await, 3,
+ [{response, nofin, 200, headers},
{error, timeout}]),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"Bob">>} end),
?assertEqual({error, undefined}, rabbitmq_aws_config:credentials())
end
},
{"with instance metadata service credentials not found", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:sequence(httpc, request, 4,
- [{ok, {{protocol, 200, message}, headers, "Bob"}},
- {ok, {{protocol, 404, message}, headers, "File Not Found"}}]),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:sequence(gun, get, 3, [stream_ref1, stream_ref2]),
+ meck:sequence(gun, await, 3,
+ [{response, nofin, 200, headers},
+ {response, nofin, 404, headers}]),
+ meck:sequence(gun, await_body, 3,
+ [{ok, <<"Bob">>},
+ {ok, <<"File Not Found">>}]),
?assertEqual({error, undefined}, rabbitmq_aws_config:credentials())
end
}
@@ -293,10 +316,10 @@ region_test_() ->
{
foreach,
fun () ->
- meck:new(httpc),
- meck:new(rabbitmq_aws),
+ meck:new(gun, []),
+ meck:new(rabbitmq_aws, [passthrough]),
reset_environment(),
- [httpc, rabbitmq_aws]
+ [gun, rabbitmq_aws]
end,
fun meck:unload/1,
[
@@ -319,8 +342,12 @@ region_test_() ->
end},
{"from instance metadata service", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:expect(httpc, request, 4,
- {ok, {{protocol, 200, message}, headers, "us-west-1a"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 200, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"us-west-1a">>} end),
?assertEqual({ok, "us-west-1"}, rabbitmq_aws_config:region())
end},
{"full lookup failure", fun() ->
@@ -329,8 +356,12 @@ region_test_() ->
end},
{"http error failure", fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:expect(httpc, request, 4,
- {ok, {{protocol, 500, message}, headers, "Internal Server Error"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 500, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"Internal Server Error">>} end),
?assertEqual({ok, ?DEFAULT_REGION}, rabbitmq_aws_config:region())
end}
]}.
@@ -340,31 +371,46 @@ instance_id_test_() ->
{
foreach,
fun () ->
- meck:new(httpc),
- meck:new(rabbitmq_aws),
+ meck:new(gun, []),
+ meck:new(rabbitmq_aws, [passthrough]),
reset_environment(),
- [httpc, rabbitmq_aws]
+ [gun, rabbitmq_aws]
end,
fun meck:unload/1,
[
{"get instance id successfully",
fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, undefined),
- meck:expect(httpc, request, 4, {ok, {{protocol, 200, message}, headers, "instance-id"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 200, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"instance-id">>} end),
?assertEqual({ok, "instance-id"}, rabbitmq_aws_config:instance_id())
end
},
{"getting instance id is rejected with invalid token error",
fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, "invalid"),
- meck:expect(httpc, request, 4, {error, {{protocol, 401, message}, headers, "Invalid token"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 401, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"Invalid token">>} end),
?assertEqual({error, undefined}, rabbitmq_aws_config:instance_id())
end
},
{"getting instance id is rejected with access denied error",
fun() ->
meck:expect(rabbitmq_aws, ensure_imdsv2_token_valid, 0, "expired token"),
- meck:expect(httpc, request, 4, {error, {{protocol, 403, message}, headers, "access denied"}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, get, fun(_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 403, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"access denied">>} end),
?assertEqual({error, undefined}, rabbitmq_aws_config:instance_id())
end
}
@@ -375,26 +421,35 @@ load_imdsv2_token_test_() ->
{
foreach,
fun () ->
- meck:new(httpc),
- [httpc]
+ meck:new(gun, []),
+ [gun]
end,
fun meck:unload/1,
[
{"fail to get imdsv2 token - timeout",
fun() ->
- meck:expect(httpc, request, 4, {error, timeout}),
+ meck:expect(gun, open, fun(_,_,_) -> {error, timeout} end),
?assertEqual(undefined, rabbitmq_aws_config:load_imdsv2_token())
end},
{"fail to get imdsv2 token - PUT request is not valid",
fun() ->
- meck:expect(httpc, request, 4, {error, {{protocol, 400, messge}, headers, "Missing or Invalid Parameters – The PUT request is not valid."}}),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, put, fun(_,_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 400, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, <<"Missing or Invalid Parameters – The PUT request is not valid.">>} end),
?assertEqual(undefined, rabbitmq_aws_config:load_imdsv2_token())
end},
{"successfully get imdsv2 token from instance metadata service",
fun() ->
IMDSv2Token = "super_secret_token_value",
- meck:sequence(httpc, request, 4,
- [{ok, {{protocol, 200, message}, headers, IMDSv2Token}}]),
+ meck:expect(gun, open, fun(_,_,_) -> {ok, pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok, protocol} end),
+ meck:expect(gun, put, fun(_,_,_,_) -> stream_ref end),
+ meck:expect(gun, await, fun(_,_,_) -> {response, nofin, 200, headers} end),
+ meck:expect(gun, await_body, fun(_,_,_) -> {ok, list_to_binary(IMDSv2Token)} end),
?assertEqual(IMDSv2Token, rabbitmq_aws_config:load_imdsv2_token())
end}
]
@@ -405,7 +460,7 @@ maybe_imdsv2_token_headers_test_() ->
{
foreach,
fun () ->
- meck:new(rabbitmq_aws),
+ meck:new(rabbitmq_aws, [passthrough]),
[rabbitmq_aws]
end,
fun meck:unload/1,
@@ -430,7 +485,7 @@ reset_environment() ->
setup_test_file_with_env_var("AWS_CONFIG_FILE", "bad_config.ini"),
setup_test_file_with_env_var("AWS_SHARED_CREDENTIALS_FILE",
"bad_credentials.ini"),
- meck:expect(httpc, request, 4, {error, timeout}).
+ meck:expect(gun, open, fun(_,_,_) -> {error, timeout} end).
setup_test_config_env_var() ->
setup_test_file_with_env_var("AWS_CONFIG_FILE", "test_aws_config.ini").
diff --git a/deps/rabbitmq_aws/test/rabbitmq_aws_tests.erl b/deps/rabbitmq_aws/test/rabbitmq_aws_tests.erl
index d622d1359731..a07fbc0f24bc 100644
--- a/deps/rabbitmq_aws/test/rabbitmq_aws_tests.erl
+++ b/deps/rabbitmq_aws/test/rabbitmq_aws_tests.erl
@@ -25,7 +25,7 @@ init_test_() ->
ok = gen_server:stop(Pid),
os:unsetenv("AWS_ACCESS_KEY_ID"),
os:unsetenv("AWS_SECRET_ACCESS_KEY"),
- Expectation = {state,"Sésame","ouvre-toi",undefined,undefined,"us-west-3", undefined,undefined},
+ Expectation = {state,"Sésame","ouvre-toi",undefined,undefined,"us-west-3", undefined,undefined, #{}},
?assertEqual(Expectation, State)
end},
{"error", fun() ->
@@ -35,7 +35,7 @@ init_test_() ->
rabbitmq_aws:refresh_credentials(),
{ok, State} = gen_server:call(Pid, get_state),
ok = gen_server:stop(Pid),
- Expectation = {state,undefined,undefined,undefined,undefined,"us-west-3",undefined,test_result},
+ Expectation = {state,undefined,undefined,undefined,undefined,"us-west-3",undefined,test_result, #{}},
?assertEqual(Expectation, State),
meck:validate(rabbitmq_aws_config)
end}
@@ -43,7 +43,7 @@ init_test_() ->
}.
terminate_test() ->
- ?assertEqual(ok, rabbitmq_aws:terminate(foo, bar)).
+ ?assertEqual(ok, rabbitmq_aws:terminate(foo, {state,undefined,undefined,undefined,undefined,"us-west-3",undefined,test_result, #{}})).
code_change_test() ->
?assertEqual({ok, {state, denial}}, rabbitmq_aws:code_change(foo, bar, {state, denial})).
@@ -120,8 +120,8 @@ expired_credentials_test_() ->
format_response_test_() ->
[
{"ok", fun() ->
- Response = {ok, {{"HTTP/1.1", 200, "Ok"}, [{"Content-Type", "text/xml"}], "Value"}},
- Expectation = {ok, {[{"Content-Type", "text/xml"}], [{"test", "Value"}]}},
+ Response = {ok, {{"HTTP/1.1", 200, "Ok"}, [{<<"Content-Type">>, <<"text/xml">>}], "Value"}},
+ Expectation = {ok, {[{<<"Content-Type">>, <<"text/xml">>}], [{"test", "Value"}]}},
?assertEqual(Expectation, rabbitmq_aws:format_response(Response))
end},
{"error", fun() ->
@@ -141,8 +141,8 @@ gen_server_call_test_() ->
os:putenv("AWS_DEFAULT_REGION", "us-west-3"),
os:putenv("AWS_ACCESS_KEY_ID", "Sésame"),
os:putenv("AWS_SECRET_ACCESS_KEY", "ouvre-toi"),
- meck:new(httpc, []),
- [httpc]
+ meck:new(gun, []),
+ [gun]
end,
fun (Mods) ->
meck:unload(Mods),
@@ -164,14 +164,24 @@ gen_server_call_test_() ->
Body = "",
Options = [],
Host = undefined,
- meck:expect(httpc, request,
- fun(get, {"https://ec2.us-east-1.amazonaws.com/?Action=DescribeTags&Version=2015-10-01", _Headers}, _Options, []) ->
- {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"pass\": true}"}}
- end),
- Expectation = {reply, {ok, {[{"content-type", "application/json"}], [{"pass", true}]}}, State},
+ meck:expect(gun, open, fun(_,_,_) -> {ok,pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok,protocol} end),
+ meck:expect(gun, get,
+ fun(_Pid, _Path, _Headers) -> nofin end),
+ %% {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"pass\": true}"}}
+ %% end),
+ meck:expect(gun, await,
+ fun(_Pid, _, _) -> {response, nofin, 200, [{<<"content-type">>, <<"application/json">>}]} end),
+ meck:expect(gun, await_body,
+ fun(_Pid, _, _) -> {ok, <<"{\"pass\": true}">>} end),
+
+ %% {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"pass\": true}"}}
+ %% end),
+ Expectation = {reply, {ok, {[{<<"content-type">>, <<"application/json">>}], [{"pass", true}]}}, State#state{gun_connections = #{"ec2.us-east-1.amazonaws.com:443" => pid}}},
Result = rabbitmq_aws:handle_call({request, Service, Method, Headers, Path, Body, Options, Host}, eunit, State),
?assertEqual(Expectation, Result),
- meck:validate(httpc)
+ meck:validate(gun)
end
},
{
@@ -327,9 +337,9 @@ perform_request_test_() ->
{
foreach,
fun () ->
- meck:new(httpc, []),
+ meck:new(gun, []),
meck:new(rabbitmq_aws_config, []),
- [httpc, rabbitmq_aws_config]
+ [gun, rabbitmq_aws_config]
end,
fun meck:unload/1,
[
@@ -347,19 +357,21 @@ perform_request_test_() ->
Options = [],
Host = undefined,
ExpectURI = "https://ec2.us-east-1.amazonaws.com/?Action=DescribeTags&Version=2015-10-01",
- meck:expect(httpc, request,
- fun(get, {URI, _Headers}, _Options, []) ->
- case URI of
- ExpectURI ->
- {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"pass\": true}"}};
- _ ->
- {ok, {{"HTTP/1.0", 400, "RequestFailure", [{"content-type", "application/json"}], "{\"pass\": false}"}}}
- end
- end),
- Expectation = {{ok, {[{"content-type", "application/json"}], [{"pass", true}]}}, State},
+ meck:expect(gun, open, fun(_,_,_) -> {ok,pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok,protocol} end),
+
+ meck:expect(gun, get,
+ fun(_Pid, "/?Action=DescribeTags&Version=2015-10-01", _Headers) -> nofin end),
+ meck:expect(gun, await,
+ fun(_Pid, _, _) -> {response, nofin, 200, [{<<"content-type">>, <<"application/json">>}]} end),
+ meck:expect(gun, await_body,
+ fun(_Pid, _, _) -> {ok, <<"{\"pass\": true}">>} end),
+
+ Expectation = {{ok, {[{<<"content-type">>, <<"application/json">>}], [{"pass", true}]}}, State#state{gun_connections = #{"ec2.us-east-1.amazonaws.com:443" => pid}}},
Result = rabbitmq_aws:perform_request(State, Service, Method, Headers, Path, Body, Options, Host),
?assertEqual(Expectation, Result),
- meck:validate(httpc)
+ meck:validate(gun)
end},
{
"has_credentials false",
@@ -372,11 +384,9 @@ perform_request_test_() ->
Body = "",
Options = [],
Host = undefined,
- meck:expect(httpc, request, fun(get, {_URI, _Headers}, _Options, []) -> {ok, {{"HTTP/1.0", 400, "RequestFailure"}, [{"content-type", "application/json"}], "{\"pass\": false}"}} end),
Expectation = {{error, {credentials, State#state.error}}, State},
Result = rabbitmq_aws:perform_request(State, Service, Method, Headers, Path, Body, Options, Host),
- ?assertEqual(Expectation, Result),
- meck:validate(httpc)
+ ?assertEqual(Expectation, Result)
end
},
{
@@ -447,9 +457,9 @@ api_get_request_test_() ->
{
foreach,
fun () ->
- meck:new(httpc, []),
+ meck:new(gun, []),
meck:new(rabbitmq_aws_config, []),
- [httpc, rabbitmq_aws_config]
+ [gun, rabbitmq_aws_config]
end,
fun meck:unload/1,
[
@@ -459,14 +469,25 @@ api_get_request_test_() ->
secret_access_key = "ExpiredAccessKey",
region = "us-east-1",
expiration = {{3016, 4, 1}, {12, 0, 0}}},
- meck:expect(httpc, request, 4, {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"data\": \"value\"}"}}),
+
+ meck:expect(gun, open, fun(_,_,_) -> {ok,pid} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok,protocol} end),
+ meck:expect(gun, get,
+ fun(_Pid, _Path, _Headers) -> nofin end),
+ meck:expect(gun, await,
+ fun(_Pid, _, _) -> {response, nofin, 200, [{<<"content-type">>, <<"application/json">>}]} end),
+ meck:expect(gun, await_body,
+ fun(_Pid, _, _) -> {ok, <<"{\"data\": \"value\"}">>} end),
+
+
{ok, Pid} = rabbitmq_aws:start_link(),
rabbitmq_aws:set_region("us-east-1"),
rabbitmq_aws:set_credentials(State),
Result = rabbitmq_aws:api_get_request("AWS", "API"),
ok = gen_server:stop(Pid),
?assertEqual({ok, [{"data","value"}]}, Result),
- meck:validate(httpc)
+ meck:validate(gun)
end
},
{"AWS service API request failed - credentials",
@@ -485,14 +506,22 @@ api_get_request_test_() ->
secret_access_key = "ExpiredAccessKey",
region = "us-east-1",
expiration = {{3016, 4, 1}, {12, 0, 0}}},
- meck:expect(httpc, request, 4, {error, "network error"}),
+
+ meck:expect(gun, open, fun(_,_,_) -> {ok, spawn(fun() -> ok end)} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok,protocol} end),
+ meck:expect(gun, get,
+ fun(_Pid, _Path, _Headers) -> nofin end),
+ meck:expect(gun, await,
+ fun(_Pid, _, _) -> {error, "network error"} end),
+
{ok, Pid} = rabbitmq_aws:start_link(),
rabbitmq_aws:set_region("us-east-1"),
rabbitmq_aws:set_credentials(State),
Result = rabbitmq_aws:api_get_request_with_retries("AWS", "API", 3, 1),
ok = gen_server:stop(Pid),
?assertEqual({error, "AWS service is unavailable"}, Result),
- meck:validate(httpc)
+ meck:validate(gun)
end
},
{"AWS service API request succeeded after a transient error",
@@ -501,18 +530,28 @@ api_get_request_test_() ->
secret_access_key = "ExpiredAccessKey",
region = "us-east-1",
expiration = {{3016, 4, 1}, {12, 0, 0}}},
- meck:expect(httpc, request, 4, meck:seq([
- {error, "network error"},
- {ok, {{"HTTP/1.0", 500, "OK"}, [{"content-type", "application/json"}], "{\"error\": \"server error\"}"}},
- {ok, {{"HTTP/1.0", 200, "OK"}, [{"content-type", "application/json"}], "{\"data\": \"value\"}"}}
- ])),
+
+ meck:expect(gun, open, fun(_,_,_) -> {ok, spawn(fun() -> ok end)} end),
+ meck:expect(gun, close, fun(_) -> ok end),
+ meck:expect(gun, await_up, fun(_,_) -> {ok,protocol} end),
+ meck:expect(gun, get,
+ fun(_Pid, _Path, _Headers) -> nofin end),
+
+ %% meck:expect(gun, get, 3, meck:seq(
+ %% fun(_Pid, _Path, _Headers) -> {error, "network errors"} end),
+ meck:expect(gun, await, 3, meck:seq([{error, "network error"},
+ {response, nofin, 500, [{<<"content-type">>, <<"application/json">>}]},
+ {response, nofin, 200, [{<<"content-type">>, <<"application/json">>}]}])),
+
+ meck:expect(gun, await_body, 3, meck:seq([{ok, <<"{\"error\": \"server error\"}">>},
+ {ok, <<"{\"data\": \"value\"}">>}])),
{ok, Pid} = rabbitmq_aws:start_link(),
rabbitmq_aws:set_region("us-east-1"),
rabbitmq_aws:set_credentials(State),
Result = rabbitmq_aws:api_get_request_with_retries("AWS", "API", 3, 1),
ok = gen_server:stop(Pid),
?assertEqual({ok, [{"data","value"}]}, Result),
- meck:validate(httpc)
+ meck:validate(gun)
end
}
]