|
44 | 44 | -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). |
45 | 45 | -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). |
46 | 46 | -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). |
| 47 | +-export([has_synchronised_mirrors_online/1]). |
47 | 48 | -export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. |
48 | 49 | -export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, |
49 | | - list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]). |
| 50 | + list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, |
| 51 | + list_local_mirrored_classic_without_synchronised_mirrors/0, |
| 52 | + list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]). |
50 | 53 | -export([ensure_rabbit_queue_record_is_initialized/1]). |
51 | 54 | -export([format/1]). |
52 | 55 | -export([delete_immediately_by_resource/1]). |
@@ -1035,6 +1038,27 @@ list_local_mirrored_classic_names() -> |
1035 | 1038 | is_local_to_node(amqqueue:get_pid(Q), node()), |
1036 | 1039 | is_replicated(Q)]. |
1037 | 1040 |
|
| 1041 | +-spec list_local_mirrored_classic_without_synchronised_mirrors() -> [amqqueue:amqqueue()]. |
| 1042 | +list_local_mirrored_classic_without_synchronised_mirrors() -> |
| 1043 | + [ Q || Q <- list(), |
| 1044 | + amqqueue:get_state(Q) =/= crashed, |
| 1045 | + amqqueue:is_classic(Q), |
| 1046 | + is_local_to_node(amqqueue:get_pid(Q), node()), |
| 1047 | + is_replicated(Q), |
| 1048 | + not has_synchronised_mirrors_online(Q)]. |
| 1049 | + |
| 1050 | +-spec list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> [amqqueue:amqqueue()]. |
| 1051 | +list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> |
| 1052 | + ClassicQs = list_local_mirrored_classic_without_synchronised_mirrors(), |
| 1053 | + [begin |
| 1054 | + #resource{name = Name} = amqqueue:get_name(Q), |
| 1055 | + #{ |
| 1056 | + <<"readable_name">> => rabbit_misc:rs(amqqueue:get_name(Q)), |
| 1057 | + <<"name">> => Name, |
| 1058 | + <<"virtual_host">> => amqqueue:get_vhost(Q), |
| 1059 | + <<"type">> => <<"quorum">> |
| 1060 | + } |
| 1061 | + end || Q <- ClassicQs]. |
1038 | 1062 |
|
1039 | 1063 | is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> |
1040 | 1064 | Node =:= node(QPid); |
@@ -1882,6 +1906,13 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> |
1882 | 1906 | Pid = amqqueue:get_pid(Q), |
1883 | 1907 | not rabbit_mnesia:is_process_alive(Pid). |
1884 | 1908 |
|
| 1909 | +-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean(). |
| 1910 | +has_synchronised_mirrors_online(Q) -> |
| 1911 | + %% a queue with all mirrors down would have no mirror pids. |
| 1912 | + %% We treat these as in sync intentionally to avoid false positives. |
| 1913 | + MirrorPids = amqqueue:get_sync_slave_pids(Q), |
| 1914 | + MirrorPids =/= [] andalso lists:any(fun rabbit_misc:is_process_alive/1, MirrorPids). |
| 1915 | + |
1885 | 1916 | -spec on_node_up(node()) -> 'ok'. |
1886 | 1917 |
|
1887 | 1918 | on_node_up(Node) -> |
|
0 commit comments