Skip to content

Commit 46bb73b

Browse files
author
Vlad Ionescu
committed
merging bug21650 into default
2 parents 83f43ca + e318fdc commit 46bb73b

File tree

1 file changed

+36
-76
lines changed

1 file changed

+36
-76
lines changed

src/rabbit_reader.erl

Lines changed: 36 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
-define(HANDSHAKE_TIMEOUT, 10).
5050
-define(NORMAL_TIMEOUT, 3).
5151
-define(CLOSING_TIMEOUT, 1).
52-
-define(CHANNEL_CLOSING_TIMEOUT, 1).
5352
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
5453

5554
%---------------------------------------------------------------------------
@@ -94,23 +93,19 @@
9493
%% -> log error, wait for channels to terminate forcefully, start
9594
%% terminate_connection timer, send close, *closed*
9695
%% channel exit with soft error
97-
%% -> log error, start terminate_channel timer, mark channel as
98-
%% closing, *running*
99-
%% terminate_channel timeout -> remove 'closing' mark, *running*
96+
%% -> log error, mark channel as closing, *running*
10097
%% handshake_timeout -> ignore, *running*
10198
%% heartbeat timeout -> *throw*
10299
%% closing:
103100
%% socket close -> *terminate*
104101
%% receive frame -> ignore, *closing*
105-
%% terminate_channel timeout -> remove 'closing' mark, *closing*
106102
%% handshake_timeout -> ignore, *closing*
107103
%% heartbeat timeout -> *throw*
108104
%% channel exit with hard error
109105
%% -> log error, wait for channels to terminate forcefully, start
110106
%% terminate_connection timer, send close, *closed*
111107
%% channel exit with soft error
112-
%% -> log error, start terminate_channel timer, mark channel as
113-
%% closing
108+
%% -> log error, mark channel as closing
114109
%% if last channel to exit then send connection.close_ok,
115110
%% start terminate_connection timer, *closed*
116111
%% else *closing*
@@ -123,7 +118,6 @@
123118
%% *closed*
124119
%% receive frame -> ignore, *closed*
125120
%% terminate_connection timeout -> *terminate*
126-
%% terminate_channel timeout -> remove 'closing' mark, *closed*
127121
%% handshake_timeout -> ignore, *closed*
128122
%% heartbeat timeout -> *throw*
129123
%% channel exit -> log error, *closed*
@@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
292286
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
293287
{'EXIT', Pid, Reason} ->
294288
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
295-
{terminate_channel, Channel, Ref1} ->
296-
mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
297289
terminate_connection ->
298290
State;
299291
handshake_timeout ->
@@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{
341333
State#v1{connection_state = closed}.
342334

343335
close_channel(Channel, State) ->
344-
Ref = make_ref(),
345-
TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
346-
self(),
347-
{terminate_channel, Channel, Ref}),
348-
put({closing_channel, Channel}, {Ref, TRef}),
349-
State.
350-
351-
terminate_channel(Channel, Ref, State) ->
352-
case get({closing_channel, Channel}) of
353-
undefined -> ok; %% got close_ok in the meantime
354-
{Ref, _} -> erase({closing_channel, Channel}),
355-
ok;
356-
{_Ref, _} -> ok %% got close_ok, and have new closing channel
357-
end,
336+
put({channel, Channel}, closing),
358337
State.
359338

360339
handle_channel_exit(Channel, Reason, State) ->
361-
%% We remove the channel from the inbound map only. That allows
362-
%% the channel to be re-opened, but also means the remaining
363-
%% cleanup, including possibly closing the connection, is deferred
364-
%% until we get the (normal) exit signal.
365-
erase({channel, Channel}),
366340
handle_exception(State, Channel, Reason).
367341

368342
handle_dependent_exit(Pid, normal, State) ->
369-
channel_cleanup(Pid),
343+
erase({chpid, Pid}),
370344
maybe_close(State);
371345
handle_dependent_exit(Pid, Reason, State) ->
372346
case channel_cleanup(Pid) of
@@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) ->
376350

377351
channel_cleanup(Pid) ->
378352
case get({chpid, Pid}) of
379-
undefined ->
380-
case get({closing_chpid, Pid}) of
381-
undefined -> undefined;
382-
{channel, Channel} ->
383-
erase({closing_chpid, Pid}),
384-
Channel
385-
end;
386-
{channel, Channel} ->
387-
erase({channel, Channel}),
388-
erase({chpid, Pid}),
389-
Channel
353+
undefined -> undefined;
354+
{channel, Channel} -> erase({channel, Channel}),
355+
erase({chpid, Pid}),
356+
Channel
390357
end.
391358

392359
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -467,13 +434,27 @@ handle_frame(Type, Channel, Payload, State) ->
467434
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
468435
case get({channel, Channel}) of
469436
{chpid, ChPid} ->
470-
ok = check_for_close(Channel, ChPid, AnalyzedFrame),
437+
case AnalyzedFrame of
438+
{method, 'channel.close', _} ->
439+
erase({channel, Channel});
440+
_ -> ok
441+
end,
471442
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
472443
State;
444+
closing ->
445+
%% According to the spec, after sending a
446+
%% channel.close we must ignore all frames except
447+
%% channel.close_ok.
448+
case AnalyzedFrame of
449+
{method, 'channel.close_ok', _} ->
450+
erase({channel, Channel});
451+
_ -> ok
452+
end,
453+
State;
473454
undefined ->
474455
case State#v1.connection_state of
475-
running -> send_to_new_channel(
476-
Channel, AnalyzedFrame, State),
456+
running -> ok = send_to_new_channel(
457+
Channel, AnalyzedFrame, State),
477458
State;
478459
Other -> throw({channel_frame_while_starting,
479460
Channel, Other, AnalyzedFrame})
@@ -716,38 +697,17 @@ i(Item, #v1{}) ->
716697
%%--------------------------------------------------------------------------
717698

718699
send_to_new_channel(Channel, AnalyzedFrame, State) ->
719-
case get({closing_channel, Channel}) of
720-
undefined ->
721-
#v1{sock = Sock,
722-
connection = #connection{
723-
frame_max = FrameMax,
724-
user = #user{username = Username},
725-
vhost = VHost}} = State,
726-
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
727-
ChPid = rabbit_framing_channel:start_link(
728-
fun rabbit_channel:start_link/5,
729-
[Channel, self(), WriterPid, Username, VHost]),
730-
put({channel, Channel}, {chpid, ChPid}),
731-
put({chpid, ChPid}, {channel, Channel}),
732-
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
733-
{_, TRef} ->
734-
%% According to the spec, after sending a channel.close we
735-
%% must ignore all frames except channel.close_ok.
736-
case AnalyzedFrame of
737-
{method, 'channel.close_ok', _} ->
738-
erlang:cancel_timer(TRef),
739-
erase({closing_channel, Channel}),
740-
ok;
741-
_Other -> ok
742-
end
743-
end.
744-
745-
check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
746-
channel_cleanup(ChPid),
747-
put({closing_chpid, ChPid}, {channel, Channel}),
748-
ok;
749-
check_for_close(_Channel, _ChPid, _Frame) ->
750-
ok.
700+
#v1{sock = Sock, connection = #connection{
701+
frame_max = FrameMax,
702+
user = #user{username = Username},
703+
vhost = VHost}} = State,
704+
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
705+
ChPid = rabbit_framing_channel:start_link(
706+
fun rabbit_channel:start_link/5,
707+
[Channel, self(), WriterPid, Username, VHost]),
708+
put({channel, Channel}, {chpid, ChPid}),
709+
put({chpid, ChPid}, {channel, Channel}),
710+
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
751711

752712
log_channel_error(ConnectionState, Channel, Reason) ->
753713
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",

0 commit comments

Comments
 (0)