Skip to content

Commit 9062476

Browse files
ansdacogoluegnes
andcommitted
Support dynamic creation of queues
## What? Support the `dynamic` field of sources and targets. ## Why? 1. This allows AMQP clients to dynamically create exclusive queues, which can be useful for RPC workloads. 2. Support creation of JMS temporary queues over AMQP using the Qpid JMS client. Exclusive queues map very nicely to JMS temporary queues because: > Although sessions are used to create temporary destinations, this is only for convenience. Their scope is actually the entire connection. Their lifetime is that of their connection and any of the connection’s sessions are allowed to create a consumer for them. https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#creating-temporary-destinations ## How? If the terminus contains the capability `temporary-queue` as defined in [amqp-bindmap-jms-v1.0-wd10](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=67638) [5.2] and as sent by Qpid JMS client, RabbitMQ will create an exclusive queue. (This allows a future commit to take other actions if capability `temporary-topic` will be used, such as the additional creation of bindings.) No matter what the desired node properties are, RabbitMQ will set the lifetime policy delete-on-close deleting the exclusive queue when the link which caused its creation ceases to exist. This means the exclusive queue will be deleted if either: * the link gets detached, or * the session ends, or * the connection closes Although the AMQP JMS Mapping and Qpid JMS create only a **sending** link with `dynamic=true`, this commit also supports **receiving** links with `dynamic=true` for non-JMS AMQP clients. RabbitMQ is free to choose the generated queue name. As suggested by the AMQP spec, the generated queue name will contain the container-id and link name unless they are very long. Co-authored-by: Arnaud Cogoluègnes <[email protected]>
1 parent 06ec8f0 commit 9062476

File tree

11 files changed

+1243
-101
lines changed

11 files changed

+1243
-101
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
698698

699699
make_source(#{role := {sender, _}}) ->
700700
#'v1_0.source'{};
701-
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
701+
make_source(#{role := {receiver, Source, _Pid},
702+
filter := Filter}) ->
702703
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
704+
Dynamic = maps:get(dynamic, Source, false),
703705
TranslatedFilter = translate_filters(Filter),
704-
#'v1_0.source'{address = {utf8, Address},
706+
#'v1_0.source'{address = make_address(Source),
705707
durable = {uint, Durable},
706-
filter = TranslatedFilter}.
708+
dynamic = Dynamic,
709+
filter = TranslatedFilter,
710+
capabilities = make_capabilities(Source)}.
707711

708712
make_target(#{role := {receiver, _Source, _Pid}}) ->
709713
#'v1_0.target'{};
710-
make_target(#{role := {sender, #{address := Address} = Target}}) ->
714+
make_target(#{role := {sender, Target}}) ->
711715
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
712-
TargetAddr = case is_binary(Address) of
713-
true -> {utf8, Address};
714-
false -> Address
715-
end,
716-
#'v1_0.target'{address = TargetAddr,
717-
durable = {uint, Durable}}.
716+
Dynamic = maps:get(dynamic, Target, false),
717+
#'v1_0.target'{address = make_address(Target),
718+
durable = {uint, Durable},
719+
dynamic = Dynamic,
720+
capabilities = make_capabilities(Target)}.
721+
722+
make_address(#{address := Addr}) ->
723+
if is_binary(Addr) ->
724+
{utf8, Addr};
725+
is_atom(Addr) ->
726+
Addr
727+
end.
728+
729+
make_capabilities(#{capabilities := Caps0}) ->
730+
Caps = [{symbol, C} || C <- Caps0],
731+
{array, symbol, Caps};
732+
make_capabilities(_) ->
733+
undefined.
718734

719735
max_message_size(#{max_message_size := Size})
720736
when is_integer(Size) andalso

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,8 @@ wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0)
260260
State = untrack_channel(ChannelNum, SessionPid, State0),
261261
wait_for_shutdown_sessions(TimerRef, State);
262262
shutdown_sessions_timeout ->
263-
?LOG_INFO("sessions not shut down after ~b ms: ~p",
264-
[?SHUTDOWN_SESSIONS_TIMEOUT, Channels]),
263+
?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p",
264+
[?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]),
265265
State0
266266
end.
267267

@@ -792,6 +792,7 @@ send_to_new_session(
792792
connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
793793
vhost = Vhost,
794794
user = User,
795+
container_id = ContainerId,
795796
name = ConnName},
796797
writer = WriterPid} = State) ->
797798
%% Subtract fixed frame header size.
@@ -804,6 +805,7 @@ send_to_new_session(
804805
OutgoingMaxFrameSize,
805806
User,
806807
Vhost,
808+
ContainerId,
807809
ConnName,
808810
BeginFrame],
809811
case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 234 additions & 79 deletions
Large diffs are not rendered by default.

deps/rabbit/test/amqp_auth_SUITE.erl

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ groups() ->
5555
[
5656
%% authz
5757
attach_source_queue,
58+
attach_source_queue_dynamic,
5859
attach_target_exchange,
5960
attach_target_topic_exchange,
6061
attach_target_queue,
62+
attach_target_queue_dynamic_exchange_write,
63+
attach_target_queue_dynamic_queue_configure,
6164
target_per_message_exchange,
6265
target_per_message_internal_exchange,
6366
target_per_message_topic,
@@ -437,6 +440,39 @@ attach_source_queue(Config) ->
437440
end,
438441
ok = close_connection_sync(Conn).
439442

443+
attach_source_queue_dynamic(Config) ->
444+
OpnConf = connection_config(Config),
445+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
446+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
447+
448+
%% missing configure permission to queue
449+
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
450+
451+
Source = #{address => undefined,
452+
dynamic => true,
453+
capabilities => [<<"temporary-queue">>],
454+
durable => none},
455+
AttachArgs = #{name => <<"my link">>,
456+
role => {receiver, Source, self()},
457+
snd_settle_mode => unsettled,
458+
rcv_settle_mode => first,
459+
filter => #{}},
460+
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
461+
receive {amqp10_event,
462+
{session, Session,
463+
{ended, Error}}} ->
464+
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
465+
description = {utf8, Description}} = Error,
466+
?assertEqual(
467+
match,
468+
re:run(Description,
469+
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
470+
"'test vhost' refused for user 'test user'$">>,
471+
[{capture, none}]))
472+
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
473+
end,
474+
ok = close_connection_sync(Connection).
475+
440476
attach_target_exchange(Config) ->
441477
XName = <<"amq.fanout">>,
442478
Address1 = rabbitmq_amqp_address:exchange(XName),
@@ -485,6 +521,61 @@ attach_target_queue(Config) ->
485521
end,
486522
ok = amqp10_client:close_connection(Conn).
487523

524+
attach_target_queue_dynamic_exchange_write(Config) ->
525+
OpnConf = connection_config(Config),
526+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
527+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
528+
529+
%% missing write permission to default exchange
530+
ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>),
531+
532+
Target = #{address => undefined,
533+
dynamic => true,
534+
capabilities => [<<"temporary-queue">>]},
535+
AttachArgs = #{name => <<"my link">>,
536+
role => {sender, Target},
537+
snd_settle_mode => mixed,
538+
rcv_settle_mode => first},
539+
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
540+
ExpectedErr = error_unauthorized(
541+
<<"write access to exchange 'amq.default' ",
542+
"in vhost 'test vhost' refused for user 'test user'">>),
543+
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
544+
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
545+
end,
546+
ok = close_connection_sync(Connection).
547+
548+
attach_target_queue_dynamic_queue_configure(Config) ->
549+
OpnConf = connection_config(Config),
550+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
551+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
552+
553+
%% missing configure permission to queue
554+
ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
555+
556+
Target = #{address => undefined,
557+
dynamic => true,
558+
capabilities => [<<"temporary-queue">>]},
559+
AttachArgs = #{name => <<"my link">>,
560+
role => {sender, Target},
561+
snd_settle_mode => mixed,
562+
rcv_settle_mode => first},
563+
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
564+
receive {amqp10_event,
565+
{session, Session,
566+
{ended, Error}}} ->
567+
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
568+
description = {utf8, Description}} = Error,
569+
?assertEqual(
570+
match,
571+
re:run(Description,
572+
<<"^configure access to queue 'amq\.dyn-.*' in vhost "
573+
"'test vhost' refused for user 'test user'$">>,
574+
[{capture, none}]))
575+
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
576+
end,
577+
ok = close_connection_sync(Connection).
578+
488579
target_per_message_exchange(Config) ->
489580
TargetAddress = null,
490581
To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),

0 commit comments

Comments
 (0)