Skip to content

Commit 5dd9ffc

Browse files
committed
Simplify rabbit_queue_type callbacks
deliver should only take targets and init should only take the full record
1 parent 8ff0d72 commit 5dd9ffc

10 files changed

+92
-131
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
1212
forget_all_durable/1]).
1313
-export([pseudo_queue/2, pseudo_queue/3]).
14-
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
14+
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
1515
not_found_or_absent_dirty/1,
1616
with/2, with/3, with_or_die/2,
1717
assert_equivalence/5,
@@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) ->
367367
lookup_durable_queue(QName) ->
368368
rabbit_db_queue:get_durable(QName).
369369

370-
-spec lookup_many(rabbit_exchange:route_return()) ->
371-
[amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}].
372-
lookup_many([]) ->
373-
%% optimisation
374-
[];
375-
lookup_many(Names) when is_list(Names) ->
376-
rabbit_db_queue:get_many(Names).
377-
378370
-spec lookup(binary(), binary()) ->
379371
rabbit_types:ok(amqqueue:amqqueue()) |
380372
rabbit_types:error('not_found').

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ format(Q, _Ctx) when ?is_amqqueue(Q) ->
287287
{state, State},
288288
{node, node(amqqueue:get_pid(Q))}].
289289

290-
-spec init(amqqueue:amqqueue() | amqqueue:target()) ->
290+
-spec init(amqqueue:amqqueue()) ->
291291
{ok, state()}.
292292
init(Q) ->
293293
{ok, #?STATE{pid = amqqueue:get_pid(Q)}}.
@@ -474,10 +474,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
474474

475475
supports_stateful_delivery() -> true.
476476

477-
-spec deliver([{amqqueue:amqqueue() | amqqueue:target(), state()}],
477+
-spec deliver([{amqqueue:target(), state()}],
478478
Delivery :: mc:state(),
479479
rabbit_queue_type:delivery_options()) ->
480-
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
480+
{[{amqqueue:target(), state()}], rabbit_queue_type:actions()}.
481481
deliver(Qs0, Msg0, Options) ->
482482
%% add guid to content here instead of in rabbit_basic:message/3,
483483
%% as classic queues are the only ones that need it

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 35 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
-export([
2020
get/1,
21-
get_many/1,
2221
get_targets/1,
2322
get_all/0,
2423
get_all/1,
@@ -474,106 +473,59 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
474473
%% get_targets().
475474
%% -------------------------------------------------------------------
476475

477-
%% Queue target optimisation is only available in Khepri.
478-
%% Mnesia falls back looking up the full amqqueue record.
479476
-spec get_targets(rabbit_exchange:route_return()) ->
480-
[amqqueue:target() | amqqueue:amqqueue() |
481-
{amqqueue:target() | amqqueue:amqqueue(), rabbit_exchange:route_infos()}].
477+
[amqqueue:target() | {amqqueue:target(), rabbit_exchange:route_infos()}].
482478
get_targets(Names) ->
483479
rabbit_khepri:handle_fallback(
484-
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
485-
khepri => fun() -> lookup_targets(Names) end
480+
#{mnesia => fun() -> lookup_targets(mnesia, Names) end,
481+
khepri => fun() -> lookup_targets(khepri, Names) end
486482
}).
487483

488-
lookup_targets(Names) ->
484+
lookup_targets(Store, Names) ->
489485
lists:filtermap(fun({Name, RouteInfos})
490486
when is_map(RouteInfos) ->
491-
case lookup_target(Name) of
487+
case lookup_target(Store, Name) of
492488
not_found -> false;
493489
Target -> {true, {Target, RouteInfos}}
494490
end;
495491
(Name) ->
496-
case lookup_target(Name) of
492+
case lookup_target(Store, Name) of
497493
not_found -> false;
498494
Target -> {true, Target}
499495
end
500496
end, Names).
501497

502-
lookup_target(#resource{name = NameBin} = Name) ->
498+
lookup_target(Store, #resource{name = NameBin} = Name) ->
503499
case rabbit_volatile_queue:is(NameBin) of
504500
true ->
505-
%% This queue is not stored in the database.
506-
%% We create it on the fly.
507-
case rabbit_volatile_queue:new(Name) of
501+
%% This queue is not stored in the database. We create it on the fly.
502+
case rabbit_volatile_queue:new_target(Name) of
508503
error -> not_found;
509-
Q -> Q
504+
Target -> Target
510505
end;
511506
false ->
512-
try
513-
ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of
514-
not_found ->
515-
not_found;
516-
Target ->
517-
amqqueue:new_target(Name, Target)
518-
catch
519-
error:badarg ->
520-
not_found
521-
end
507+
lookup_target0(Store, Name)
522508
end.
523509

524-
%% -------------------------------------------------------------------
525-
%% get_many().
526-
%% -------------------------------------------------------------------
527-
528-
-spec get_many(rabbit_exchange:route_return()) ->
529-
[amqqueue:amqqueue() | {amqqueue:amqqueue(), rabbit_exchange:route_infos()}].
530-
get_many(Names) when is_list(Names) ->
531-
rabbit_khepri:handle_fallback(
532-
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
533-
khepri => fun() -> get_many_in_khepri(Names) end
534-
}).
535-
536-
get_many_in_khepri(Names) ->
537-
try
538-
get_many_in_ets(?KHEPRI_PROJECTION, Names)
510+
lookup_target0(khepri, Name) ->
511+
try ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of
512+
not_found ->
513+
not_found;
514+
Target ->
515+
amqqueue:new_target(Name, Target)
539516
catch
540517
error:badarg ->
541-
[]
542-
end.
543-
544-
get_many_in_ets(Table, [{Name, RouteInfos}])
545-
when is_map(RouteInfos) ->
546-
case ets_lookup(Table, Name) of
547-
[] -> [];
548-
[Q] -> [{Q, RouteInfos}]
518+
not_found
549519
end;
550-
get_many_in_ets(Table, [Name]) ->
551-
ets_lookup(Table, Name);
552-
get_many_in_ets(Table, Names) when is_list(Names) ->
553-
lists:filtermap(fun({Name, RouteInfos})
554-
when is_map(RouteInfos) ->
555-
case ets_lookup(Table, Name) of
556-
[] -> false;
557-
[Q] -> {true, {Q, RouteInfos}}
558-
end;
559-
(Name) ->
560-
case ets_lookup(Table, Name) of
561-
[] -> false;
562-
[Q] -> {true, Q}
563-
end
564-
end, Names).
565-
566-
ets_lookup(Table, QName = #resource{name = QNameBin}) ->
567-
case rabbit_volatile_queue:is(QNameBin) of
568-
true ->
569-
%% This queue record is not stored in the database.
570-
%% We create it on the fly.
571-
case rabbit_volatile_queue:new(QName) of
572-
error -> [];
573-
Q -> [Q]
574-
end;
575-
false ->
576-
ets:lookup(Table, QName)
520+
lookup_target0(mnesia, Name) ->
521+
case ets:lookup(?MNESIA_TABLE, Name) of
522+
[] ->
523+
not_found;
524+
[Q] ->
525+
Type = amqqueue:get_type(Q),
526+
Pid = amqqueue:get_pid(Q),
527+
ExtraBcc = amqqueue:get_extra_bcc(Q),
528+
amqqueue:new_target(Name, {Type, Pid, ExtraBcc})
577529
end.
578530

579531
%% -------------------------------------------------------------------
@@ -663,6 +615,14 @@ get_many_durable_in_khepri(Names) ->
663615
[]
664616
end.
665617

618+
get_many_in_ets(Table, Names) ->
619+
lists:filtermap(fun(Name) ->
620+
case ets:lookup(Table, Name) of
621+
[] -> false;
622+
[Q] -> {true, Q}
623+
end
624+
end, Names).
625+
666626
%% -------------------------------------------------------------------
667627
%% update().
668628
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@
204204
-callback policy_changed(amqqueue:amqqueue()) -> ok.
205205

206206
%% intitialise and return a queue type specific session context
207-
-callback init(amqqueue:amqqueue() | amqqueue:target()) ->
207+
-callback init(amqqueue:amqqueue()) ->
208208
{ok, queue_state()} | {error, Reason :: term()}.
209209

210210
-callback close(queue_state()) -> ok.
@@ -233,10 +233,10 @@
233233

234234
-callback supports_stateful_delivery() -> boolean().
235235

236-
-callback deliver([{amqqueue:amqqueue() | amqqueue:target(), queue_state()}],
236+
-callback deliver([{amqqueue:target(), queue_state()}],
237237
Message :: mc:state(),
238238
Options :: delivery_options()) ->
239-
{[{amqqueue:amqqueue() | amqqueue:target(), queue_state()}], actions()}.
239+
{[{amqqueue:target(), queue_state()}], actions()}.
240240

241241
-callback settle(queue_name(), settle_op(), rabbit_types:ctag(),
242242
[non_neg_integer()], queue_state()) ->
@@ -626,9 +626,8 @@ publish_at_most_once(X, Msg)
626626
_ = deliver(Qs, Msg, #{}, stateless),
627627
ok.
628628

629-
-spec deliver([amqqueue:amqqueue() | amqqueue:target() |
630-
{amqqueue:amqqueue() | amqqueue:target(),
631-
rabbit_exchange:route_infos()}],
629+
-spec deliver([amqqueue:target() |
630+
{amqqueue:target(), rabbit_exchange:route_infos()}],
632631
Message :: mc:state(),
633632
delivery_options(),
634633
stateless | state()) ->
@@ -792,12 +791,14 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) ->
792791
_ when InitState == undefined ->
793792
%% not found and no initial state passed - initialize new state
794793
Mod = amqqueue:get_type(Q),
795-
case Mod:init(Q) of
794+
maybe
795+
{ok, Q1} ?= to_queue(Q),
796+
{ok, QState} ?= Mod:init(Q1),
797+
#ctx{module = Mod,
798+
state = QState}
799+
else
796800
{error, Reason} ->
797-
exit({Reason, Ref});
798-
{ok, QState} ->
799-
#ctx{module = Mod,
800-
state = QState}
801+
exit({Reason, Ref})
801802
end;
802803
_ ->
803804
%% not found - initialize with supplied initial state
@@ -820,6 +821,12 @@ qref(#resource{kind = queue} = QName) ->
820821
qref(Q) ->
821822
amqqueue:get_name(Q).
822823

824+
to_queue(Q) when ?is_amqqueue(Q) ->
825+
{ok, Q};
826+
to_queue(Target) ->
827+
QName = amqqueue:get_name(Target),
828+
rabbit_amqqueue:lookup(QName).
829+
823830
-spec known_queue_type_modules() -> [module()].
824831
known_queue_type_modules() ->
825832
Registered = rabbit_registry:lookup_all(queue),

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ is_compatible(_Durable = true,
212212
is_compatible(_, _, _) ->
213213
false.
214214

215-
-spec init(amqqueue:amqqueue() | amqqueue:target()) ->
215+
-spec init(amqqueue:amqqueue()) ->
216216
{ok, rabbit_fifo_client:state()} | {error, not_found}.
217217
init(Q) when ?is_amqqueue(Q) ->
218218
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
@@ -229,15 +229,7 @@ init(Q) when ?is_amqqueue(Q) ->
229229
%% server tried is the one we want
230230
Servers0 = [{Name, N} || N <- Nodes],
231231
Servers = [Leader | lists:delete(Leader, Servers0)],
232-
{ok, rabbit_fifo_client:init(Servers, SoftLimit)};
233-
init(QueueTarget) ->
234-
QName = amqqueue:get_name(QueueTarget),
235-
case rabbit_amqqueue:lookup(QName) of
236-
{ok, Q} ->
237-
init(Q);
238-
Error ->
239-
Error
240-
end.
232+
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
241233

242234
-spec close(rabbit_fifo_client:state()) -> ok.
243235
close(State) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,23 @@ restart_stream(QRes) ->
169169
{ok, node()} |
170170
{error, term()} |
171171
{timeout, term()}.
172-
restart_stream(QRes, Options)
173-
when element(1, QRes) == resource ->
174-
restart_stream(hd(rabbit_amqqueue:lookup_many([QRes])), Options);
175172
restart_stream(Q, Options)
176-
when ?is_amqqueue(Q) andalso
177-
?amqqueue_is_stream(Q) ->
173+
when ?amqqueue_is_stream(Q) ->
178174
?LOG_INFO("restarting stream ~s in vhost ~s with options ~p",
179-
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
175+
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
180176
#{name := StreamId} = amqqueue:get_type_state(Q),
181177
case process_command({restart_stream, StreamId, Options}) of
182178
{ok, {ok, LeaderPid}, _} ->
183179
{ok, node(LeaderPid)};
184180
Err ->
185181
Err
182+
end;
183+
restart_stream(QRes, Options) ->
184+
case rabbit_amqqueue:lookup(QRes) of
185+
{ok, Q} ->
186+
restart_stream(Q, Options);
187+
Err ->
188+
Err
186189
end.
187190

188191
delete_stream(Q, ActingUser)

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,14 +1015,6 @@ init(Q) when ?is_amqqueue(Q) ->
10151015
?LOG_WARNING("Failed to start stream client ~tp: coordinator unavailable",
10161016
[rabbit_misc:rs(QName)]),
10171017
E
1018-
end;
1019-
init(QueueTarget) ->
1020-
QName = amqqueue:get_name(QueueTarget),
1021-
case rabbit_amqqueue:lookup(QName) of
1022-
{ok, Q} ->
1023-
init(Q);
1024-
Error ->
1025-
Error
10261018
end.
10271019

10281020
close(#stream_client{readers = Readers,

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
-include_lib("rabbit_common/include/rabbit.hrl").
1717

1818
-export([new/1,
19+
new_target/1,
1920
new_name/0,
2021
is/1,
2122
key_from_name/1,
@@ -94,6 +95,16 @@ new(#resource{virtual_host = Vhost,
9495
new0(Name, Pid, Vhost) ->
9596
amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE).
9697

98+
-spec new_target(rabbit_amqqueue:name()) ->
99+
amqqueue:target() | error.
100+
new_target(#resource{name = NameBin} = Name) ->
101+
case pid_from_name(NameBin) of
102+
{ok, Pid} when is_pid(Pid) ->
103+
amqqueue:new_target(Name, {?MODULE, Pid, none});
104+
_ ->
105+
error
106+
end.
107+
97108
-spec is(rabbit_misc:resource_name()) ->
98109
boolean().
99110
is(<<?PREFIX, _/binary>>) ->

0 commit comments

Comments
 (0)