Skip to content

Commit 53cff3b

Browse files
committed
Close stream connection in case of unexpected error from SAC coordinator
Calls to the stream SAC coordinator can fail for various reason (e.g. a timeout because of a network partition). The stream reader does not take into account what the SAC coordinator returns and moves on even in case of errors. This can lead to inconsistent state for SAC groups. This commit changes this behavior by handling unexpected errors from the SAC coordinator and closing the connection. The client is expected to reconnect. This is safer than risking inconsistent state. Fixes #14040
1 parent be057c3 commit 53cff3b

File tree

2 files changed

+77
-49
lines changed

2 files changed

+77
-49
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
-opaque state() :: #?MODULE{}.
2929

30+
-type sac_error() :: partition_index_conflict | not_found.
31+
3032
-export_type([state/0,
3133
command/0]).
3234

@@ -50,7 +52,8 @@
5052
import_state/2,
5153
check_conf_change/1,
5254
list_nodes/1,
53-
state_enter/2
55+
state_enter/2,
56+
is_sac_error/1
5457
]).
5558
-export([make_purge_nodes/1,
5659
make_update_conf/1]).
@@ -89,7 +92,7 @@
8992
pid(),
9093
binary(),
9194
integer()) ->
92-
{ok, boolean()} | {error, term()}.
95+
{ok, boolean()} | {error, sac_error() | term()}.
9396
register_consumer(VirtualHost,
9497
Stream,
9598
PartitionIndex,
@@ -110,7 +113,7 @@ register_consumer(VirtualHost,
110113
binary(),
111114
pid(),
112115
integer()) ->
113-
ok | {error, term()}.
116+
ok | {error, sac_error() | term()}.
114117
unregister_consumer(VirtualHost,
115118
Stream,
116119
ConsumerName,
@@ -122,13 +125,15 @@ unregister_consumer(VirtualHost,
122125
connection_pid = ConnectionPid,
123126
subscription_id = SubscriptionId}).
124127

125-
-spec activate_consumer(binary(), binary(), binary()) -> ok.
128+
-spec activate_consumer(binary(), binary(), binary()) ->
129+
ok | {error, sac_error() | term()}.
126130
activate_consumer(VH, Stream, Name) ->
127131
process_command(#command_activate_consumer{vhost =VH,
128132
stream = Stream,
129133
consumer_name= Name}).
130134

131-
-spec connection_reconnected(connection_pid()) -> ok.
135+
-spec connection_reconnected(connection_pid()) ->
136+
ok | {error, sac_error() | term()}.
132137
connection_reconnected(Pid) ->
133138
process_command(#command_connection_reconnected{pid = Pid}).
134139

@@ -150,7 +155,7 @@ wrap_cmd(Cmd) ->
150155
%% (CLI command)
151156
-spec consumer_groups(binary(), [atom()]) ->
152157
{ok,
153-
[term()] | {error, atom()}}.
158+
[term()]} | {error, sac_error() | term()}.
154159
consumer_groups(VirtualHost, InfoKeys) ->
155160
case ra_local_query(fun(State) ->
156161
SacState =
@@ -172,7 +177,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
172177
%% (CLI command)
173178
-spec group_consumers(binary(), binary(), binary(), [atom()]) ->
174179
{ok, [term()]} |
175-
{error, atom()}.
180+
{error, sac_error() | term()}.
176181
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
177182
case ra_local_query(fun(State) ->
178183
SacState =
@@ -932,6 +937,10 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
932937
state_enter(_, _) ->
933938
[].
934939

940+
-spec is_sac_error(term()) -> boolean().
941+
is_sac_error(Reason) ->
942+
lists:member(Reason, ?SAC_ERRORS).
943+
935944
nodes_from_group(#group{consumers = Cs}) when is_list(Cs) ->
936945
lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
937946
Acc#{node(Pid) => true}

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
-define(UNKNOWN_FIELD, unknown_field).
8282
-define(SILENT_CLOSE_DELAY, 3_000).
8383
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
84+
-define(SAC_MOD, rabbit_stream_sac_coordinator).
8485

8586
-import(rabbit_stream_utils, [check_write_permitted/2,
8687
check_read_permitted/3]).
@@ -722,7 +723,7 @@ open(info, {OK, S, Data},
722723
connection_state = State2}}
723724
end;
724725
open(info, {sac, check_connection, _}, State) ->
725-
rabbit_stream_sac_coordinator:connection_reconnected(self()),
726+
_ = sac_connection_reconnected(self()),
726727
{keep_state, State};
727728
open(info,
728729
{sac, #{subscription_id := SubId,
@@ -794,17 +795,15 @@ open(info,
794795
rabbit_log:debug("Subscription ~tp on ~tp has been deleted.",
795796
[SubId, Stream]),
796797
rabbit_log:debug("Active ~tp, message ~tp", [Active, Msg]),
797-
case {Active, Msg} of
798-
{false, #{stepping_down := true,
799-
stream := St,
800-
consumer_name := ConsumerName}} ->
801-
rabbit_log:debug("Former active consumer gone, activating consumer " ++
802-
"on stream ~tp, group ~tp", [St, ConsumerName]),
803-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
804-
St,
805-
ConsumerName);
806-
_ ->
807-
ok
798+
_ = case {Active, Msg} of
799+
{false, #{stepping_down := true,
800+
stream := St,
801+
consumer_name := ConsumerName}} ->
802+
rabbit_log:debug("Former active consumer gone, activating consumer " ++
803+
"on stream ~tp, group ~tp", [St, ConsumerName]),
804+
sac_activate_consumer(VirtualHost, St, ConsumerName);
805+
_ ->
806+
ok
808807
end,
809808
{Connection0, ConnState0}
810809
end,
@@ -2554,9 +2553,8 @@ handle_frame_post_auth(Transport,
25542553
rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " ++
25552554
"has stepped down, activating consumer",
25562555
[SubscriptionId, Stream, ConsumerName]),
2557-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
2558-
Stream,
2559-
ConsumerName),
2556+
_ = sac_activate_consumer(VirtualHost, Stream,
2557+
ConsumerName),
25602558
ok;
25612559
_ ->
25622560
ok
@@ -3015,21 +3013,9 @@ handle_subscription(Transport,#stream_connection{
30153013

30163014
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
30173015
{ok, true};
3018-
maybe_register_consumer(VirtualHost,
3019-
Stream,
3020-
ConsumerName,
3021-
ConnectionName,
3022-
SubscriptionId,
3023-
Properties,
3024-
true) ->
3025-
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
3026-
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3027-
Stream,
3028-
PartitionIndex,
3029-
ConsumerName,
3030-
self(),
3031-
ConnectionName,
3032-
SubscriptionId).
3016+
maybe_register_consumer(VH, St, Name, ConnName, SubId, Properties, true) ->
3017+
PartitionIndex = partition_index(VH, St, Properties),
3018+
sac_register_consumer(VH, St, PartitionIndex, Name, self(), ConnName, SubId).
30333019

30343020
maybe_send_consumer_update(Transport,
30353021
Connection = #stream_connection{
@@ -3175,13 +3161,12 @@ maybe_unregister_consumer(VirtualHost,
31753161
ConsumerName = consumer_name(Properties),
31763162

31773163
Requests1 = maps:fold(
3178-
fun(_, #request{content =
3179-
#{active := false,
3180-
subscription_id := SubId,
3181-
stepping_down := true}}, Acc) when SubId =:= SubscriptionId ->
3182-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
3183-
Stream,
3184-
ConsumerName),
3164+
fun(_, #request{content = #{active := false,
3165+
subscription_id := SubId,
3166+
stepping_down := true}}, Acc)
3167+
when SubId =:= SubscriptionId ->
3168+
_ = sac_activate_consumer(VirtualHost, Stream,
3169+
ConsumerName),
31853170
rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " ++
31863171
"group '~tp', sending activation.",
31873172
[Stream, ConsumerName]),
@@ -3190,11 +3175,8 @@ maybe_unregister_consumer(VirtualHost,
31903175
Acc#{K => V}
31913176
end, maps:new(), Requests),
31923177

3193-
_ = rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,
3194-
Stream,
3195-
ConsumerName,
3196-
self(),
3197-
SubscriptionId),
3178+
_ = sac_unregister_consumer(VirtualHost, Stream, ConsumerName,
3179+
self(), SubscriptionId),
31983180
Requests1.
31993181

32003182
partition_index(VirtualHost, Stream, Properties) ->
@@ -4037,3 +4019,40 @@ stream_from_consumers(SubId, Consumers) ->
40374019
%% for a bit so they can't DOS us with repeated failed logins etc.
40384020
silent_close_delay() ->
40394021
timer:sleep(?SILENT_CLOSE_DELAY).
4022+
4023+
sac_connection_reconnected(Pid) ->
4024+
sac_call(fun() ->
4025+
?SAC_MOD:connection_reconnected(Pid)
4026+
end).
4027+
4028+
sac_activate_consumer(VH, St, Name) ->
4029+
sac_call(fun() ->
4030+
?SAC_MOD:activate_consumer(VH, St, Name)
4031+
end).
4032+
4033+
sac_register_consumer(VH, St, PartitionIndex, Name, Pid, ConnName, SubId) ->
4034+
sac_call(fun() ->
4035+
?SAC_MOD:register_consumer(VH, St, PartitionIndex,
4036+
Name, Pid, ConnName,
4037+
SubId)
4038+
end).
4039+
4040+
sac_unregister_consumer(VH, St, Name, Pid, SubId) ->
4041+
sac_call(fun() ->
4042+
?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId)
4043+
end).
4044+
4045+
sac_call(Call) ->
4046+
case Call() of
4047+
{error, Reason} = Err ->
4048+
case ?SAC_MOD:is_sac_error(Reason) of
4049+
true ->
4050+
Err;
4051+
_ ->
4052+
rabbit_log:info("Stream SAC coordinator call failed with ~tp",
4053+
[Reason]),
4054+
throw({stop, {shutdown, stream_sac_coordinator_error}})
4055+
end;
4056+
R ->
4057+
R
4058+
end.

0 commit comments

Comments
 (0)