|
42 | 42 | ensure_monitors/4, |
43 | 43 | handle_connection_down/2, |
44 | 44 | handle_connection_node_disconnected/2, |
45 | | - handle_node_reconnected/2, |
| 45 | + handle_node_reconnected/3, |
46 | 46 | forget_connection/2, |
47 | 47 | consumer_groups/3, |
48 | 48 | group_consumers/5, |
@@ -298,17 +298,7 @@ apply(#command_purge_nodes{nodes = Nodes}, State0) -> |
298 | 298 | {State1, ok, Eff}. |
299 | 299 |
|
300 | 300 | purge_node(Node, #?MODULE{groups = Groups0} = State0) -> |
301 | | - PidsGroups = |
302 | | - maps:fold(fun(K, #group{consumers = Consumers}, Acc) -> |
303 | | - lists:foldl(fun(#consumer{pid = Pid}, AccIn) |
304 | | - when node(Pid) =:= Node -> |
305 | | - PG0 = maps:get(Pid, AccIn, #{}), |
306 | | - PG1 = PG0#{K => true}, |
307 | | - AccIn#{Pid => PG1}; |
308 | | - (_, AccIn) -> |
309 | | - AccIn |
310 | | - end, Acc, Consumers) |
311 | | - end, #{}, Groups0), |
| 301 | + PidsGroups = compute_node_pid_group_dependencies(Node, Groups0), |
312 | 302 | maps:fold(fun(Pid, Groups, {S0, Eff0}) -> |
313 | 303 | {S1, Eff1} = handle_connection_down0(Pid, S0, Groups), |
314 | 304 | {S1, Eff1 ++ Eff0} |
@@ -578,8 +568,15 @@ ensure_monitors(#command_connection_reconnected{pid = Pid}, |
578 | 568 | {State#?MODULE{pids_groups = AllPidsGroups}, |
579 | 569 | Monitors#{Pid => sac}, |
580 | 570 | [{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]}; |
| 571 | +ensure_monitors(#command_purge_nodes{}, |
| 572 | + #?MODULE{groups = Groups} = State, |
| 573 | + Monitors, |
| 574 | + Effects) -> |
| 575 | + AllPidsGroups = compute_pid_group_dependencies(Groups), |
| 576 | + {State#?MODULE{pids_groups = AllPidsGroups}, |
| 577 | + Monitors, |
| 578 | + Effects}; |
581 | 579 | ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) -> |
582 | | -%% TODO sac: ensure the pid-group mapping after purge_nodes? |
583 | 580 | {State0, Monitors, Effects}. |
584 | 581 |
|
585 | 582 | -spec handle_connection_down(connection_pid(), state()) -> |
@@ -619,21 +616,21 @@ handle_connection_node_disconnected(ConnPid, |
619 | 616 | #{connection_pid => ConnPid}}, T}]} |
620 | 617 | end. |
621 | 618 |
|
622 | | --spec handle_node_reconnected(state(), ra_machine:effects()) -> |
| 619 | +-spec handle_node_reconnected(node(), state(), ra_machine:effects()) -> |
623 | 620 | {state(), ra_machine:effects()}. |
624 | | -handle_node_reconnected(#?MODULE{pids_groups = PidsGroups0, |
| 621 | +handle_node_reconnected(Node, |
| 622 | + #?MODULE{pids_groups = PidsGroups0, |
625 | 623 | groups = Groups0} = State0, |
626 | 624 | Effects0) -> |
627 | | - AllPidsGroups = compute_pid_group_dependencies(Groups0), |
628 | | - NotMonitored = maps:keys(AllPidsGroups) -- maps:keys(PidsGroups0), |
| 625 | + NodePidsGroups = compute_node_pid_group_dependencies(Node, Groups0), |
| 626 | + PidsGroups1 = maps:merge(PidsGroups0, NodePidsGroups), |
629 | 627 | Effects1 = |
630 | 628 | lists:foldr(fun(P, Acc) -> |
631 | 629 | [notify_connection_effect(P), |
632 | | - {monitor, process, P}, |
633 | | - {monitor, node, node(P)} | Acc] |
634 | | - end, Effects0, NotMonitored), |
| 630 | + {monitor, process, P} | Acc] |
| 631 | + end, Effects0, maps:keys(NodePidsGroups)), |
635 | 632 |
|
636 | | - {State0#?MODULE{pids_groups = AllPidsGroups}, Effects1}. |
| 633 | + {State0#?MODULE{pids_groups = PidsGroups1}, Effects1}. |
637 | 634 |
|
638 | 635 | -spec forget_connection(connection_pid(), state()) -> |
639 | 636 | {state(), ra_machine:effects()}. |
@@ -1117,3 +1114,17 @@ compute_pid_group_dependencies(Groups) -> |
1117 | 1114 | AccIn#{Pid => PG1} |
1118 | 1115 | end, Acc, Cs) |
1119 | 1116 | end, #{}, Groups). |
| 1117 | + |
| 1118 | +-spec compute_node_pid_group_dependencies(node(), groups()) -> pids_groups(). |
| 1119 | +compute_node_pid_group_dependencies(Node, Groups) -> |
| 1120 | + maps:fold(fun(K, #group{consumers = Consumers}, Acc) -> |
| 1121 | + lists:foldl(fun(#consumer{pid = Pid}, AccIn) |
| 1122 | + when node(Pid) =:= Node -> |
| 1123 | + PG0 = maps:get(Pid, AccIn, #{}), |
| 1124 | + PG1 = PG0#{K => true}, |
| 1125 | + AccIn#{Pid => PG1}; |
| 1126 | + (_, AccIn) -> |
| 1127 | + AccIn |
| 1128 | + end, Acc, Consumers) |
| 1129 | + end, #{}, Groups). |
| 1130 | + |
0 commit comments