Skip to content

Commit 0625784

Browse files
committed
Handle disconnection in one function with reason parameter
1 parent 28fd75a commit 0625784

File tree

4 files changed

+47
-41
lines changed

4 files changed

+47
-41
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@
8787
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
8888
-define(V2_OR_MORE(Vsn), Vsn >= 2).
8989
-define(V5_OR_MORE(Vsn), Vsn >= 5).
90+
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
91+
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).
9092

9193
-type state() :: #?MODULE{}.
9294
-type args() :: #{index := ra:index(),
@@ -649,19 +651,12 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
649651
return(Meta, State#?MODULE{streams = Streams0,
650652
monitors = Monitors1}, ok, Effects0)
651653
end;
652-
{sac, Monitors1} when Vsn < 5 orelse Reason =/= noconnection ->
653-
%% A connection went down, v5+ treats noconnection differently but
654-
%% v4- does not.
655-
Mod = sac_module(Meta),
656-
{SacState1, Effects} = Mod:handle_connection_down(Pid, SacState0),
654+
{sac, Monitors1} ->
655+
{SacState1, SacEffects} = sac_handle_connection_down(SacState0, Pid,
656+
Reason, Vsn),
657657
return(Meta, State#?MODULE{single_active_consumer = SacState1,
658-
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
659-
{sac, Monitors1} when Reason =:= noconnection ->
660-
%% the node of a connection got disconnected
661-
Mod = sac_module(Meta),
662-
{SacState1, Effects} = Mod:handle_connection_node_disconnected(Pid, SacState0),
663-
return(Meta, State#?MODULE{single_active_consumer = SacState1,
664-
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
658+
monitors = Monitors1},
659+
ok, [Effects0 ++ SacEffects]);
665660
error ->
666661
return(Meta, State, ok, Effects0)
667662
end;
@@ -796,9 +791,9 @@ state_enter(_S, _) ->
796791
[].
797792

798793
sac_module(#{machine_version := Vsn}) when ?V5_OR_MORE(Vsn) ->
799-
rabbit_stream_sac_coordinator;
794+
?SAC_CURRENT;
800795
sac_module(_) ->
801-
rabbit_stream_sac_coordinator_v4.
796+
?SAC_V4.
802797

803798
all_member_nodes(Streams) ->
804799
maps:keys(
@@ -2441,6 +2436,14 @@ maps_to_list(M) ->
24412436
ra_local_query(QueryFun) ->
24422437
ra:local_query({?MODULE, node()}, QueryFun, infinity).
24432438

2439+
sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
2440+
%% the node of a connection got disconnected
2441+
?SAC_CURRENT:handle_connection_down(Pid, Reason, SacState);
2442+
sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) ->
2443+
%% A connection went down, v5+ treats noconnection differently but
2444+
%% v4- does not.
2445+
?SAC_V4:handle_connection_down(Pid, SacState).
2446+
24442447
sac_make_purge_nodes(Nodes) ->
24452448
rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).
24462449

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@
4141
init_state/0,
4242
send_message/2,
4343
ensure_monitors/4,
44-
handle_connection_down/2,
45-
handle_connection_node_disconnected/2,
44+
handle_connection_down/3,
4645
handle_node_reconnected/3,
4746
forget_connection/2,
4847
consumer_groups/3,
@@ -704,9 +703,11 @@ ensure_monitors(#command_purge_nodes{},
704703
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
705704
{State0, Monitors, Effects}.
706705

707-
-spec handle_connection_down(connection_pid(), state()) ->
708-
{state(), ra_machine:effects()}.
709-
handle_connection_down(Pid,
706+
-spec handle_connection_down(connection_pid(), term(), state()) ->
707+
{state(), ra_machine:effects()}.
708+
handle_connection_down(Pid, noconnection, State) ->
709+
handle_connection_node_disconnected(Pid, State);
710+
handle_connection_down(Pid, _Reason,
710711
#?MODULE{pids_groups = PidsGroups0} = State0) ->
711712
case maps:take(Pid, PidsGroups0) of
712713
error ->

deps/rabbit/src/rabbit_stream_sac_coordinator_v4.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ ensure_monitors(_, #?STATE{} = State0, Monitors, Effects) ->
409409
{State0, Monitors, Effects}.
410410

411411
-spec handle_connection_down(connection_pid(), state()) ->
412-
{state(), ra_machine:effects()}.
412+
{state(), ra_machine:effects()}.
413413
handle_connection_down(Pid,
414414
#?STATE{pids_groups = PidsGroups0} = State0) ->
415415
case maps:take(Pid, PidsGroups0) of

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,13 @@ handle_connection_down_sac_should_get_activated_test(_) ->
316316
State0 = state(#{GroupId => Group}),
317317

318318
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
319-
Effects1} = ?MOD:handle_connection_down(Pid0, State0),
319+
Effects1} = ?MOD:handle_connection_down(Pid0, normal, State0),
320320
assertSize(1, PidsGroups1),
321321
assertSize(1, maps:get(Pid1, PidsGroups1)),
322322
assertSendMessageActivateEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
323323
assertHasGroup(GroupId, grp([csr(Pid1, 1, active)]), Groups1),
324324
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
325-
Effects2} = ?MOD:handle_connection_down(Pid1, State1),
325+
Effects2} = ?MOD:handle_connection_down(Pid1, normal, State1),
326326
assertEmpty(PidsGroups2),
327327
assertEmpty(Effects2),
328328
assertEmpty(Groups2),
@@ -341,7 +341,7 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
341341
State = state(#{GroupId => Group}),
342342

343343
{#?STATE{pids_groups = PidsGroups, groups = Groups},
344-
Effects} = ?MOD:handle_connection_down(Pid0, State),
344+
Effects} = ?MOD:handle_connection_down(Pid0, normal, State),
345345
assertSize(1, PidsGroups),
346346
assertSize(1, maps:get(Pid1, PidsGroups)),
347347
assertEmpty(Effects),
@@ -358,7 +358,7 @@ handle_connection_down_sac_no_more_consumers_test(_) ->
358358
State = state(#{GroupId => Group}),
359359

360360
{#?STATE{pids_groups = PidsGroups, groups = Groups},
361-
Effects} = ?MOD:handle_connection_down(Pid0, State),
361+
Effects} = ?MOD:handle_connection_down(Pid0, normal, State),
362362
assertEmpty(PidsGroups),
363363
assertEmpty(Groups),
364364
assertEmpty(Effects),
@@ -377,7 +377,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
377377
Pid1 => maps:from_list([{GroupId, true}])}),
378378

379379
{#?STATE{pids_groups = PidsGroups, groups = Groups},
380-
Effects} = ?MOD:handle_connection_down(Pid0, State),
380+
Effects} = ?MOD:handle_connection_down(Pid0, normal, State),
381381

382382
assertSize(1, PidsGroups),
383383
assertSize(1, maps:get(Pid1, PidsGroups)),
@@ -400,7 +400,7 @@ handle_connection_down_super_stream_active_stays_test(_) ->
400400
State = state(#{GroupId => Group}),
401401

402402
{#?STATE{pids_groups = PidsGroups, groups = Groups},
403-
Effects} = ?MOD:handle_connection_down(Pid1, State),
403+
Effects} = ?MOD:handle_connection_down(Pid1, normal, State),
404404
assertSize(1, PidsGroups),
405405
assertSize(1, maps:get(Pid0, PidsGroups)),
406406
assertEmpty(Effects),
@@ -424,7 +424,7 @@ handle_connection_down_super_stream_active_changes_test(_) ->
424424

425425
{#?STATE{pids_groups = PidsGroups, groups = Groups},
426426
Effects} =
427-
?MOD:handle_connection_down(Pid0, State),
427+
?MOD:handle_connection_down(Pid0, normal, State),
428428
assertSize(1, PidsGroups),
429429
assertSize(1, maps:get(Pid1, PidsGroups)),
430430
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
@@ -447,7 +447,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
447447
State = state(#{GroupId => Group}),
448448

449449
{#?STATE{pids_groups = PidsGroups, groups = Groups},
450-
Effects} = ?MOD:handle_connection_down(Pid0, State),
450+
Effects} = ?MOD:handle_connection_down(Pid0, normal, State),
451451
assertSize(1, PidsGroups),
452452
assertSize(1, maps:get(Pid1, PidsGroups)),
453453
assertSendMessageActivateEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
@@ -472,7 +472,7 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
472472
State = state(#{GroupId => Group}),
473473

474474
{#?STATE{pids_groups = PidsGroups, groups = Groups},
475-
Effects} = ?MOD:handle_connection_down(Pid0, State),
475+
Effects} = ?MOD:handle_connection_down(Pid0, normal, State),
476476
assertSize(1, PidsGroups),
477477
assertSize(1, maps:get(Pid1, PidsGroups)),
478478
assertEmpty(Effects),
@@ -509,7 +509,7 @@ handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_tes
509509

510510
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
511511
Effects1} =
512-
?MOD:handle_connection_down(Pid0, State0),
512+
?MOD:handle_connection_down(Pid0, normal, State0),
513513
assertSize(2, PidsGroups1),
514514
assertSize(1, maps:get(Pid1, PidsGroups1)),
515515
assertSize(1, maps:get(Pid2, PidsGroups1)),
@@ -520,7 +520,7 @@ handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_tes
520520
Groups1),
521521

522522
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
523-
Effects2} = ?MOD:handle_connection_down(Pid1, State1),
523+
Effects2} = ?MOD:handle_connection_down(Pid1, normal, State1),
524524
assertSize(1, PidsGroups2),
525525
assertSize(1, maps:get(Pid2, PidsGroups2)),
526526
assertSendMessageActivateEffect(Pid2, 2, Stream, ConsumerName, true, Effects2),
@@ -599,7 +599,7 @@ handle_connection_node_disconnected_test(_) ->
599599

600600
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = _State1,
601601
[Effect1]} =
602-
?MOD:handle_connection_node_disconnected(Pid1, State0),
602+
?MOD:handle_connection_down(Pid1, noconnection, State0),
603603
assertSize(2, PidsGroups1),
604604
assertSize(1, maps:get(Pid0, PidsGroups1)),
605605
assertSize(1, maps:get(Pid2, PidsGroups1)),
@@ -945,7 +945,8 @@ handle_connection_down_simple_disconn_active_block_rebalancing_test(_) ->
945945

946946
Groups0 = #{GId => Group},
947947
State0 = state(Groups0),
948-
{#?STATE{groups = Groups1}, Eff} = ?MOD:handle_connection_down(Pid2, State0),
948+
{#?STATE{groups = Groups1}, Eff} = ?MOD:handle_connection_down(Pid2, normal,
949+
State0),
949950
assertHasGroup(GId, grp([csr(Pid0, 0, {connected, waiting}),
950951
csr(Pid1, 0, {disconnected, active})]),
951952
Groups1),
@@ -963,7 +964,8 @@ handle_connection_down_super_stream_disconn_active_block_rebalancing_test(_) ->
963964

964965
Groups0 = #{GId => Group},
965966
State0 = state(Groups0),
966-
{#?STATE{groups = Groups1}, Eff} = ?MOD:handle_connection_down(Pid0, State0),
967+
{#?STATE{groups = Groups1}, Eff} = ?MOD:handle_connection_down(Pid0, normal,
968+
State0),
967969
assertHasGroup(GId, grp(1, [csr(Pid1, 0, {disconnected, active}),
968970
csr(Pid2, 0, {connected, waiting})]),
969971
Groups1),
@@ -982,7 +984,7 @@ handle_connection_node_disconnected_simple_disconn_active_block_rebalancing_test
982984
Groups0 = #{GId => Group},
983985
State0 = state(Groups0),
984986
{#?STATE{groups = Groups1}, Eff} =
985-
?MOD:handle_connection_node_disconnected(Pid2, State0),
987+
?MOD:handle_connection_down(Pid2, noconnection, State0),
986988
assertHasGroup(GId, grp([csr(Pid0, 0, {connected, waiting}),
987989
csr(Pid1, 0, {disconnected, active}),
988990
csr(Pid2, 0, {disconnected, waiting})]),
@@ -1002,7 +1004,7 @@ handle_connection_node_disconnected_super_stream_disconn_active_block_rebalancin
10021004
Groups0 = #{GId => Group},
10031005
State0 = state(Groups0),
10041006
{#?STATE{groups = Groups1}, Eff} =
1005-
?MOD:handle_connection_node_disconnected(Pid0, State0),
1007+
?MOD:handle_connection_down(Pid0, noconnection, State0),
10061008
assertHasGroup(GId, grp(1, [csr(Pid0, 0, {disconnected, waiting}),
10071009
csr(Pid1, 0, {disconnected, active}),
10081010
csr(Pid2, 0, {connected, waiting})]),
@@ -1317,9 +1319,9 @@ node_disconnected_and_reconnected_test(_) ->
13171319

13181320
State0 = state(#{GId0 => G0, GId1 => G1, GId2 => G2}),
13191321

1320-
{State1, Eff1} = ?MOD:handle_connection_node_disconnected(N1P0, State0),
1321-
{State2, Eff2} = ?MOD:handle_connection_node_disconnected(N1P1, State1),
1322-
{State3, Eff3} = ?MOD:handle_connection_node_disconnected(N1P2, State2),
1322+
{State1, Eff1} = ?MOD:handle_connection_down(N1P0, noconnection, State0),
1323+
{State2, Eff2} = ?MOD:handle_connection_down(N1P1, noconnection, State1),
1324+
{State3, Eff3} = ?MOD:handle_connection_down(N1P2, noconnection, State2),
13231325

13241326
assertNodeDisconnectedTimerEffect(N1P0, Eff1),
13251327
assertNodeDisconnectedTimerEffect(N1P1, Eff2),
@@ -1449,7 +1451,7 @@ node_disconnected_reconnected_connection_down_test(_) ->
14491451
S0 = state(#{GId => G0}),
14501452

14511453
{#?STATE{groups = G1} = S1, Eff1} =
1452-
?MOD:handle_connection_node_disconnected(P1, S0),
1454+
?MOD:handle_connection_down(P1, noconnection, S0),
14531455

14541456
assertHasGroup(GId,
14551457
grp(1, [csr(P0, {connected, waiting}),
@@ -1470,7 +1472,7 @@ node_disconnected_reconnected_connection_down_test(_) ->
14701472

14711473
assertContainsCheckConnectionEffect(P1, Eff2),
14721474

1473-
{#?STATE{groups = G3}, Eff3} = ?MOD:handle_connection_down(P1, S2),
1475+
{#?STATE{groups = G3}, Eff3} = ?MOD:handle_connection_down(P1, normal, S2),
14741476

14751477
assertHasGroup(GId,
14761478
grp(1, [csr(P0, {connected, waiting}),

0 commit comments

Comments
 (0)