|
42 | 42 | ensure_monitors/4, |
43 | 43 | handle_connection_down/2, |
44 | 44 | handle_connection_node_disconnected/2, |
45 | | - handle_node_reconnected/2, |
| 45 | + handle_node_reconnected/3, |
46 | 46 | forget_connection/2, |
47 | 47 | consumer_groups/3, |
48 | 48 | group_consumers/5, |
@@ -300,17 +300,7 @@ apply(#command_purge_nodes{nodes = Nodes}, State0) -> |
300 | 300 | {State1, ok, Eff}. |
301 | 301 |
|
302 | 302 | purge_node(Node, #?MODULE{groups = Groups0} = State0) -> |
303 | | - PidsGroups = |
304 | | - maps:fold(fun(K, #group{consumers = Consumers}, Acc) -> |
305 | | - lists:foldl(fun(#consumer{pid = Pid}, AccIn) |
306 | | - when node(Pid) =:= Node -> |
307 | | - PG0 = maps:get(Pid, AccIn, #{}), |
308 | | - PG1 = PG0#{K => true}, |
309 | | - AccIn#{Pid => PG1}; |
310 | | - (_, AccIn) -> |
311 | | - AccIn |
312 | | - end, Acc, Consumers) |
313 | | - end, #{}, Groups0), |
| 303 | + PidsGroups = compute_node_pid_group_dependencies(Node, Groups0), |
314 | 304 | maps:fold(fun(Pid, Groups, {S0, Eff0}) -> |
315 | 305 | {S1, Eff1} = handle_connection_down0(Pid, S0, Groups), |
316 | 306 | {S1, Eff1 ++ Eff0} |
@@ -580,8 +570,15 @@ ensure_monitors(#command_connection_reconnected{pid = Pid}, |
580 | 570 | {State#?MODULE{pids_groups = AllPidsGroups}, |
581 | 571 | Monitors#{Pid => sac}, |
582 | 572 | [{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]}; |
| 573 | +ensure_monitors(#command_purge_nodes{}, |
| 574 | + #?MODULE{groups = Groups} = State, |
| 575 | + Monitors, |
| 576 | + Effects) -> |
| 577 | + AllPidsGroups = compute_pid_group_dependencies(Groups), |
| 578 | + {State#?MODULE{pids_groups = AllPidsGroups}, |
| 579 | + Monitors, |
| 580 | + Effects}; |
583 | 581 | ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) -> |
584 | | -%% TODO sac: ensure the pid-group mapping after purge_nodes? |
585 | 582 | {State0, Monitors, Effects}. |
586 | 583 |
|
587 | 584 | -spec handle_connection_down(connection_pid(), state()) -> |
@@ -621,21 +618,21 @@ handle_connection_node_disconnected(ConnPid, |
621 | 618 | #{connection_pid => ConnPid}}, T}]} |
622 | 619 | end. |
623 | 620 |
|
624 | | --spec handle_node_reconnected(state(), ra_machine:effects()) -> |
| 621 | +-spec handle_node_reconnected(node(), state(), ra_machine:effects()) -> |
625 | 622 | {state(), ra_machine:effects()}. |
626 | | -handle_node_reconnected(#?MODULE{pids_groups = PidsGroups0, |
| 623 | +handle_node_reconnected(Node, |
| 624 | + #?MODULE{pids_groups = PidsGroups0, |
627 | 625 | groups = Groups0} = State0, |
628 | 626 | Effects0) -> |
629 | | - AllPidsGroups = compute_pid_group_dependencies(Groups0), |
630 | | - NotMonitored = maps:keys(AllPidsGroups) -- maps:keys(PidsGroups0), |
| 627 | + NodePidsGroups = compute_node_pid_group_dependencies(Node, Groups0), |
| 628 | + PidsGroups1 = maps:merge(PidsGroups0, NodePidsGroups), |
631 | 629 | Effects1 = |
632 | 630 | lists:foldr(fun(P, Acc) -> |
633 | 631 | [notify_connection_effect(P), |
634 | | - {monitor, process, P}, |
635 | | - {monitor, node, node(P)} | Acc] |
636 | | - end, Effects0, NotMonitored), |
| 632 | + {monitor, process, P} | Acc] |
| 633 | + end, Effects0, maps:keys(NodePidsGroups)), |
637 | 634 |
|
638 | | - {State0#?MODULE{pids_groups = AllPidsGroups}, Effects1}. |
| 635 | + {State0#?MODULE{pids_groups = PidsGroups1}, Effects1}. |
639 | 636 |
|
640 | 637 | -spec forget_connection(connection_pid(), state()) -> |
641 | 638 | {state(), ra_machine:effects()}. |
@@ -1122,3 +1119,17 @@ compute_pid_group_dependencies(Groups) -> |
1122 | 1119 | AccIn#{Pid => PG1} |
1123 | 1120 | end, Acc, Cs) |
1124 | 1121 | end, #{}, Groups). |
| 1122 | + |
| 1123 | +-spec compute_node_pid_group_dependencies(node(), groups()) -> pids_groups(). |
| 1124 | +compute_node_pid_group_dependencies(Node, Groups) -> |
| 1125 | + maps:fold(fun(K, #group{consumers = Consumers}, Acc) -> |
| 1126 | + lists:foldl(fun(#consumer{pid = Pid}, AccIn) |
| 1127 | + when node(Pid) =:= Node -> |
| 1128 | + PG0 = maps:get(Pid, AccIn, #{}), |
| 1129 | + PG1 = PG0#{K => true}, |
| 1130 | + AccIn#{Pid => PG1}; |
| 1131 | + (_, AccIn) -> |
| 1132 | + AccIn |
| 1133 | + end, Acc, Consumers) |
| 1134 | + end, #{}, Groups). |
| 1135 | + |
0 commit comments