Skip to content

Commit 20fba86

Browse files
committed
Forget only disconnected consumers in SAC group
1 parent f193b72 commit 20fba86

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ handle_group_forget_connection(Pid, #?MODULE{groups = Groups0} = S0,
772772
#group{consumers = Consumers0} = G0 ->
773773
{Consumers1, Updated} =
774774
lists:foldr(
775-
fun(#consumer{pid = P, status = {_, St}} = C, {L, _})
775+
fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _})
776776
when P == Pid ->
777777
{[C#consumer{status = {?FORGOTTTEN, St}} | L], true};
778778
(C, {L, UpdatedFlag}) ->

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,49 @@ forget_connection_super_stream_disconnected_becomes_forgotten_test(_) ->
775775
assertSendMessageActivateEffect(Pid2, 2, stream(), name(), true, Eff),
776776
ok.
777777

778+
forget_connection_simple_connected_does_not_become_forgotten_test(_) ->
779+
Pid0 = new_process(),
780+
Pid1 = new_process(),
781+
Pid2 = new_process(),
782+
GId = group_id(),
783+
Group = cgroup([consumer(Pid0, 0, {connected, active}),
784+
consumer(Pid1, 1, {connected, waiting}),
785+
consumer(Pid2, 2, {connected, waiting})]),
786+
787+
Groups0 = #{GId => Group},
788+
State0 = state(Groups0),
789+
790+
{#?STATE{groups = Groups1}, Eff} = ?MOD:forget_connection(Pid1, State0),
791+
792+
assertHasGroup(GId, cgroup([consumer(Pid0, 0, {connected, active}),
793+
consumer(Pid1, 1, {connected, waiting}),
794+
consumer(Pid2, 2, {connected, waiting})]),
795+
Groups1),
796+
assertEmpty(Eff),
797+
ok.
798+
799+
forget_connection_super_stream_connected_does_not_become_forgotten_test(_) ->
800+
Pid0 = new_process(),
801+
Pid1 = new_process(),
802+
Pid2 = new_process(),
803+
GId = group_id(),
804+
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
805+
consumer(Pid1, 1, {connected, active}),
806+
consumer(Pid2, 2, {connected, waiting})]),
807+
808+
Groups0 = #{GId => Group},
809+
State0 = state(Groups0),
810+
811+
{#?STATE{groups = Groups1}, Eff} = ?MOD:forget_connection(Pid1, State0),
812+
813+
assertHasGroup(GId, cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
814+
consumer(Pid1, 1, {connected, active}),
815+
consumer(Pid2, 2, {connected, waiting})]),
816+
Groups1),
817+
assertEmpty(Eff),
818+
ok.
819+
820+
778821
register_consumer_simple_disconn_active_block_rebalancing_test(_) ->
779822
Pid0 = new_process(),
780823
Pid1 = new_process(),

0 commit comments

Comments
 (0)