Skip to content

Commit 034cc00

Browse files
authored
Add feature to allow disabling reconnection. (#203)
* Add feature to allow disabling reconnection. * Address PR feedback. * Add newline to end of test file.
1 parent d9f98ef commit 034cc00

File tree

2 files changed

+123
-17
lines changed

2 files changed

+123
-17
lines changed

src/shotgun.erl

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
#{async => boolean(),
4242
async_mode => binary | sse,
4343
handle_event => fun((fin | nofin, reference(), binary()) -> any()),
44-
timeout => timeout()}. %% Default 5000 ms
44+
timeout => timeout(), %% Default 5000 ms
45+
allow_reconnect => boolean()}. %% Default true
4546
-type response() ::
4647
#{status_code => integer(),
4748
headers => proplists:proplist(),
@@ -241,13 +242,15 @@ request(Pid, get, Uri, Headers0, Body, Options) ->
241242
async := IsAsync,
242243
async_mode := AsyncMode,
243244
headers := Headers,
244-
timeout := Timeout} =
245+
timeout := Timeout,
246+
allow_reconnect := AllowReconnect
247+
} =
245248
process_options(Options, Headers0, get),
246249

247250
Event =
248251
case IsAsync of
249252
true ->
250-
{get_async, {HandleEvent, AsyncMode}, {Uri, Headers, Body}};
253+
{get_async, {HandleEvent, AsyncMode, AllowReconnect}, {Uri, Headers, Body}};
251254
false ->
252255
{get, {Uri, Headers, Body}}
253256
end,
@@ -320,7 +323,9 @@ parse_event(EventBin) ->
320323
pid => pid() | undefined,
321324
responses => queue:queue() | undefined,
322325
status_code => integer() | undefined,
323-
stream => reference() | undefined}.
326+
stream => reference() | undefined,
327+
allow_reconnect => boolean()
328+
}.
324329

325330
%% @private
326331
-spec callback_mode() -> state_functions.
@@ -392,12 +397,17 @@ handle_info({gun_up, Pid, _Protocol}, StateName, StateData = #{pid := Pid}) ->
392397
{next_state, StateName, StateData};
393398
handle_info({gun_down, Pid, Protocol, Reason, KilledStreams},
394399
_StateName,
395-
StateData = #{pid := Pid}) ->
396-
error_logger:warning_msg("~p connection down on ~p: ~p (Killed: ~p)",
397-
[Protocol, Pid, Reason, KilledStreams]),
398-
gun:shutdown(Pid),
399-
CleanStateData = clean_state_data(StateData),
400-
{next_state, down, CleanStateData};
400+
StateData = #{pid := Pid, allow_reconnect := AllowReconnect }) ->
401+
error_logger:warning_msg("~p connection down on ~p: ~p (Killed: ~p) (AllowReconnect: ~p)",
402+
[Protocol, Pid, Reason, KilledStreams, AllowReconnect]),
403+
case AllowReconnect of
404+
true ->
405+
gun:shutdown(Pid),
406+
CleanStateData = clean_state_data(StateData),
407+
{next_state, down, CleanStateData};
408+
false ->
409+
keep_state_and_data
410+
end;
401411
handle_info(Event, StateName, StateData) ->
402412
Module = ?MODULE,
403413
% forward messages to state machine
@@ -450,7 +460,7 @@ at_rest(cast, {gun_down, _Args, _From}, StateData = #{pid := Pid}) ->
450460
CleanStateData = clean_state_data(StateData),
451461
{next_state, down, CleanStateData};
452462
at_rest(cast,
453-
{get_async, {HandleEvent, AsyncMode}, Args, From},
463+
{get_async, {HandleEvent, AsyncMode, AllowReconnect}, Args, From},
454464
StateData = #{pid := Pid}) ->
455465
StreamRef = do_http_verb(get, Pid, Args),
456466
CleanStateData = clean_state_data(StateData),
@@ -460,7 +470,8 @@ at_rest(cast,
460470
stream => StreamRef,
461471
handle_event => HandleEvent,
462472
async => true,
463-
async_mode => AsyncMode},
473+
async_mode => AsyncMode,
474+
allow_reconnect => AllowReconnect},
464475
{next_state, wait_response, NewStateData};
465476
at_rest(cast, {HttpVerb, {_, _, Body} = Args, From}, StateData = #{pid := Pid}) ->
466477
StreamRef = do_http_verb(HttpVerb, Pid, Args),
@@ -641,6 +652,7 @@ clean_state_data() ->
641652
clean_state_data(StateData) ->
642653
Responses = maps:get(responses, StateData, queue:new()),
643654
Requests = maps:get(pending_requests, StateData, queue:new()),
655+
AllowReconnect = maps:get(allow_reconnect, StateData, true),
644656
#{pid => undefined,
645657
stream => undefined,
646658
handle_event => undefined,
@@ -652,7 +664,8 @@ clean_state_data(StateData) ->
652664
async => false,
653665
async_mode => binary,
654666
buffer => <<"">>,
655-
pending_requests => Requests}.
667+
pending_requests => Requests,
668+
allow_reconnect => AllowReconnect}.
656669

657670
%% @private
658671
-spec do_http_verb(http_verb(), pid(), tuple()) -> reference().
@@ -711,6 +724,7 @@ process_options(Options, Headers0, HttpVerb) ->
711724
Async = maps:get(async, Options, false),
712725
AsyncMode = maps:get(async_mode, Options, binary),
713726
Timeout = maps:get(timeout, Options, 5000),
727+
AllowReconnect = maps:get(allow_reconnect, Options, true),
714728
case {Async, HttpVerb} of
715729
{true, get} ->
716730
ok;
@@ -723,7 +737,8 @@ process_options(Options, Headers0, HttpVerb) ->
723737
async => Async,
724738
async_mode => AsyncMode,
725739
headers => Headers,
726-
timeout => Timeout}.
740+
timeout => Timeout,
741+
allow_reconnect => AllowReconnect}.
727742

728743
%% @private
729744
-spec basic_auth_header(headers()) -> proplists:proplist().
@@ -809,8 +824,8 @@ enqueue_work_or_stop(_StateName, Event, From, StateData, Timeout) ->
809824

810825
%% @private
811826
-spec create_work({atom(), list()}, gen_statem:from()) -> not_work | {ok, work()}.
812-
create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) ->
813-
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
827+
create_work({M = get_async, {HandleEvent, AsyncMode, AllowReconnect}, Args}, From) ->
828+
{ok, {M, {HandleEvent, AsyncMode, AllowReconnect}, Args, From}};
814829
create_work({M, Args}, From)
815830
when M == get
816831
orelse M == post

test/shotgun_async_SUITE.erl

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,15 @@
22

33
-export([all/0, init_per_suite/1, end_per_suite/1, init_per_testcase/2,
44
end_per_testcase/2]).
5-
-export([get_sse/1, get_binary/1, work_queue/1, get_handle_event/1, async_unsupported/1]).
5+
-export([
6+
get_sse/1,
7+
get_binary/1,
8+
work_queue/1,
9+
get_handle_event/1,
10+
async_unsupported/1,
11+
async_gun_down_no_reopen/1,
12+
async_gun_down_with_reopen/1
13+
]).
614

715
-include_lib("common_test/include/ct.hrl").
816

@@ -163,3 +171,86 @@ async_unsupported(Config) ->
163171
{error, {async_unsupported, put}} = shotgun:put(Conn, "/", #{}, <<>>, #{async => true}),
164172

165173
{comment, ""}.
174+
175+
-spec async_gun_down_no_reopen(shotgun_test_utils:config()) -> {comment, string()}.
176+
async_gun_down_no_reopen(Config) ->
177+
Conn = ?config(conn, Config),
178+
Self = self(),
179+
180+
ct:comment("SSE: GET should return a ref"),
181+
HandleEvent =
182+
fun(_, _, EventBin) ->
183+
case shotgun:parse_event(EventBin) of
184+
#{id := Data} ->
185+
Self ! Data;
186+
_ ->
187+
ok
188+
end
189+
end,
190+
Opts =
191+
#{async => true,
192+
async_mode => sse,
193+
allow_reconnect => false,
194+
handle_event => HandleEvent},
195+
{ok, _Ref} = shotgun:get(Conn, <<"/chunked-sse/3">>, #{}, Opts),
196+
197+
timer:sleep(500),
198+
199+
ok = shotgun_test_utils:wait_receive(<<"1">>, 500),
200+
{at_rest, _} = sys:get_state(Conn),
201+
%% Server is stopped immediately after receiving event.
202+
%% We should be at rest waiting for the next event.
203+
http_server:stop_listener(),
204+
timer:sleep(500),
205+
%% Listener needs to be running at test end.
206+
http_server:start_listener(),
207+
208+
ct:comment("Connection should be closed after server shutdown"),
209+
false = erlang:is_process_alive(Conn),
210+
211+
{comment, ""}.
212+
213+
-spec async_gun_down_with_reopen(shotgun_test_utils:config()) -> {comment, string()}.
214+
async_gun_down_with_reopen(Config) ->
215+
Conn = ?config(conn, Config),
216+
Self = self(),
217+
218+
ct:comment("SSE: GET should return a ref"),
219+
HandleEvent =
220+
fun(_, _, EventBin) ->
221+
case shotgun:parse_event(EventBin) of
222+
#{id := Data} ->
223+
Self ! Data;
224+
_ ->
225+
ok
226+
end
227+
end,
228+
Opts =
229+
#{async => true,
230+
async_mode => sse,
231+
allow_reconnect => true,
232+
handle_event => HandleEvent},
233+
{ok, _Ref} = shotgun:get(Conn, <<"/chunked-sse/3">>, #{}, Opts),
234+
235+
timer:sleep(500),
236+
237+
ok = shotgun_test_utils:wait_receive(<<"1">>, 500),
238+
{at_rest, _} = sys:get_state(Conn),
239+
%% Server is stopped immediately after receiving event.
240+
%% We should be at rest waiting for the next event.
241+
http_server:stop_listener(),
242+
timer:sleep(500),
243+
244+
%% Listener needs to be running at test end.
245+
http_server:start_listener(),
246+
247+
ct:comment("Connection should still be alive"),
248+
true = erlang:is_process_alive(Conn),
249+
250+
Port = application:get_env(http_server, http_port, 8888),
251+
shotgun:reopen(Conn, "localhost", Port),
252+
253+
{ok, _Ref2} = shotgun:get(Conn, <<"/chunked-sse/3">>, #{}, Opts),
254+
ok = shotgun_test_utils:wait_receive(<<"1">>, 500),
255+
256+
{comment, ""}.

0 commit comments

Comments
 (0)