Skip to content

Commit 6b9cee1

Browse files
committed
Update stream SAC configuration in tick
Not in aux.
1 parent bfe5adb commit 6b9cee1

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -810,9 +810,9 @@ all_member_nodes(Streams) ->
810810
maps:merge(Acc, M)
811811
end, #{}, Streams)).
812812

813-
tick(_Ts, _State) ->
814-
[{aux, maybe_resize_coordinator_cluster},
815-
{aux, maybe_update_sac_configuration}].
813+
tick(_Ts, #?MODULE{single_active_consumer = SacState}) ->
814+
[{aux, maybe_resize_coordinator_cluster} |
815+
maybe_update_sac_configuration(SacState)].
816816

817817
members() ->
818818
%% TODO: this can be replaced with a ra_leaderboard
@@ -897,18 +897,13 @@ maybe_handle_stale_nodes(MemberNodes, ExpectedNodes,
897897
maybe_handle_stale_nodes(_, _, _, _) ->
898898
ok.
899899

900-
maybe_update_sac_configuration(RaAux, MacVersion) when MacVersion >= 5 ->
901-
#?MODULE{single_active_consumer = SacState} = ra_aux:machine_state(RaAux),
900+
maybe_update_sac_configuration(SacState) ->
902901
case sac_check_conf_change(SacState) of
903902
{new, UpdatedConf} ->
904-
Leader = ra_aux:leader_id(RaAux),
905-
ra:pipeline_command(Leader, sac_make_update_conf(UpdatedConf)),
906-
ok;
903+
[{append, sac_make_update_conf(UpdatedConf), noreply}];
907904
_ ->
908-
ok
909-
end;
910-
maybe_update_sac_configuration(_, _) ->
911-
ok.
905+
[]
906+
end.
912907

913908
add_member(Members, Node) ->
914909
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
@@ -992,11 +987,6 @@ handle_aux(leader, _, maybe_resize_coordinator_cluster,
992987
AuxState, RaAux) ->
993988
%% Coordinator resizing is still happening, let's ignore this tick event
994989
{no_reply, AuxState, RaAux};
995-
handle_aux(leader, _, maybe_update_sac_configuration,
996-
AuxState, RaAux) ->
997-
MachineVersion = ra_aux:effective_machine_version(RaAux),
998-
maybe_update_sac_configuration(RaAux, MachineVersion),
999-
{no_reply, AuxState, RaAux};
1000990
handle_aux(leader, _, {down, Pid, _},
1001991
#aux{resizer = Pid} = Aux, RaAux) ->
1002992
%% Coordinator resizing has finished

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -861,8 +861,9 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
861861
pids_groups = map_to_pids_groups(PidsGroups),
862862
conf = #{disconnected_timeout => ?DISCONNECTED_TIMEOUT_MS}}.
863863

864-
-spec check_conf_change(state()) -> {new, conf()} | unchanged.
865-
check_conf_change(#?MODULE{conf = Conf}) ->
864+
-spec check_conf_change(state() | term()) -> {new, conf()} | unchanged.
865+
check_conf_change(State) when is_record(State, ?MODULE)->
866+
#?MODULE{conf = Conf} = State,
866867
DisconTimeout = lookup_disconnected_timeout(),
867868
case Conf of
868869
#{?DISCONNECTED_TIMEOUT_CONF_KEY := DT}
@@ -872,7 +873,9 @@ check_conf_change(#?MODULE{conf = Conf}) ->
872873
{new, #{?DISCONNECTED_TIMEOUT_CONF_KEY => DisconTimeout}};
873874
_ ->
874875
unchanged
875-
end.
876+
end;
877+
check_conf_change(_) ->
878+
unchanged.
876879

877880
- spec make_purge_nodes([node()]) -> {sac, command()}.
878881
make_purge_nodes(Nodes) ->

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ check_conf_test(_) ->
7070
?MOD:check_conf_change(state_with_conf(#{K => 42}))),
7171
?assertMatch(unchanged,
7272
?MOD:check_conf_change(state_with_conf(#{K => Def}))),
73+
?assertMatch(unchanged,
74+
?MOD:check_conf_change(#{K => Def})),
7375
ok.
7476

7577
simple_sac_test(_) ->

0 commit comments

Comments
 (0)