diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl
index 7b7418058714..435cce8aed61 100644
--- a/deps/amqp10_client/src/amqp10_client_session.erl
+++ b/deps/amqp10_client/src/amqp10_client_session.erl
@@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
-make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
+make_source(#{role := {receiver, Source, _Pid},
+ filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
+ Dynamic = maps:get(dynamic, Source, false),
TranslatedFilter = translate_filters(Filter),
- #'v1_0.source'{address = {utf8, Address},
+ #'v1_0.source'{address = make_address(Source),
durable = {uint, Durable},
- filter = TranslatedFilter}.
+ dynamic = Dynamic,
+ filter = TranslatedFilter,
+ capabilities = make_capabilities(Source)}.
make_target(#{role := {receiver, _Source, _Pid}}) ->
#'v1_0.target'{};
-make_target(#{role := {sender, #{address := Address} = Target}}) ->
+make_target(#{role := {sender, Target}}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
- TargetAddr = case is_binary(Address) of
- true -> {utf8, Address};
- false -> Address
- end,
- #'v1_0.target'{address = TargetAddr,
- durable = {uint, Durable}}.
+ Dynamic = maps:get(dynamic, Target, false),
+ #'v1_0.target'{address = make_address(Target),
+ durable = {uint, Durable},
+ dynamic = Dynamic,
+ capabilities = make_capabilities(Target)}.
+
+make_address(#{address := Addr}) ->
+ if is_binary(Addr) ->
+ {utf8, Addr};
+ is_atom(Addr) ->
+ Addr
+ end.
+
+make_capabilities(#{capabilities := Caps0}) ->
+ Caps = [{symbol, C} || C <- Caps0],
+ {array, symbol, Caps};
+make_capabilities(_) ->
+ undefined.
max_message_size(#{max_message_size := Size})
when is_integer(Size) andalso
diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl
index 0077a9c9c2be..732bc9f04398 100644
--- a/deps/rabbit/include/rabbit_amqp_reader.hrl
+++ b/deps/rabbit/include/rabbit_amqp_reader.hrl
@@ -3,6 +3,8 @@
-define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000).
+-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000).
+
%% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192).
diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl
index 423aa84ed829..3e5d5cc08dd7 100644
--- a/deps/rabbit/src/rabbit_amqp_reader.erl
+++ b/deps/rabbit/src/rabbit_amqp_reader.erl
@@ -220,10 +220,17 @@ terminate(_, _) ->
%%--------------------------------------------------------------------------
%% error handling / termination
-close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
+close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) ->
%% Client properties will be emitted in the connection_closed event by rabbit_reader.
- ClientProperties = i(client_properties, State),
+ ClientProperties = i(client_properties, State0),
put(client_properties, ClientProperties),
+
+ %% "It is illegal to send any more frames (or bytes of any other kind)
+ %% after sending a close frame." [2.7.9]
+ %% Sessions might send frames via the writer proc.
+ %% Therefore, let's first try to orderly shutdown our sessions.
+ State = shutdown_sessions(State0),
+
Time = case Timeout > 0 andalso
Timeout < ?CLOSING_TIMEOUT of
true -> Timeout;
@@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing),
State#v1{connection_state = closed}.
+shutdown_sessions(#v1{tracked_channels = Channels} = State) ->
+ maps:foreach(fun(_ChannelNum, Pid) ->
+ gen_server:cast(Pid, shutdown)
+ end, Channels),
+ TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT,
+ self(),
+ shutdown_sessions_timeout),
+ wait_for_shutdown_sessions(TimerRef, State).
+
+wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State)
+ when map_size(Channels) =:= 0 ->
+ ok = erlang:cancel_timer(TimerRef, [{async, false},
+ {info, false}]),
+ State;
+wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) ->
+ receive
+ {{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} ->
+ State = untrack_channel(ChannelNum, SessionPid, State0),
+ wait_for_shutdown_sessions(TimerRef, State);
+ shutdown_sessions_timeout ->
+ ?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p",
+ [?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]),
+ State0
+ end.
+
handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
State = untrack_channel(ChannelNum, SessionPid, State0),
S = case terminated_normally(Reason) of
@@ -760,6 +792,7 @@ send_to_new_session(
connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
vhost = Vhost,
user = User,
+ container_id = ContainerId,
name = ConnName},
writer = WriterPid} = State) ->
%% Subtract fixed frame header size.
@@ -772,6 +805,7 @@ send_to_new_session(
OutgoingMaxFrameSize,
User,
Vhost,
+ ContainerId,
ConnName,
BeginFrame],
case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of
diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl
index b23c492d3bfe..4ad681707a25 100644
--- a/deps/rabbit/src/rabbit_amqp_session.erl
+++ b/deps/rabbit/src/rabbit_amqp_session.erl
@@ -85,8 +85,10 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(HIBERNATE_AFTER, 6_000).
-define(CREDIT_REPLY_TIMEOUT, 30_000).
+%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client.
+-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>).
--export([start_link/8,
+-export([start_link/9,
process_frame/2,
list_local/0,
conserve_resources/3,
@@ -163,6 +165,7 @@
routing_key :: rabbit_types:routing_key() | to | subject,
%% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(),
+ dynamic :: boolean(),
max_message_size :: pos_integer(),
delivery_count :: sequence_no(),
credit :: rabbit_queue_type:credit(),
@@ -206,6 +209,7 @@
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(),
queue_type :: rabbit_queue_type:queue_type(),
+ dynamic :: boolean(),
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),
@@ -260,6 +264,7 @@
-record(cfg, {
outgoing_max_frame_size :: unlimited | pos_integer(),
+ container_id :: binary(),
reader_pid :: rabbit_types:connection(),
writer_pid :: pid(),
user :: rabbit_types:user(),
@@ -382,15 +387,17 @@
-type state() :: #state{}.
-start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) ->
- Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame},
+start_link(ReaderPid, WriterPid, ChannelNum, FrameMax,
+ User, Vhost, ContainerId, ConnName, BeginFrame) ->
+ Args = {ReaderPid, WriterPid, ChannelNum, FrameMax,
+ User, Vhost, ContainerId, ConnName, BeginFrame},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, Args, Opts).
process_frame(Pid, FrameBody) ->
gen_server:cast(Pid, {frame_body, FrameBody}).
-init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
+init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, ConnName,
#'v1_0.begin'{
%% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2]
remote_channel = undefined,
@@ -401,6 +408,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
+ amqp_container => ContainerId,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),
@@ -453,7 +461,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID,
- cfg = #cfg{reader_pid = ReaderPid,
+ cfg = #cfg{container_id = ContainerId,
+ reader_pid = ReaderPid,
writer_pid = WriterPid,
outgoing_max_frame_size = MaxFrameSize,
user = User,
@@ -470,14 +479,17 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
terminate(_Reason, #state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
- queue_states = QStates}) ->
+ queue_states = QStates,
+ cfg = Cfg}) ->
maps:foreach(
- fun (_, _) ->
- rabbit_global_counters:publisher_deleted(?PROTOCOL)
+ fun (_, Link) ->
+ rabbit_global_counters:publisher_deleted(?PROTOCOL),
+ maybe_delete_dynamic_queue(Link, Cfg)
end, IncomingLinks),
maps:foreach(
- fun (_, _) ->
- rabbit_global_counters:consumer_deleted(?PROTOCOL)
+ fun (_, Link) ->
+ rabbit_global_counters:consumer_deleted(?PROTOCOL),
+ maybe_delete_dynamic_queue(Link, Cfg)
end, OutgoingLinks),
ok = rabbit_queue_type:close(QStates).
@@ -602,7 +614,9 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
- end.
+ end;
+handle_cast(shutdown, State) ->
+ {stop, normal, State}.
log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
@@ -1092,39 +1106,52 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach,
end;
handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
- State0 = #state{incoming_links = IncomingLinks,
+ State0 = #state{incoming_links = IncomingLinks0,
outgoing_links = OutgoingLinks0,
outgoing_unsettled_map = Unsettled0,
outgoing_pending = Pending0,
queue_states = QStates0,
- cfg = #cfg{user = #user{username = Username}}}) ->
+ cfg = Cfg = #cfg{user = #user{username = Username}}}) ->
{OutgoingLinks, Unsettled, Pending, QStates} =
case maps:take(HandleInt, OutgoingLinks0) of
- {#outgoing_link{queue_name = QName}, OutgoingLinks1} ->
+ {#outgoing_link{queue_name = QName,
+ dynamic = Dynamic}, OutgoingLinks1} ->
Ctag = handle_to_ctag(HandleInt),
{Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0),
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} ->
- Spec = #{consumer_tag => Ctag,
- reason => remove,
- user => Username},
- case rabbit_queue_type:cancel(Q, Spec, QStates0) of
- {ok, QStates1} ->
- {OutgoingLinks1, Unsettled1, Pending1, QStates1};
- {error, Reason} ->
- protocol_error(
- ?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
- "Failed to remove consumer from ~s: ~tp",
- [rabbit_misc:rs(amqqueue:get_name(Q)), Reason])
- end;
- {error, not_found} ->
- {OutgoingLinks1, Unsettled1, Pending1, QStates0}
+ case Dynamic of
+ true ->
+ delete_dynamic_queue(QName, Cfg),
+ {OutgoingLinks1, Unsettled1, Pending1, QStates0};
+ false ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ Spec = #{consumer_tag => Ctag,
+ reason => remove,
+ user => Username},
+ case rabbit_queue_type:cancel(Q, Spec, QStates0) of
+ {ok, QStates1} ->
+ {OutgoingLinks1, Unsettled1, Pending1, QStates1};
+ {error, Reason} ->
+ protocol_error(
+ ?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
+ "Failed to remove consumer from ~s: ~tp",
+ [rabbit_misc:rs(amqqueue:get_name(Q)), Reason])
+ end;
+ {error, not_found} ->
+ {OutgoingLinks1, Unsettled1, Pending1, QStates0}
+ end
end;
error ->
{OutgoingLinks0, Unsettled0, Pending0, QStates0}
end,
-
- State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks),
+ IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of
+ {IncomingLink, IncomingLinks1} ->
+ maybe_delete_dynamic_queue(IncomingLink, Cfg),
+ IncomingLinks1;
+ error ->
+ IncomingLinks0
+ end,
+ State1 = State0#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled,
outgoing_pending = Pending,
@@ -1269,29 +1296,33 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State);
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
- name = LinkName = {utf8, LinkName0},
+ name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt),
source = Source,
snd_settle_mode = MaybeSndSettleMode,
- target = Target = #'v1_0.target'{address = TargetAddress},
+ target = Target0,
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
},
State0 = #state{incoming_links = IncomingLinks0,
permission_cache = PermCache0,
- cfg = #cfg{max_link_credit = MaxLinkCredit,
+ cfg = #cfg{container_id = ContainerId,
+ reader_pid = ReaderPid,
+ max_link_credit = MaxLinkCredit,
vhost = Vhost,
user = User}}) ->
- case ensure_target(Target, Vhost, User, PermCache0) of
- {ok, Exchange, RoutingKey, QNameBin, PermCache} ->
+ case ensure_target(Target0, LinkNameBin, Vhost, User,
+ ContainerId, ReaderPid, PermCache0) of
+ {ok, Exchange, RoutingKey, QNameBin, Target, PermCache} ->
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
- name = LinkName0,
+ name = LinkNameBin,
snd_settle_mode = SndSettleMode,
- target_address = address(TargetAddress),
+ target_address = address(Target#'v1_0.target'.address),
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
+ dynamic = default(Target#'v1_0.target'.dynamic, false),
max_message_size = MaxMessageSize,
delivery_count = DeliveryCountInt,
credit = MaxLinkCredit},
@@ -1325,10 +1356,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end;
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
- name = LinkName = {utf8, LinkName0},
+ name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt),
- source = Source = #'v1_0.source'{address = SourceAddress,
- filter = DesiredFilter},
+ source = Source0 = #'v1_0.source'{filter = DesiredFilter},
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
@@ -1339,6 +1369,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
topic_permission_cache = TopicPermCache0,
cfg = #cfg{vhost = Vhost,
user = User = #user{username = Username},
+ container_id = ContainerId,
reader_pid = ReaderPid}}) ->
{SndSettled, EffectiveSndSettleMode} =
case SndSettleMode of
@@ -1350,10 +1381,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% client only for durable messages.
{false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED}
end,
- case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of
+ case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId,
+ ReaderPid, PermCache0, TopicPermCache0) of
{error, Reason} ->
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]);
- {ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} ->
+ {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} ->
PermCache = check_resource_access(QName, read, User, PermCache1),
case rabbit_amqqueue:with(
QName,
@@ -1439,12 +1471,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize,
offered_capabilities = OfferedCaps},
+ {utf8, SourceAddress} = Source#'v1_0.source'.address,
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
- name = LinkName0,
- source_address = address(SourceAddress),
+ name = LinkNameBin,
+ source_address = SourceAddress,
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
+ dynamic = default(Source#'v1_0.source'.dynamic, false),
send_settled = SndSettled,
max_message_size = MaxMessageSize,
credit_api_version = CreditApiVsn,
@@ -2614,17 +2648,53 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{Credit, []}.
-spec ensure_source(#'v1_0.source'{},
+ binary(),
rabbit_types:vhost(),
rabbit_types:user(),
+ binary(),
+ rabbit_types:connection(),
permission_cache(),
topic_permission_cache()) ->
- {ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} |
+ {ok,
+ rabbit_amqqueue:name(),
+ #'v1_0.source'{},
+ permission_cache(),
+ topic_permission_cache()} |
{error, term()}.
-ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) ->
- exit_not_implemented("Dynamic sources not supported");
-ensure_source(#'v1_0.source'{address = Address,
- durable = Durable},
- Vhost, User, PermCache, TopicPermCache) ->
+ensure_source(#'v1_0.source'{
+ address = undefined,
+ dynamic = true,
+ %% We will reply with the actual node properties.
+ dynamic_node_properties = _IgnoreDesiredProperties,
+ capabilities = {array, symbol, Caps}
+ } = Source0,
+ LinkName, Vhost, User, ContainerId,
+ ConnPid, PermCache0, TopicPermCache) ->
+ case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
+ true ->
+ {QNameBin, Address, Props, PermCache} =
+ declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
+ Source = Source0#'v1_0.source'{
+ address = {utf8, Address},
+ %% While Khepri stores queue records durably, the terminus
+ %% - i.e. the existence of this receiver - is not stored durably.
+ durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
+ expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
+ timeout = {uint, 0},
+ dynamic_node_properties = Props,
+ distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE,
+ capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
+ },
+ QName = queue_resource(Vhost, QNameBin),
+ {ok, QName, Source, PermCache, TopicPermCache};
+ false ->
+ exit_not_implemented("Dynamic source not supported: ~p", [Source0])
+ end;
+ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) ->
+ exit_not_implemented("Dynamic source not supported: ~p", [Source]);
+ensure_source(Source = #'v1_0.source'{address = Address,
+ durable = Durable},
+ _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) ->
case Address of
{utf8, <<"/queues/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
@@ -2633,15 +2703,20 @@ ensure_source(#'v1_0.source'{address = Address,
QNameBin ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
- {ok, QName, PermCache, TopicPermCache}
+ {ok, QName, Source, PermCache, TopicPermCache}
catch error:_ ->
{error, {bad_address, Address}}
end;
{utf8, SourceAddr} ->
case address_v1_permitted() of
true ->
- ensure_source_v1(SourceAddr, Vhost, User, Durable,
- PermCache, TopicPermCache);
+ case ensure_source_v1(SourceAddr, Vhost, User, Durable,
+ PermCache, TopicPermCache) of
+ {ok, QName, PermCache1, TopicPermCache1} ->
+ {ok, QName, Source, PermCache1, TopicPermCache1};
+ Err ->
+ Err
+ end;
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end;
@@ -2687,42 +2762,71 @@ ensure_source_v1(Address,
Err
end.
-address(undefined) ->
- null;
-address({utf8, String}) ->
- String.
-
-spec ensure_target(#'v1_0.target'{},
+ binary(),
rabbit_types:vhost(),
rabbit_types:user(),
+ binary(),
+ rabbit_types:connection(),
permission_cache()) ->
{ok,
rabbit_types:exchange() | rabbit_exchange:name() | to,
rabbit_types:routing_key() | to | subject,
rabbit_misc:resource_name() | undefined,
+ #'v1_0.target'{},
permission_cache()} |
{error, term()}.
-ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
- exit_not_implemented("Dynamic targets not supported");
-ensure_target(#'v1_0.target'{address = Address,
- durable = Durable},
- Vhost, User, PermCache) ->
+ensure_target(#'v1_0.target'{
+ address = undefined,
+ dynamic = true,
+ %% We will reply with the actual node properties.
+ dynamic_node_properties = _IgnoreDesiredProperties,
+ capabilities = {array, symbol, Caps}
+ } = Target0,
+ LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) ->
+ case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
+ true ->
+ {QNameBin, Address, Props, PermCache1} =
+ declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
+ {ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1),
+ Target = #'v1_0.target'{
+ address = {utf8, Address},
+ %% While Khepri stores queue records durably,
+ %% the terminus - i.e. the existence of this producer - is not stored durably.
+ durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
+ expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
+ timeout = {uint, 0},
+ dynamic = true,
+ dynamic_node_properties = Props,
+ capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
+ },
+ {ok, Exchange, QNameBin, QNameBin, Target, PermCache};
+ false ->
+ exit_not_implemented("Dynamic target not supported: ~p", [Target0])
+ end;
+ensure_target(Target = #'v1_0.target'{dynamic = true}, _, _, _, _, _, _) ->
+ exit_not_implemented("Dynamic target not supported: ~p", [Target]);
+ensure_target(Target = #'v1_0.target'{address = Address,
+ durable = Durable},
+ _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache0) ->
case target_address_version(Address) of
2 ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
- {ok, to, RKey, QNameBin, PermCache};
+ {ok, to, RKey, QNameBin, Target, PermCache0};
{ok, XNameBin, RKey, QNameBin} ->
- check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
+ {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache0),
+ {ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err ->
Err
end;
1 ->
case address_v1_permitted() of
true ->
- case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
+ case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
- check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
+ {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache1),
+ {ok, Exchange, RKey, QNameBin, Target, PermCache};
{error, _} = Err ->
Err
end;
@@ -2731,7 +2835,7 @@ ensure_target(#'v1_0.target'{address = Address,
end
end.
-check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
+check_exchange(XNameBin, User, Vhost, PermCache0) ->
XName = exchange_resource(Vhost, XNameBin),
PermCache = check_resource_access(XName, write, User, PermCache0),
case rabbit_exchange:lookup(XName) of
@@ -2745,7 +2849,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
<<"amq.", _/binary>> -> X;
_ -> XName
end,
- {ok, Exchange, RKey, QNameBin, PermCache};
+ {ok, Exchange, PermCache};
{error, not_found} ->
exit_not_found(XName)
end.
@@ -3033,7 +3137,10 @@ credit_reply_timeout(QType, QName) ->
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
default(undefined, Default) -> Default;
-default(Thing, _Default) -> Thing.
+default(Thing, _Default) -> Thing.
+
+address(undefined) -> null;
+address({utf8, String}) -> String.
snd_settle_mode({ubyte, Val}) ->
case Val of
@@ -3247,20 +3354,20 @@ ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durabil
ok = exit_if_absent(exchange, Vhost, XNameList),
case Type of
target -> {undefined, PermCache};
- source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache)
+ source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache)
end;
ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) ->
%% exchange amq.topic exists
{undefined, PermCache};
ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) ->
%% exchange amq.topic exists
- declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache);
+ declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache);
ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) ->
%% Target "/queue" means publish to default exchange with message subject as routing key.
%% Default exchange exists.
{undefined, PermCache};
ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) ->
- declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache);
+ declare_queue_v1(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache);
ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) ->
%% Target "/amq/queue/" is handled specially due to AMQP legacy:
%% "Queue names starting with "amq." are reserved for pre-declared and
@@ -3285,22 +3392,39 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) ->
false -> exit_not_found(ResourceName)
end.
-generate_queue_name() ->
+generate_queue_name_v1() ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen").
+%% "The generated name of the address SHOULD include the link name and the
+%% container-id of the remote container to allow for ease of identification." [3.5.4]
+%% Let's include container-id and link name if they are not very long
+%% because the generated address might be sent in every message.
+generate_queue_name_dynamic(ContainerId, LinkName)
+ when byte_size(ContainerId) + byte_size(LinkName) < 150 ->
+ Prefix = <<"amq.dyn-", ContainerId/binary, "-", LinkName/binary>>,
+ rabbit_guid:binary(rabbit_guid:gen_secure(), Prefix);
+generate_queue_name_dynamic(_, _) ->
+ rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.dyn.gen").
+
+declare_queue_v1(QNameBin, Vhost, User, TerminusDurability, PermCache0) ->
+ Durable = queue_is_durable(TerminusDurability),
+ {ok, PermCache} = declare_queue(QNameBin, Vhost, User, Durable, none, PermCache0),
+ {QNameBin, PermCache}.
+
declare_queue(QNameBin,
Vhost,
User = #user{username = Username},
- TerminusDurability,
+ Durable,
+ QOwner,
PermCache0) ->
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName,
_Pid = none,
- queue_is_durable(TerminusDurability),
+ Durable,
_AutoDelete = false,
- _QOwner = none,
+ QOwner,
_QArgs = [],
Vhost,
#{user => Username},
@@ -3320,7 +3444,40 @@ declare_queue(QNameBin,
"Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other])
end,
- {QNameBin, PermCache}.
+ {ok, PermCache}.
+
+declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) ->
+ QNameBin = generate_queue_name_dynamic(ContainerId, LinkName),
+ {ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0),
+ QNameBinQuoted = uri_string:quote(QNameBin),
+ Address = <<"/queues/", QNameBinQuoted/binary>>,
+ Props = {map, [{{symbol, <<"lifetime-policy">>},
+ {described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}},
+ {{symbol, <<"supported-dist-modes">>},
+ {array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]},
+ {QNameBin, Address, Props, PermCache}.
+
+maybe_delete_dynamic_queue(#incoming_link{dynamic = true,
+ queue_name_bin = QNameBin},
+ Cfg = #cfg{vhost = Vhost}) ->
+ QName = queue_resource(Vhost, QNameBin),
+ delete_dynamic_queue(QName, Cfg);
+maybe_delete_dynamic_queue(#outgoing_link{dynamic = true,
+ queue_name = QName},
+ Cfg) ->
+ delete_dynamic_queue(QName, Cfg);
+maybe_delete_dynamic_queue(_, _) ->
+ ok.
+
+delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) ->
+ %% No real need to check for 'configure' access again since this queue is owned by
+ %% this connection and the user had 'configure' access when the queue got declared.
+ _ = rabbit_amqqueue:with(
+ QName,
+ fun(Q) ->
+ rabbit_queue_type:delete(Q, false, false, Username)
+ end),
+ ok.
outcomes(#'v1_0.source'{outcomes = undefined}) ->
{array, symbol, ?OUTCOMES};
diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl
index 498e333bc8c0..723ca4b5df58 100644
--- a/deps/rabbit/src/rabbit_reader.erl
+++ b/deps/rabbit/src/rabbit_reader.erl
@@ -202,7 +202,7 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, description),
- {ok, Version} = application:get_key(rabbit, vsn),
+ Version = rabbit_misc:version(),
%% Get any configuration-specified server properties
{ok, RawConfigServerProps} = application:get_env(rabbit,
diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl
index 581351c462ed..5889cbdd5003 100644
--- a/deps/rabbit/test/amqp_auth_SUITE.erl
+++ b/deps/rabbit/test/amqp_auth_SUITE.erl
@@ -55,9 +55,12 @@ groups() ->
[
%% authz
attach_source_queue,
+ attach_source_queue_dynamic,
attach_target_exchange,
attach_target_topic_exchange,
attach_target_queue,
+ attach_target_queue_dynamic_exchange_write,
+ attach_target_queue_dynamic_queue_configure,
target_per_message_exchange,
target_per_message_internal_exchange,
target_per_message_topic,
@@ -437,6 +440,39 @@ attach_source_queue(Config) ->
end,
ok = close_connection_sync(Conn).
+attach_source_queue_dynamic(Config) ->
+ OpnConf = connection_config(Config),
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session_sync(Connection),
+
+ %% missing configure permission to queue
+ ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
+
+ Source = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>],
+ durable => none},
+ AttachArgs = #{name => <<"my link">>,
+ role => {receiver, Source, self()},
+ snd_settle_mode => unsettled,
+ rcv_settle_mode => first,
+ filter => #{}},
+ {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
+ receive {amqp10_event,
+ {session, Session,
+ {ended, Error}}} ->
+ #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
+ description = {utf8, Description}} = Error,
+ ?assertEqual(
+ match,
+ re:run(Description,
+ <<"^configure access to queue 'amq\.dyn-.*' in vhost "
+ "'test vhost' refused for user 'test user'$">>,
+ [{capture, none}]))
+ after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
+ end,
+ ok = close_connection_sync(Connection).
+
attach_target_exchange(Config) ->
XName = <<"amq.fanout">>,
Address1 = rabbitmq_amqp_address:exchange(XName),
@@ -485,6 +521,61 @@ attach_target_queue(Config) ->
end,
ok = amqp10_client:close_connection(Conn).
+attach_target_queue_dynamic_exchange_write(Config) ->
+ OpnConf = connection_config(Config),
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session_sync(Connection),
+
+ %% missing write permission to default exchange
+ ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>),
+
+ Target = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>]},
+ AttachArgs = #{name => <<"my link">>,
+ role => {sender, Target},
+ snd_settle_mode => mixed,
+ rcv_settle_mode => first},
+ {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
+ ExpectedErr = error_unauthorized(
+ <<"write access to exchange 'amq.default' ",
+ "in vhost 'test vhost' refused for user 'test user'">>),
+ receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
+ after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
+ end,
+ ok = close_connection_sync(Connection).
+
+attach_target_queue_dynamic_queue_configure(Config) ->
+ OpnConf = connection_config(Config),
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session_sync(Connection),
+
+ %% missing configure permission to queue
+ ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>),
+
+ Target = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>]},
+ AttachArgs = #{name => <<"my link">>,
+ role => {sender, Target},
+ snd_settle_mode => mixed,
+ rcv_settle_mode => first},
+ {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
+ receive {amqp10_event,
+ {session, Session,
+ {ended, Error}}} ->
+ #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
+ description = {utf8, Description}} = Error,
+ ?assertEqual(
+ match,
+ re:run(Description,
+ <<"^configure access to queue 'amq\.dyn-.*' in vhost "
+ "'test vhost' refused for user 'test user'$">>,
+ [{capture, none}]))
+ after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
+ end,
+ ok = close_connection_sync(Connection).
+
target_per_message_exchange(Config) ->
TargetAddress = null,
To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl
index 17d997a78a55..3c3f47574d57 100644
--- a/deps/rabbit/test/amqp_client_SUITE.erl
+++ b/deps/rabbit/test/amqp_client_SUITE.erl
@@ -130,6 +130,10 @@ groups() ->
handshake_timeout,
credential_expires,
attach_to_exclusive_queue,
+ dynamic_target_short_link_name,
+ dynamic_target_long_link_name,
+ dynamic_source_rpc,
+ dynamic_terminus_delete,
modified_classic_queue,
modified_quorum_queue,
modified_dead_letter_headers_exchange,
@@ -4762,6 +4766,230 @@ attach_to_exclusive_queue(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
+dynamic_target_short_link_name(Config) ->
+ OpnConf0 = connection_config(Config),
+ OpnConf = OpnConf0#{container_id := <<"my-container">>,
+ notify_with_performative => true},
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session_sync(Connection),
+
+ %% "The address of the target MUST NOT be set" [3.5.4]
+ Target = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>]},
+ ShortLinkName = <<"my/sender">>,
+ AttachArgs = #{name => ShortLinkName,
+ role => {sender, Target},
+ snd_settle_mode => mixed,
+ rcv_settle_mode => first},
+ {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
+ Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
+ #'v1_0.attach'{
+ target = #'v1_0.target'{
+ address = {utf8, Address},
+ dynamic = true}} = Attach,
+ Address
+ after 30000 -> ct:fail({missing_event, ?LINE})
+ end,
+ %% The client doesn't really care what the address looks like.
+ %% However let's do whitebox testing here and check the address format.
+ %% We expect the address to contain both container ID and link name since they are short.
+ ?assertMatch(<<"/queues/amq.dyn-my-container-my%2Fsender-", _GUID/binary>>, Addr),
+ ok = wait_for_credit(Sender),
+ ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
+ ok = wait_for_accepted(<<"t1">>),
+
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
+ {ok, Msg} = amqp10_client:get_msg(Receiver),
+ ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
+ ok = amqp10_client:accept_msg(Receiver, Msg),
+
+ %% The exclusive queue should be deleted when we close our connection.
+ ?assertMatch([_ExclusiveQueue], rpc(Config, rabbit_amqqueue, list, [])),
+ ok = close_connection_sync(Connection),
+ eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
+ ok.
+
+dynamic_target_long_link_name(Config) ->
+ OpnConf0 = connection_config(Config),
+ OpnConf = OpnConf0#{container_id := <<"my-container">>,
+ notify_with_performative => true},
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session_sync(Connection),
+
+ %% "The address of the target MUST NOT be set" [3.5.4]
+ Target = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>]},
+ LongLinkName = binary:copy(<<"z">>, 200),
+ AttachArgs = #{name => LongLinkName,
+ role => {sender, Target},
+ snd_settle_mode => mixed,
+ rcv_settle_mode => first},
+ {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
+ Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} ->
+ #'v1_0.attach'{
+ target = #'v1_0.target'{
+ address = {utf8, Address},
+ dynamic = true}} = Attach,
+ Address
+ after 30000 -> ct:fail({missing_event, ?LINE})
+ end,
+ %% The client doesn't really care what the address looks like.
+ %% However let's do whitebox testing here and check the address format.
+ %% We expect the address to not contain the long link name.
+ ?assertMatch(<<"/queues/amq.dyn.gen-", _GUID/binary>>, Addr),
+ ok = wait_for_credit(Sender),
+ ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
+ ok = wait_for_accepted(<<"t1">>),
+
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled),
+ {ok, Msg} = amqp10_client:get_msg(Receiver),
+ ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)),
+ ok = amqp10_client:accept_msg(Receiver, Msg),
+ flush(accepted),
+
+ %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
+ %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
+ ok = amqp10_client:detach_link(Sender),
+ receive {amqp10_event, {link, Receiver, {detached, Detach}}} ->
+ ?assertMatch(
+ #'v1_0.detach'{error = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}},
+ Detach)
+ after 5000 -> ct:fail({missing_event, ?LINE})
+ end,
+ ok = close_connection_sync(Connection).
+
+%% Test the following RPC workflow:
+%% RPC client -> queue -> RPC server
+%% RPC server -> dynamic queue -> RPC client
+dynamic_source_rpc(Config) ->
+ OpnConf0 = connection_config(Config),
+ OpnConf = OpnConf0#{container_id := <<"rpc-client">>,
+ notify_with_performative => true},
+ {ok, ConnectionClient} = amqp10_client:open_connection(OpnConf),
+ {ok, SessionClient} = amqp10_client:begin_session_sync(ConnectionClient),
+
+ %% "The address of the source MUST NOT be set" [3.5.3]
+ Source = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>],
+ durable => none},
+ AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>,
+ role => {receiver, Source, self()},
+ snd_settle_mode => unsettled,
+ rcv_settle_mode => first,
+ filter => #{}},
+ {ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs),
+ RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} ->
+ #'v1_0.attach'{
+ source = #'v1_0.source'{
+ address = {utf8, Address},
+ dynamic = true}} = Attach,
+ Address
+ after 30000 -> ct:fail({missing_event, ?LINE})
+ end,
+ %% The client doesn't really care what the address looks like.
+ %% However let's do whitebox testing here and check the address format.
+ %% We expect the address to contain both container ID and link name since they are short.
+ ?assertMatch(<<"/queues/amq.dyn-rpc-client-rpc-client-receiver", _CarrotAndGUID/binary>>,
+ RespAddr),
+
+ %% Let's use a separate connection for the RPC server.
+ {_, SessionServer, LinkPair} = RpcServer = init(Config),
+ ReqQName = atom_to_binary(?FUNCTION_NAME),
+ ReqAddr = rabbitmq_amqp_address:queue(ReqQName),
+ {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, ReqQName, #{}),
+ {ok, ReceiverServer} = amqp10_client:attach_receiver_link(SessionServer, <<"rpc-server-receiver">>, ReqAddr, unsettled),
+ {ok, SenderServer} = amqp10_client:attach_sender_link(SessionServer, <<"rpc-server-sender">>, null),
+ ok = wait_for_credit(SenderServer),
+
+ {ok, SenderClient} = amqp10_client:attach_sender_link(SessionClient, <<"rpc-client-sender">>, ReqAddr),
+ wait_for_credit(SenderClient),
+ flush(attached),
+
+ ok = amqp10_client:send_msg(
+ SenderClient,
+ amqp10_msg:set_properties(
+ #{reply_to => RespAddr},
+ amqp10_msg:new(<<"t1">>, <<"hello">>))),
+ ok = wait_for_accepted(<<"t1">>),
+
+ {ok, ReqMsg} = amqp10_client:get_msg(ReceiverServer),
+ ReqBody = amqp10_msg:body_bin(ReqMsg),
+ RespBody = string:uppercase(ReqBody),
+ #{reply_to := ReplyTo} = amqp10_msg:properties(ReqMsg),
+ ok = amqp10_client:send_msg(
+ SenderServer,
+ amqp10_msg:set_properties(
+ #{to => ReplyTo},
+ amqp10_msg:new(<<"t2">>, RespBody))),
+ ok = wait_for_accepted(<<"t2">>),
+ ok = amqp10_client:accept_msg(ReceiverServer, ReqMsg),
+
+ {ok, RespMsg} = amqp10_client:get_msg(ReceiverClient),
+ ?assertEqual(<<"HELLO">>, amqp10_msg:body_bin(RespMsg)),
+ ok = amqp10_client:accept_msg(ReceiverClient, RespMsg),
+
+ ok = detach_link_sync(ReceiverServer),
+ ok = detach_link_sync(SenderClient),
+ {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, ReqQName),
+ ok = detach_link_sync(SenderServer),
+ ok = close(RpcServer),
+ ok = close_connection_sync(ConnectionClient).
+
+dynamic_terminus_delete(Config) ->
+ OpnConf = connection_config(Config),
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session1} = amqp10_client:begin_session_sync(Connection),
+ {ok, Session2} = amqp10_client:begin_session_sync(Connection),
+
+ Terminus = #{address => undefined,
+ dynamic => true,
+ capabilities => [<<"temporary-queue">>],
+ durable => none},
+ RcvAttachArgs = #{role => {receiver, Terminus, self()},
+ snd_settle_mode => unsettled,
+ rcv_settle_mode => first,
+ filter => #{}},
+ SndAttachArgs = #{role => {sender, Terminus},
+ snd_settle_mode => mixed,
+ rcv_settle_mode => first},
+ RcvAttachArgs1 = RcvAttachArgs#{name => <<"receiver 1">>},
+ RcvAttachArgs2 = RcvAttachArgs#{name => <<"receiver 2">>},
+ RcvAttachArgs3 = RcvAttachArgs#{name => <<"receiver 3">>},
+ SndAttachArgs1 = SndAttachArgs#{name => <<"sender 1">>},
+ SndAttachArgs2 = SndAttachArgs#{name => <<"sender 2">>},
+ SndAttachArgs3 = SndAttachArgs#{name => <<"sender 3">>},
+ {ok, _R1} = amqp10_client:attach_link(Session1, RcvAttachArgs1),
+ {ok, _R2} = amqp10_client:attach_link(Session2, RcvAttachArgs2),
+ {ok, R3} = amqp10_client:attach_link(Session2, RcvAttachArgs3),
+ {ok, _S1} = amqp10_client:attach_link(Session1, SndAttachArgs1),
+ {ok, _S2} = amqp10_client:attach_link(Session2, SndAttachArgs2),
+ {ok, S3} = amqp10_client:attach_link(Session2, SndAttachArgs3),
+ [receive {amqp10_event, {link, _LinkRef, attached}} -> ok
+ after 30000 -> ct:fail({missing_event, ?LINE})
+ end
+ || _ <- lists:seq(1, 6)],
+
+ %% We should now have 6 exclusive queues.
+ ?assertEqual(6, rpc(Config, rabbit_amqqueue, count, [])),
+
+ %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be
+ %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10]
+ ok = detach_link_sync(R3),
+ ok = detach_link_sync(S3),
+ ?assertEqual(4, rpc(Config, rabbit_amqqueue, count, [])),
+
+ %% When a session is ended, the sessions's links cease to exist.
+ ok = end_session_sync(Session2),
+ eventually(?_assertEqual(2, rpc(Config, rabbit_amqqueue, count, []))),
+
+ %% When a connection is closed, the connection's links cease to exist.
+ ok = close_connection_sync(Connection),
+ eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))),
+ ok.
+
priority_classic_queue(Config) ->
QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>},
<<"x-max-priority">> => {ulong, 10}},
diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl
index a97bd5d68b0e..7a5462eda3b0 100644
--- a/deps/rabbit/test/amqp_jms_SUITE.erl
+++ b/deps/rabbit/test/amqp_jms_SUITE.erl
@@ -14,6 +14,10 @@
-compile(nowarn_export_all).
-compile(export_all).
+-import(rabbit_ct_broker_helpers,
+ [rpc/4]).
+-import(rabbit_ct_helpers,
+ [eventually/3]).
-import(amqp_utils,
[init/1,
close/1,
@@ -30,8 +34,15 @@ all() ->
groups() ->
[{cluster_size_1, [shuffle],
[
+ %% CT test case per Java class
+ jms_connection,
+ jms_temporary_queue,
+
+ %% CT test case per test in Java class JmsTest
message_types_jms_to_jms,
- message_types_jms_to_amqp
+ message_types_jms_to_amqp,
+ temporary_queue_rpc,
+ temporary_queue_delete
]
}].
@@ -54,7 +65,9 @@ end_per_suite(Config) ->
init_per_group(cluster_size_1, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
- Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config,
+ {rmq_nodename_suffix, Suffix}),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit,
@@ -82,6 +95,9 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
+ %% Assert that every testcase cleaned up.
+ eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, [])), 1000, 5),
+ eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, [])), 1000, 5),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
build_maven_test_project(Config) ->
@@ -98,67 +114,49 @@ build_maven_test_project(Config) ->
%% Testcases.
%% -------------------------------------------------------------------
+jms_connection(Config) ->
+ ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsConnectionTest">>]}], Config).
+
+jms_temporary_queue(Config) ->
+ ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsTemporaryQueueTest">>]}], Config).
+
%% Send different message types from JMS client to JMS client.
message_types_jms_to_jms(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"quorum">>, Config),
- ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
+ ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
%% Send different message types from JMS client to Erlang AMQP 1.0 client.
message_types_jms_to_amqp(Config) ->
+ TestName = atom_to_binary(?FUNCTION_NAME),
+ ok = run_jms_test(TestName, [], Config).
+
+temporary_queue_rpc(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME),
- ok = declare_queue(QName, <<"quorum">>, Config),
- Address = rabbitmq_amqp_address:queue(QName),
-
- %% The JMS client sends messaegs.
- ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config),
-
- %% The Erlang AMQP 1.0 client receives messages.
- OpnConf = connection_config(Config),
- {ok, Connection} = amqp10_client:open_connection(OpnConf),
- {ok, Session} = amqp10_client:begin_session_sync(Connection),
- {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled),
- {ok, Msg1} = amqp10_client:get_msg(Receiver),
- ?assertEqual(
- #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}},
- amqp10_msg:body(Msg1)),
- {ok, Msg2} = amqp10_client:get_msg(Receiver),
- ?assertEqual(
- #'v1_0.amqp_value'{
- content = {map, [
- {{utf8, <<"key1">>}, {utf8, <<"value">>}},
- {{utf8, <<"key2">>}, true},
- {{utf8, <<"key3">>}, {double, -1.1}},
- {{utf8, <<"key4">>}, {long, -1}}
- ]}},
- amqp10_msg:body(Msg2)),
- {ok, Msg3} = amqp10_client:get_msg(Receiver),
- ?assertEqual(
- [
- #'v1_0.amqp_sequence'{
- content = [{utf8, <<"value">>},
- true,
- {double, -1.1},
- {long, -1}]}
- ],
- amqp10_msg:body(Msg3)),
-
- ok = detach_link_sync(Receiver),
- ok = end_session_sync(Session),
- ok = close_connection_sync(Connection),
+ ok = declare_queue(QName, <<"classic">>, Config),
+ ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
+temporary_queue_delete(Config) ->
+ TestName = atom_to_binary(?FUNCTION_NAME),
+ ok = run_jms_test(TestName, [], Config).
+
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
+run_jms_test(TestName, JavaProps, Config) ->
+ run(TestName, [{"-Dtest=JmsTest#~ts", [TestName]} | JavaProps], Config).
+
run(TestName, JavaProps, Config) ->
TestProjectDir = ?config(data_dir, Config),
+
Cmd = [filename:join([TestProjectDir, "mvnw"]),
"test",
- {"-Dtest=JmsTest#~ts", [TestName]},
- {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]}
+ {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]},
+ {"-Dnodename=~ts", [rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)]},
+ {"-Drabbitmqctl.bin=~ts", [rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd)]}
] ++ JavaProps,
case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of
{ok, _Stdout_} ->
diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml
index cce3ecb58f45..8b06c85521b0 100644
--- a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml
+++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml
@@ -9,7 +9,9 @@
https://www.rabbitmq.com
5.10.2
+ 3.27.3
2.6.1
+ [0.5.0-SNAPSHOT,)
1.2.13
2.43.0
1.25.2
@@ -30,13 +32,24 @@
${qpid-jms-client.version}
test
-
ch.qos.logback
logback-classic
${logback.version}
test
+
+ com.rabbitmq.client
+ amqp-client
+ ${amqp-client.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj.version}
+ test
+
@@ -81,4 +94,16 @@
+
+
+
+
+ ossrh
+ https://oss.sonatype.org/content/repositories/snapshots
+ true
+ false
+
+
+
+
diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java
new file mode 100644
index 000000000000..2dc08413eae4
--- /dev/null
+++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java
@@ -0,0 +1,163 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
+//
+package com.rabbitmq.amqp.tests.jms;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+final class Cli {
+
+ private Cli() {}
+
+ static void startBroker() {
+ rabbitmqctl("start_app");
+ }
+
+ static void stopBroker() {
+ rabbitmqctl("stop_app");
+ }
+
+ private static ProcessState rabbitmqctl(String command) {
+ return rabbitmqctl(command, nodename());
+ }
+
+ private static ProcessState rabbitmqctl(String command, String nodename) {
+ return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
+ }
+
+ private static String rabbitmqctlCommand() {
+ return System.getProperty("rabbitmqctl.bin");
+ }
+
+ public static String nodename() {
+ return System.getProperty("nodename", "rabbit@" + hostname());
+ }
+
+ public static String hostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ try {
+ return executeCommand("hostname").output();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private static ProcessState executeCommand(String command) {
+ return executeCommand(command, false);
+ }
+
+ private static ProcessState executeCommand(String command, boolean ignoreError) {
+ Process pr = executeCommandProcess(command);
+ InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
+ InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
+
+ int ev = waitForExitValue(pr, inputState, errorState);
+ inputState.pump();
+ errorState.pump();
+ if (ev != 0 && !ignoreError) {
+ throw new RuntimeException(
+ "unexpected command exit value: "
+ + ev
+ + "\ncommand: "
+ + command
+ + "\n"
+ + "\nstdout:\n"
+ + inputState.buffer.toString()
+ + "\nstderr:\n"
+ + errorState.buffer.toString()
+ + "\n");
+ }
+ return new ProcessState(inputState);
+ }
+
+ private static int waitForExitValue(
+ Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) {
+ while (true) {
+ try {
+ inputState.pump();
+ errorState.pump();
+ pr.waitFor();
+ break;
+ } catch (InterruptedException ignored) {
+ }
+ }
+ return pr.exitValue();
+ }
+
+ private static Process executeCommandProcess(String command) {
+ String[] finalCommand;
+ if (System.getProperty("os.name").toLowerCase().contains("windows")) {
+ finalCommand = new String[4];
+ finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
+ finalCommand[1] = "/y";
+ finalCommand[2] = "/c";
+ finalCommand[3] = command;
+ } else {
+ finalCommand = new String[3];
+ finalCommand[0] = "/bin/sh";
+ finalCommand[1] = "-c";
+ finalCommand[2] = command;
+ }
+ try {
+ return Runtime.getRuntime().exec(finalCommand);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static class ProcessState {
+
+ private final InputStreamPumpState inputState;
+
+ ProcessState(InputStreamPumpState inputState) {
+ this.inputState = inputState;
+ }
+
+ String output() {
+ return inputState.buffer.toString();
+ }
+ }
+
+ private static class InputStreamPumpState {
+
+ private final BufferedReader reader;
+ private final StringBuilder buffer;
+
+ private InputStreamPumpState(InputStream in) {
+ this.reader = new BufferedReader(new InputStreamReader(in));
+ this.buffer = new StringBuilder();
+ }
+
+ void pump() {
+ String line;
+ while (true) {
+ try {
+ if ((line = reader.readLine()) == null) break;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ buffer.append(line).append("\n");
+ }
+ }
+ }
+}
diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java
new file mode 100644
index 000000000000..d526cbbee4ff
--- /dev/null
+++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java
@@ -0,0 +1,196 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc.
+// and/or its subsidiaries. All rights reserved.
+//
+
+package com.rabbitmq.amqp.tests.jms;
+
+import static com.rabbitmq.amqp.tests.jms.Cli.startBroker;
+import static com.rabbitmq.amqp.tests.jms.Cli.stopBroker;
+import static com.rabbitmq.amqp.tests.jms.TestUtils.*;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import jakarta.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Based on
+ * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
+ */
+@JmsTestInfrastructure
+public class JmsConnectionTest {
+
+ String destination;
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnection() throws Exception {
+ try (Connection connection = connection()) {
+ assertNotNull(connection);
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnectionAndStart() throws Exception {
+ try (Connection connection = connection()) {
+ assertNotNull(connection);
+ connection.start();
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ // Currently not supported by RabbitMQ.
+ @Disabled
+ public void testCreateWithDuplicateClientIdFails() throws Exception {
+ JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
+ JmsConnection connection1 = (JmsConnection) factory.createConnection();
+ connection1.setClientID("Test");
+ assertNotNull(connection1);
+ connection1.start();
+ JmsConnection connection2 = (JmsConnection) factory.createConnection();
+ try {
+ connection2.setClientID("Test");
+ fail("should have thrown a JMSException");
+ } catch (InvalidClientIDException ex) {
+ // OK
+ } catch (Exception unexpected) {
+ fail("Wrong exception type thrown: " + unexpected);
+ }
+
+ connection1.close();
+ connection2.close();
+ }
+
+ @Test
+ public void testSetClientIdAfterStartedFails() {
+ assertThrows(
+ JMSException.class,
+ () -> {
+ try (Connection connection = connection()) {
+ connection.setClientID("Test");
+ connection.start();
+ connection.setClientID("NewTest");
+ }
+ });
+ }
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnectionAsSystemAdmin() throws Exception {
+ JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
+ factory.setUsername(adminUsername());
+ factory.setPassword(adminPassword());
+ try (Connection connection = factory.createConnection()) {
+ assertNotNull(connection);
+ connection.start();
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnectionCallSystemAdmin() throws Exception {
+ try (Connection connection =
+ connectionFactory().createConnection(adminUsername(), adminPassword())) {
+ assertNotNull(connection);
+ connection.start();
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnectionAsUnknwonUser() {
+ assertThrows(
+ JMSSecurityException.class,
+ () -> {
+ JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
+ factory.setUsername("unknown");
+ factory.setPassword("unknown");
+ try (Connection connection = factory.createConnection()) {
+ assertNotNull(connection);
+ connection.start();
+ }
+ });
+ }
+
+ @Test
+ @Timeout(30)
+ public void testCreateConnectionCallUnknwonUser() {
+ assertThrows(
+ JMSSecurityException.class,
+ () -> {
+ try (Connection connection = connectionFactory().createConnection("unknown", "unknown")) {
+ assertNotNull(connection);
+ connection.start();
+ }
+ });
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBrokerStopWontHangConnectionClose() throws Exception {
+ Connection connection = connection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = queue(destination);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ Message m = session.createTextMessage("Sample text");
+ producer.send(m);
+
+ try {
+ stopBroker();
+ try {
+ connection.close();
+ } catch (Exception ex) {
+ fail("Should not have thrown an exception.");
+ }
+ } finally {
+ startBroker();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testConnectionExceptionBrokerStop() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ try (Connection connection = connection()) {
+ connection.setExceptionListener(exception -> latch.countDown());
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+
+ try {
+ stopBroker();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ } finally {
+ startBroker();
+ }
+ }
+ }
+}
diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java
new file mode 100644
index 000000000000..ae60fa4b8a31
--- /dev/null
+++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java
@@ -0,0 +1,140 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc.
+// and/or its subsidiaries. All rights reserved.
+//
+
+package com.rabbitmq.amqp.tests.jms;
+
+import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri;
+import static com.rabbitmq.amqp.tests.jms.TestUtils.connection;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import jakarta.jms.*;
+import jakarta.jms.IllegalStateException;
+import java.util.UUID;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Based on
+ * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
+ */
+public class JmsTemporaryQueueTest {
+
+ Connection connection;
+
+ @BeforeEach
+ void init() throws JMSException {
+ connection = connection();
+ }
+
+ @AfterEach
+ void tearDown() throws JMSException {
+ connection.close();
+ }
+
+ @Test
+ @Timeout(60)
+ public void testCreatePublishConsumeTemporaryQueue() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ String body = UUID.randomUUID().toString();
+ producer.send(session.createTextMessage(body));
+ assertEquals(body, consumer.receive(60_000).getBody(String.class));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue tempQueue = session.createTemporaryQueue();
+ session.createConsumer(tempQueue);
+
+ Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
+ try {
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ session2.createConsumer(tempQueue);
+ fail("should not be able to consumer from temporary queue from another connection");
+ } catch (InvalidDestinationException ide) {
+ // expected
+ }
+ } finally {
+ connection2.close();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue tempQueue = session.createTemporaryQueue();
+
+ Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection();
+ try {
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Message msg = session2.createMessage();
+ MessageProducer producer = session2.createProducer(tempQueue);
+
+ // Close the original connection
+ connection.close();
+
+ try {
+ producer.send(msg);
+ fail("should not be able to send to temporary queue from closed connection");
+ } catch (jakarta.jms.IllegalStateException ide) {
+ // expected
+ }
+ } finally {
+ connection2.close();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testCantDeleteTemporaryQueueWithConsumers() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue tempQueue = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(tempQueue);
+
+ try {
+ tempQueue.delete();
+ fail("should not be able to delete temporary queue with active consumers");
+ } catch (IllegalStateException ide) {
+ // expected
+ }
+
+ consumer.close();
+
+ // Now it should be allowed
+ tempQueue.delete();
+ }
+}
diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java
index f5c5bffba2b2..71e736a4e016 100644
--- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java
+++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java
@@ -1,13 +1,22 @@
package com.rabbitmq.amqp.tests.jms;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient;
+import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
import jakarta.jms.*;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import javax.naming.Context;
+
+import com.rabbitmq.qpid.protonj2.client.Client;
+import com.rabbitmq.qpid.protonj2.client.Delivery;
+import com.rabbitmq.qpid.protonj2.client.Receiver;
+import jakarta.jms.Queue;
import org.junit.jupiter.api.Test;
+@JmsTestInfrastructure
public class JmsTest {
private javax.naming.Context getContext() throws Exception{
@@ -95,20 +104,20 @@ public void message_types_jms_to_jms() throws Exception {
}
}
+ String destination;
+
@Test
public void message_types_jms_to_amqp() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
+ Queue queue = TestUtils.queue(destination);
+ String msg1 = "msg1🥕";
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
- Destination queue = (Destination) context.lookup("myQueue");
MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- connection.start();
// TextMessage
- String msg1 = "msg1🥕";
TextMessage textMessage = session.createTextMessage(msg1);
producer.send(textMessage);
@@ -128,5 +137,77 @@ public void message_types_jms_to_amqp() throws Exception {
streamMessage.writeLong(-1L);
producer.send(streamMessage);
}
+
+ try (Client client = protonClient();
+ com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) {
+ Receiver receiver = amqpConnection.openReceiver(queue.getQueueName());
+ Delivery delivery = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(delivery);
+ assertEquals(msg1, delivery.message().body());
+
+ delivery = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(delivery);
+ com.rabbitmq.qpid.protonj2.client.Message