Skip to content

Commit 7ef27e2

Browse files
committed
Move record queue_target into module amqqueue
1 parent 767523a commit 7ef27e2

File tree

4 files changed

+37
-34
lines changed

4 files changed

+37
-34
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
new/9,
1515
new_with_version/9,
1616
new_with_version/10,
17+
to_target/1,
1718
fields/0,
1819
fields/1,
1920
field_vhost/0,
@@ -56,6 +57,7 @@
5657
set_state/2,
5758
get_type/1,
5859
get_vhost/1,
60+
get_extra_bcc/1,
5961
is_amqqueue/1,
6062
is_auto_delete/1,
6163
is_durable/1,
@@ -120,6 +122,17 @@
120122
type_state = #{} :: map() | ets:match_pattern()
121123
}).
122124

125+
%% A subset of the amqqueue record to avoid looking up the full amqqueue record
126+
%% when delivering a message to a target queue.
127+
-record(queue_target,
128+
{name :: rabbit_amqqueue:name(),
129+
type :: rabbit_queue_type:queue_type(),
130+
pid :: pid() | ra_server_id() | none,
131+
extra_bcc :: rabbit_misc:resource_name() | none
132+
}).
133+
134+
-type target() :: #queue_target{}.
135+
123136
-type amqqueue() :: amqqueue_v2().
124137
-type amqqueue_v2() :: #amqqueue{
125138
name :: rabbit_amqqueue:name(),
@@ -145,8 +158,6 @@
145158
type_state :: #{}
146159
}.
147160

148-
-type target() :: #queue_target{}.
149-
150161
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
151162

152163
-type amqqueue_pattern() :: amqqueue_v2_pattern().
@@ -331,6 +342,26 @@ new_with_version(?record_version,
331342
options = Options,
332343
type = ensure_type_compat(Type)}.
333344

345+
-spec to_target(amqqueue()) -> target().
346+
to_target(#amqqueue{name = Name,
347+
type = Type,
348+
pid = Pid,
349+
options = Options}) ->
350+
#queue_target{name = Name,
351+
type = Type,
352+
pid = Pid,
353+
extra_bcc = extra_bcc_from_options(Options)}.
354+
355+
get_extra_bcc(#amqqueue{options = Options}) ->
356+
extra_bcc_from_options(Options);
357+
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
358+
Name.
359+
360+
extra_bcc_from_options(#{extra_bcc := Name}) ->
361+
Name;
362+
extra_bcc_from_options(#{}) ->
363+
none.
364+
334365
-spec is_amqqueue(any()) -> boolean().
335366

336367
is_amqqueue(#amqqueue{}) -> true.

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2057,7 +2057,7 @@ prepend_extra_bcc([]) ->
20572057
[];
20582058
prepend_extra_bcc([Q0] = Qs) ->
20592059
Q = queue(Q0),
2060-
case get_extra_bcc(Q) of
2060+
case amqqueue:get_extra_bcc(Q) of
20612061
none ->
20622062
Qs;
20632063
Name ->
@@ -2072,7 +2072,7 @@ prepend_extra_bcc(Qs) ->
20722072
ExtraQs = lists:filtermap(
20732073
fun(Q0) ->
20742074
Q = queue(Q0),
2075-
case get_extra_bcc(Q) of
2075+
case amqqueue:get_extra_bcc(Q) of
20762076
none ->
20772077
false;
20782078
Name ->
@@ -2102,16 +2102,6 @@ queue_names(Queues) ->
21022102
amqqueue:get_name(Q)
21032103
end, Queues).
21042104

2105-
get_extra_bcc(Q) when ?is_amqqueue(Q) ->
2106-
case amqqueue:get_options(Q) of
2107-
#{extra_bcc := Name} ->
2108-
Name;
2109-
_ ->
2110-
none
2111-
end;
2112-
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
2113-
Name.
2114-
21152105
-spec lookup_extra_bcc(amqqueue:amqqueue() | amqqueue:target(), binary()) ->
21162106
{ok, amqqueue:amqqueue()} | {error, not_found}.
21172107
lookup_extra_bcc(Q, BCCName) ->

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,17 +1358,8 @@ register_rabbit_queue_target_projection() ->
13581358
PathPattern = rabbit_db_queue:khepri_queue_path(
13591359
_VHost = ?KHEPRI_WILDCARD_STAR,
13601360
_Name = ?KHEPRI_WILDCARD_STAR),
1361-
Fun = fun(_Path, Q) ->
1362-
BCC = case amqqueue:get_options(Q) of
1363-
#{extra_bcc := Name} -> Name;
1364-
_ -> none
1365-
end,
1366-
#queue_target{name = amqqueue:get_name(Q),
1367-
type = amqqueue:get_type(Q),
1368-
pid = amqqueue:get_pid(Q),
1369-
extra_bcc = BCC}
1370-
end,
1371-
Opts = #{keypos => #queue_target.name,
1361+
Fun = fun(_Path, Q) -> amqqueue:to_target(Q) end,
1362+
Opts = #{keypos => 2, %% #queue_target.name
13721363
read_concurrency => true},
13731364
Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts),
13741365
khepri:register_projection(?STORE_ID, PathPattern, Projection).

deps/rabbit_common/include/rabbit.hrl

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,6 @@
9696
-record(index_route, {source_key, destination, args = []}).
9797
-record(route_by_source, {source, key, destination, args = []}).
9898

99-
%% A subset of the amqqueue record to avoid looking up the full amqqueue record
100-
%% when delivering a message to a target queue.
101-
-record(queue_target,
102-
{name :: tuple(),
103-
type :: module(),
104-
pid :: pid() | {atom(), node()} | none,
105-
extra_bcc :: binary() | none
106-
}).
107-
10899
-record(binding, {source, key, destination, args = []}).
109100
-record(reverse_binding, {destination, key, source, args = []}).
110101

0 commit comments

Comments
 (0)