Skip to content

Commit 10a8cf4

Browse files
mkuratczykikavgo
andcommitted
Add queue_topology/1 callback to rabbit_queue_type
Co-authored-by: ikavgo <[email protected]>
1 parent b8244f7 commit 10a8cf4

File tree

5 files changed

+51
-0
lines changed

5 files changed

+51
-0
lines changed

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666

6767
-export([validate_policy/1]).
6868

69+
-export([queue_topology/1]).
70+
6971
-rabbit_boot_step(
7072
{?MODULE,
7173
[{description, "Deprecated queue-master-locator support."
@@ -674,3 +676,10 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
674676

675677
send_queue_event(Pid, QName, Event) ->
676678
gen_server:cast(Pid, {queue_event, QName, Event}).
679+
680+
-spec queue_topology(amqqueue:amqqueue()) ->
681+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
682+
queue_topology(Q) ->
683+
Pid = amqqueue:get_pid(Q),
684+
Node = node(Pid),
685+
{Node, [Node]}.

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@
274274
-callback notify_decorators(amqqueue:amqqueue()) ->
275275
ok.
276276

277+
-callback queue_topology(amqqueue:amqqueue()) ->
278+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
279+
277280
-spec discover(binary() | atom()) -> queue_type().
278281
discover(<<"undefined">>) ->
279282
fallback();

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
force_vhost_queues_shrink_member_to_current_member/1,
7878
force_all_queues_shrink_member_to_current_member/0]).
7979

80+
-export([queue_topology/1]).
81+
8082
%% for backwards compatibility
8183
-export([file_handle_leader_reservation/1,
8284
file_handle_other_reservation/0,
@@ -2222,3 +2224,14 @@ maybe_log_leader_health_check_result([]) -> ok;
22222224
maybe_log_leader_health_check_result(Result) ->
22232225
Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result),
22242226
rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).
2227+
2228+
-spec queue_topology(amqqueue:amqqueue()) ->
2229+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
2230+
queue_topology(Q) ->
2231+
[{leader, Leader0},
2232+
{members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
2233+
Leader = case Leader0 of
2234+
'' -> undefined;
2235+
_ -> Leader0
2236+
end,
2237+
{Leader, Members}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959

6060
-export([check_max_segment_size_bytes/1]).
6161

62+
-export([queue_topology/1]).
63+
6264
-include_lib("rabbit_common/include/rabbit.hrl").
6365
-include("amqqueue.hrl").
6466

@@ -1393,3 +1395,18 @@ delivery_count_add(none, _) ->
13931395
none;
13941396
delivery_count_add(Count, N) ->
13951397
serial_number:add(Count, N).
1398+
1399+
-spec queue_topology(amqqueue:amqqueue()) ->
1400+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
1401+
queue_topology(Q) ->
1402+
#{name := StreamId} = amqqueue:get_type_state(Q),
1403+
case rabbit_stream_coordinator:members(StreamId) of
1404+
{ok, Members} ->
1405+
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
1406+
{Node, [Node | Replicas]};
1407+
(Node, {_Pid, replica}, {Writer, Replicas}) ->
1408+
{Writer, [Node | Replicas]}
1409+
end, {undefined, []}, Members);
1410+
{error, _} ->
1411+
{undefined, undefined}
1412+
end.

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
notify_decorators/1
4242
]).
4343

44+
-export([queue_topology/1]).
45+
4446
%% Stateful rabbit_queue_type callbacks are unsupported by this queue type.
4547
-define(STATEFUL_CALLBACKS,
4648
[
@@ -301,3 +303,10 @@ dequeue(A1,A2,A3,A4,A5) ->
301303

302304
state_info(A1) ->
303305
?UNSUPPORTED([A1]).
306+
307+
-spec queue_topology(amqqueue:amqqueue()) ->
308+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
309+
queue_topology(Q) ->
310+
Pid = amqqueue:get_pid(Q),
311+
Node = node(Pid),
312+
{Node, [Node]}.

0 commit comments

Comments
 (0)