Skip to content

Commit 94b0b62

Browse files
committed
Avoid copying the queue name
1 parent 7ef27e2 commit 94b0b62

File tree

3 files changed

+71
-31
lines changed

3 files changed

+71
-31
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
new/9,
1515
new_with_version/9,
1616
new_with_version/10,
17-
to_target/1,
17+
new_target/4,
1818
fields/0,
1919
fields/1,
2020
field_vhost/0,
@@ -40,6 +40,7 @@
4040
% options
4141
get_options/1,
4242
set_options/2,
43+
get_extra_bcc/1,
4344
% pid
4445
get_pid/1,
4546
set_pid/2,
@@ -57,7 +58,6 @@
5758
set_state/2,
5859
get_type/1,
5960
get_vhost/1,
60-
get_extra_bcc/1,
6161
is_amqqueue/1,
6262
is_auto_delete/1,
6363
is_durable/1,
@@ -79,7 +79,8 @@
7979
qnode/1,
8080
to_printable/1,
8181
to_printable/2,
82-
macros/0]).
82+
macros/0
83+
]).
8384

8485
-define(record_version, amqqueue_v2).
8586
-define(is_backwards_compat_classic(T),
@@ -122,8 +123,8 @@
122123
type_state = #{} :: map() | ets:match_pattern()
123124
}).
124125

125-
%% A subset of the amqqueue record to avoid looking up the full amqqueue record
126-
%% when delivering a message to a target queue.
126+
%% A subset of the amqqueue record containing just the necessary fields
127+
%% to deliver a message to a target queue.
127128
-record(queue_target,
128129
{name :: rabbit_amqqueue:name(),
129130
type :: rabbit_queue_type:queue_type(),
@@ -342,25 +343,16 @@ new_with_version(?record_version,
342343
options = Options,
343344
type = ensure_type_compat(Type)}.
344345

345-
-spec to_target(amqqueue()) -> target().
346-
to_target(#amqqueue{name = Name,
347-
type = Type,
348-
pid = Pid,
349-
options = Options}) ->
346+
-spec new_target(rabbit_amqqueue:name(),
347+
rabbit_queue_type:queue_type(),
348+
pid() | ra_server_id() | none,
349+
rabbit_misc:resource_name() | none) ->
350+
target().
351+
new_target(Name, Type, Pid, ExtraBcc) ->
350352
#queue_target{name = Name,
351353
type = Type,
352354
pid = Pid,
353-
extra_bcc = extra_bcc_from_options(Options)}.
354-
355-
get_extra_bcc(#amqqueue{options = Options}) ->
356-
extra_bcc_from_options(Options);
357-
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
358-
Name.
359-
360-
extra_bcc_from_options(#{extra_bcc := Name}) ->
361-
Name;
362-
extra_bcc_from_options(#{}) ->
363-
none.
355+
extra_bcc = ExtraBcc}.
364356

365357
-spec is_amqqueue(any()) -> boolean().
366358

@@ -395,15 +387,21 @@ set_arguments(#amqqueue{} = Queue, Args) ->
395387
% options
396388

397389
-spec get_options(amqqueue()) -> amqqueue_options().
398-
399390
get_options(#amqqueue{options = Options}) ->
400391
Options.
401392

402393
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
403-
404394
set_options(#amqqueue{} = Queue, Options) ->
405395
Queue#amqqueue{options = Options}.
406396

397+
-spec get_extra_bcc(amqqueue() | target()) ->
398+
rabbit_misc:resource_name() | none.
399+
get_extra_bcc(#amqqueue{options = #{extra_bcc := Name}}) ->
400+
Name;
401+
get_extra_bcc(#amqqueue{}) ->
402+
none;
403+
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
404+
Name.
407405

408406
% decorators
409407

@@ -666,8 +664,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
666664
<<"virtual_host">> => VHost,
667665
<<"type">> => Type}.
668666

669-
% private
670-
671667
macros() ->
672668
io:format(
673669
"-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n",

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,45 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
482482
get_targets(Names) ->
483483
rabbit_khepri:handle_fallback(
484484
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
485-
khepri => fun() -> get_many_in_khepri(?KHEPRI_TARGET_PROJECTION, Names) end
485+
khepri => fun() -> lookup_targets(Names) end
486486
}).
487487

488+
lookup_targets(Names) ->
489+
lists:filtermap(fun({Name, RouteInfos})
490+
when is_map(RouteInfos) ->
491+
case lookup_target(Name) of
492+
not_found -> false;
493+
Target -> {true, {Target, RouteInfos}}
494+
end;
495+
(Name) ->
496+
case lookup_target(Name) of
497+
not_found -> false;
498+
Target -> {true, Target}
499+
end
500+
end, Names).
501+
502+
lookup_target(#resource{name = NameBin} = Name) ->
503+
case rabbit_volatile_queue:is(NameBin) of
504+
true ->
505+
%% This queue is not stored in the database.
506+
%% We create it on the fly.
507+
case rabbit_volatile_queue:new(Name) of
508+
error -> not_found;
509+
Q -> Q
510+
end;
511+
false ->
512+
try
513+
ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of
514+
{Type, Pid, ExtraBcc} ->
515+
amqqueue:new_target(Name, Type, Pid, ExtraBcc);
516+
not_found ->
517+
not_found
518+
catch
519+
error:badarg ->
520+
not_found
521+
end
522+
end.
523+
488524
%% -------------------------------------------------------------------
489525
%% get_many().
490526
%% -------------------------------------------------------------------
@@ -494,12 +530,12 @@ get_targets(Names) ->
494530
get_many(Names) when is_list(Names) ->
495531
rabbit_khepri:handle_fallback(
496532
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
497-
khepri => fun() -> get_many_in_khepri(?KHEPRI_PROJECTION, Names) end
533+
khepri => fun() -> get_many_in_khepri(Names) end
498534
}).
499535

500-
get_many_in_khepri(Table, Names) ->
536+
get_many_in_khepri(Names) ->
501537
try
502-
get_many_in_ets(Table, Names)
538+
get_many_in_ets(?KHEPRI_PROJECTION, Names)
503539
catch
504540
error:badarg ->
505541
[]

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1354,12 +1354,20 @@ register_rabbit_queue_projection() ->
13541354
KeyPos = 2, %% #amqqueue.name
13551355
register_simple_projection(Name, PathPattern, KeyPos, false).
13561356

1357+
%% This projection exists to avoid looking up the full amqqueue record
1358+
%% per message delivered to a target queue.
13571359
register_rabbit_queue_target_projection() ->
13581360
PathPattern = rabbit_db_queue:khepri_queue_path(
13591361
_VHost = ?KHEPRI_WILDCARD_STAR,
13601362
_Name = ?KHEPRI_WILDCARD_STAR),
1361-
Fun = fun(_Path, Q) -> amqqueue:to_target(Q) end,
1362-
Opts = #{keypos => 2, %% #queue_target.name
1363+
Fun = fun(_Path, Q) ->
1364+
Name = amqqueue:get_name(Q),
1365+
Type = amqqueue:get_type(Q),
1366+
Pid = amqqueue:get_pid(Q),
1367+
ExtraBcc = amqqueue:get_extra_bcc(Q),
1368+
{Name, {Type, Pid, ExtraBcc}}
1369+
end,
1370+
Opts = #{keypos => 1,
13631371
read_concurrency => true},
13641372
Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts),
13651373
khepri:register_projection(?STORE_ID, PathPattern, Projection).

0 commit comments

Comments
 (0)