Skip to content

Commit 27c5487

Browse files
committed
wip
1 parent ee87dc8 commit 27c5487

File tree

1 file changed

+44
-20
lines changed

1 file changed

+44
-20
lines changed

deps/rabbitmq_aws/src/rabbitmq_aws.erl

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,11 @@ init([]) ->
181181

182182

183183
terminate(_, State) ->
184-
% Close all Gun connections
185-
maps:fold(fun(_Host, ConnPid, _Acc) ->
186-
gun:close(ConnPid)
187-
end, ok, State#state.gun_connections),
188-
ok.
189-
184+
%% Close all Gun connections
185+
maps:fold(fun(_Host, ConnPid, _Acc) ->
186+
gun:close(ConnPid)
187+
end, ok, State#state.gun_connections),
188+
ok.
190189

191190
code_change(_, _, State) ->
192191
{ok, State}.
@@ -223,12 +222,15 @@ handle_msg({set_credentials, AccessKey, SecretAccessKey}, State) ->
223222
error = undefined}};
224223

225224
handle_msg({set_credentials, NewState}, State) ->
226-
{reply, ok, State#state{access_key = NewState#state.access_key,
227-
secret_access_key = NewState#state.secret_access_key,
228-
security_token = NewState#state.security_token,
229-
expiration = NewState#state.expiration,
230-
error = NewState#state.error,
231-
gun_connections = State#state.gun_connections}};
225+
spawn(fun() -> maps:fold(fun(_Host, ConnPid, _Acc) ->
226+
gun:close(ConnPid)
227+
end, ok, State#state.gun_connections) end),
228+
{reply, ok, State#state{access_key = NewState#state.access_key,
229+
secret_access_key = NewState#state.secret_access_key,
230+
security_token = NewState#state.security_token,
231+
expiration = NewState#state.expiration,
232+
error = NewState#state.error,
233+
gun_connections = #{}}}; % Potentially new credentials, so clear the connection pool?
232234

233235
handle_msg({set_region, Region}, State) ->
234236
{reply, ok, State#state{region = Region}};
@@ -577,10 +579,10 @@ api_get_request_with_retries(Service, Path, Retries, WaitTimeBetweenRetries) ->
577579
%% Gun HTTP client functions
578580
gun_request(State, Method, URI, Headers, Body, Options) ->
579581
{Host, Port, Path} = parse_uri(URI),
580-
{ConnPid, NewState} = get_or_create_gun_connection(State, Host, Port, Options),
582+
{ConnPid, NewState} = get_or_create_gun_connection(State, Host, Port, Path, Options),
581583
Timeout = proplists:get_value(timeout, Options, ?DEFAULT_HTTP_TIMEOUT),
582584
try
583-
StreamRef = gun:get(ConnPid, Path, Headers),
585+
StreamRef = do_gun_request(ConnPid, Method, Path, Headers, Body),
584586
case gun:await(ConnPid, StreamRef, Timeout) of
585587
{response, fin, Status, RespHeaders} ->
586588
Response = {ok, {{http_version, Status, status_text(Status)}, RespHeaders, <<>>}},
@@ -595,28 +597,50 @@ gun_request(State, Method, URI, Headers, Body, Options) ->
595597
catch
596598
_:Error ->
597599
% Connection failed, remove from pool and return error
598-
NewConnections = maps:remove(Host, NewState#state.gun_connections),
600+
HostKey = get_connection_key(Host, Port, Path, Options),
601+
NewConnections = maps:remove(HostKey, NewState#state.gun_connections),
599602
gun:close(ConnPid),
600603
{{error, Error}, NewState#state{gun_connections = NewConnections}}
601604
end.
602605

603-
get_or_create_gun_connection(State, Host, Port, Options) ->
604-
HostKey = Host ++ ":" ++ integer_to_list(Port),
606+
do_gun_request(ConnPid, get, Path, Headers, _Body) ->
607+
gun:get(ConnPid, Path, Headers);
608+
do_gun_request(ConnPid, post, Path, Headers, Body) ->
609+
gun:post(ConnPid, Path, Headers, Body, #{});
610+
do_gun_request(ConnPid, put, Path, Headers, Body) ->
611+
gun:put(ConnPid, Path, Headers, Body, #{});
612+
do_gun_request(ConnPid, head, Path, Headers, _Body) ->
613+
gun:head(ConnPid, Path, Headers, #{});
614+
do_gun_request(ConnPid, delete, Path, Headers, _Body) ->
615+
gun:delete(ConnPid, Path, Headers, #{});
616+
do_gun_request(ConnPid, patch, Path, Headers, Body) ->
617+
gun:patch(ConnPid, Path, Headers, Body, #{});
618+
do_gun_request(ConnPid, options, Path, Headers, _Body) ->
619+
gun:options(ConnPid, Path, Headers, #{}).
620+
621+
get_or_create_gun_connection(State, Host, Port, Path, Options) ->
622+
HostKey = get_connection_key(Host, Port, Path, Options),
605623
case maps:get(HostKey, State#state.gun_connections, undefined) of
606624
undefined ->
607-
create_gun_connection(State, Host, Port, HostKey, Options);
625+
create_gun_connection(State, Host, Port, Path, HostKey, Options);
608626
ConnPid ->
609627
case is_process_alive(ConnPid) andalso gun:info(ConnPid) =/= undefined of
610628
true ->
611629
{ConnPid, State};
612630
false ->
613631
% Connection is dead, create new one
614632
gun:close(ConnPid),
615-
create_gun_connection(State, Host, Port, HostKey, Options)
633+
create_gun_connection(State, Host, Port, Path, HostKey, Options)
616634
end
617635
end.
618636

619-
create_gun_connection(State, Host, Port, HostKey, Options) ->
637+
get_connection_key(Host, Port, Path, Options) ->
638+
case proplists:get_value(connection_per_path, Options, false) of
639+
true -> Host ++ ":" ++ integer_to_list(Port) ++ Path; % Per-path
640+
false -> Host ++ ":" ++ integer_to_list(Port) % Per-host (default)
641+
end.
642+
643+
create_gun_connection(State, Host, Port, Path, HostKey, Options) ->
620644
% Map HTTP version to Gun protocols, always include http as fallback
621645
HttpVersion = proplists:get_value(version, Options, "HTTP/1.1"),
622646
Protocols = case HttpVersion of

0 commit comments

Comments
 (0)