diff --git a/.github/workflows/test-make-tests.yaml b/.github/workflows/test-make-tests.yaml index 9d33ebd12f3a..d24122a1f921 100644 --- a/.github/workflows/test-make-tests.yaml +++ b/.github/workflows/test-make-tests.yaml @@ -28,6 +28,7 @@ jobs: - parallel-ct-set-2 - parallel-ct-set-3 - parallel-ct-set-4 + - parallel-ct-set-5 - ct-amqp_client - ct-clustering_management - eunit ct-dead_lettering diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index bc99a83ee37a..fdb75d7d939b 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -92,8 +92,8 @@ -type max_message_size() :: undefined | non_neg_integer(). -type footer_opt() :: crc32 | adler32. --type attach_args() :: #{name => binary(), - role => attach_role(), +-type attach_args() :: #{name := binary(), + role := attach_role(), snd_settle_mode => snd_settle_mode(), rcv_settle_mode => rcv_settle_mode(), filter => filter(), @@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> make_source(#{role := {sender, _}}) -> #'v1_0.source'{}; -make_source(#{role := {receiver, Source, _Pid}, - filter := Filter}) -> +make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), + ExpiryPolicy = case Source of + #{expiry_policy := Policy} when is_binary(Policy) -> + {symbol, Policy}; + _ -> + undefined + end, Dynamic = maps:get(dynamic, Source, false), - TranslatedFilter = translate_filters(Filter), + TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})), #'v1_0.source'{address = make_address(Source), durable = {uint, Durable}, + expiry_policy = ExpiryPolicy, dynamic = Dynamic, filter = TranslatedFilter, capabilities = make_capabilities(Source)}. diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 8bc828da007c..96c1312eb8a1 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -236,15 +236,19 @@ define ct_master.erl {ok, Pid2, _} = peer:start(StartOpts#{name => "rabbit_shard2"}), {ok, Pid3, _} = peer:start(StartOpts#{name => "rabbit_shard3"}), {ok, Pid4, _} = peer:start(StartOpts#{name => "rabbit_shard4"}), + {ok, Pid5, _} = peer:start(StartOpts#{name => "rabbit_shard5"}), peer:call(Pid1, net_kernel, set_net_ticktime, [5]), peer:call(Pid2, net_kernel, set_net_ticktime, [5]), peer:call(Pid3, net_kernel, set_net_ticktime, [5]), peer:call(Pid4, net_kernel, set_net_ticktime, [5]), + peer:call(Pid5, net_kernel, set_net_ticktime, [5]), peer:call(Pid1, persistent_term, put, [rabbit_ct_tcp_port_base, 16000]), peer:call(Pid2, persistent_term, put, [rabbit_ct_tcp_port_base, 20000]), peer:call(Pid3, persistent_term, put, [rabbit_ct_tcp_port_base, 24000]), peer:call(Pid4, persistent_term, put, [rabbit_ct_tcp_port_base, 28000]), + peer:call(Pid5, persistent_term, put, [rabbit_ct_tcp_port_base, 32000]), [{[_], {ok, Results}}] = ct_master_fork:run("$1"), + peer:stop(Pid5), peer:stop(Pid4), peer:stop(Pid3), peer:stop(Pid2), @@ -258,7 +262,7 @@ endef PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management -PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control +PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange @@ -276,13 +280,16 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue +PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp + PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D)) PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D)) PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D)) PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D)) +PARALLEL_CT_SET_5 = $(PARALLEL_CT_SET_5_A) SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue rabbit_fifo_prop -PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) +PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) $(PARALLEL_CT_SET_5) ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),) parallel-ct-sanity-check: @@ -308,16 +315,19 @@ define tpl_parallel_ct_test_spec {node, shard2, 'rabbit_shard2@localhost'}. {node, shard3, 'rabbit_shard3@localhost'}. {node, shard4, 'rabbit_shard4@localhost'}. +{node, shard5, 'rabbit_shard5@localhost'}. {define, 'Set1', [$(call comma_list,$(addsuffix _SUITE,$1))]}. {define, 'Set2', [$(call comma_list,$(addsuffix _SUITE,$2))]}. {define, 'Set3', [$(call comma_list,$(addsuffix _SUITE,$3))]}. {define, 'Set4', [$(call comma_list,$(addsuffix _SUITE,$4))]}. +{define, 'Set5', [$(call comma_list,$(addsuffix _SUITE,$5))]}. {suites, shard1, "test/", 'Set1'}. {suites, shard2, "test/", 'Set2'}. {suites, shard3, "test/", 'Set3'}. {suites, shard4, "test/", 'Set4'}. +{suites, shard5, "test/", 'Set5'}. endef define parallel_ct_set_target @@ -330,7 +340,7 @@ parallel-ct-set-$(1): test-build $$(call erlang,$$(call ct_master.erl,ct.set-$(1).spec),-sname parallel_ct_$(PROJECT)@localhost -hidden -kernel net_ticktime 5) endef -$(foreach set,1 2 3 4,$(eval $(call parallel_ct_set_target,$(set)))) +$(foreach set,1 2 3 4 5,$(eval $(call parallel_ct_set_target,$(set)))) # -------------------------------------------------------------------- # Compilation. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index d1c7ea8a126a..39082c95c64f 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) -> %% drop it, what else can we do? undefined end, - + ReplyTo = case unwrap_shortstr(ReplyTo0) of + <<"/queues/", Queue/binary>> -> + try cow_uri:urldecode(Queue) + catch error:_ -> undefined + end; + Other -> + Other + end, BP = #'P_basic'{message_id = MsgId091, delivery_mode = DelMode, expiration = Expiration, @@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) -> [] -> undefined; AllHeaders -> AllHeaders end, - reply_to = unwrap_shortstr(ReplyTo0), + reply_to = ReplyTo, type = Type, app_id = unwrap_shortstr(GroupId), priority = Priority, @@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) -> delivery_mode = DelMode, headers = Headers0, user_id = UserId, - reply_to = ReplyTo, + reply_to = ReplyTo0, type = Type, priority = Priority, app_id = AppId, @@ -382,6 +389,13 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) -> ttl = wrap(uint, Ttl), %% TODO: check Priority is a ubyte? priority = wrap(ubyte, Priority)}, + ReplyTo = case ReplyTo0 of + undefined -> + undefined; + _ -> + Queue = uri_string:quote(ReplyTo0), + {utf8, <<"/queues/", Queue/binary>>} + end, CorrId = case mc_util:urn_string_to_uuid(CorrId0) of {ok, CorrUUID} -> {uuid, CorrUUID}; @@ -389,18 +403,18 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) -> wrap(utf8, CorrId0) end, MsgId = case mc_util:urn_string_to_uuid(MsgId0) of - {ok, MsgUUID} -> - {uuid, MsgUUID}; - _ -> - wrap(utf8, MsgId0) - end, + {ok, MsgUUID} -> + {uuid, MsgUUID}; + _ -> + wrap(utf8, MsgId0) + end, P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of undefined -> #'v1_0.properties'{message_id = MsgId, user_id = wrap(binary, UserId), to = undefined, % subject = wrap(utf8, RKey), - reply_to = wrap(utf8, ReplyTo), + reply_to = ReplyTo, correlation_id = CorrId, content_type = wrap(symbol, ContentType), content_encoding = wrap(symbol, ContentEncoding), diff --git a/deps/rabbit/src/pid_recomposition.erl b/deps/rabbit/src/pid_recomposition.erl deleted file mode 100644 index 95f49e51be21..000000000000 --- a/deps/rabbit/src/pid_recomposition.erl +++ /dev/null @@ -1,60 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(pid_recomposition). - -%% API --export([ - to_binary/1, - from_binary/1, - decompose/1, - recompose/1 -]). - --define(TTB_PREFIX, 131). - --define(NEW_PID_EXT, 88). --define(ATOM_UTF8_EXT, 118). --define(SMALL_ATOM_UTF8_EXT, 119). - --spec decompose(pid()) -> #{atom() => any()}. -decompose(Pid) -> - from_binary(term_to_binary(Pid, [{minor_version, 2}])). - --spec from_binary(binary()) -> #{atom() => any()}. -from_binary(Bin) -> - <> = Bin, - {Node, Rest2} = case PidData of - <> -> - {Node0, Rest1}; - <> -> - {Node0, Rest1} - end, - <> = Rest2, - #{ - node => binary_to_atom(Node, utf8), - id => ID, - serial => Serial, - creation => Creation - }. - --spec to_binary(#{atom() => any()}) -> binary(). -to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) -> - BinNode = atom_to_binary(Node), - NodeLen = byte_size(BinNode), - <>. - --spec recompose(#{atom() => any()}) -> pid(). -recompose(M) -> - binary_to_term(to_binary(M)). diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 2bca266ca773..c213e13abd00 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1157,7 +1157,6 @@ pg_local_amqp_connection() -> pg_local_scope(Prefix) -> list_to_atom(io_lib:format("~s_~s", [Prefix, node()])). - -spec update_cluster_tags() -> 'ok'. update_cluster_tags() -> diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index dde44bb7d9bb..eb0178fe5352 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -87,8 +87,14 @@ handle_http_req(<<"GET">>, QName, fun(Q) -> {ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q), - RespPayload = encode_queue(Q, NumMsgs, NumConsumers), - {ok, {<<"200">>, RespPayload, PermCaches}} + case rabbit_volatile_queue:is(QNameBin) andalso + not rabbit_volatile_queue:exists(QName) of + true -> + {error, not_found}; + false -> + RespPayload = encode_queue(Q, NumMsgs, NumConsumers), + {ok, {<<"200">>, RespPayload, PermCaches}} + end end) of {ok, Result} -> Result; diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 02200450852d..cd29844725f0 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -81,6 +81,10 @@ ?V_1_0_SYMBOL_REJECTED, ?V_1_0_SYMBOL_RELEASED, ?V_1_0_SYMBOL_MODIFIED]). +%% The queue process monitors our session process. When our session process +%% terminates (abnormally) any messages checked out to our session process +%% will be requeued. That's why the we only support RELEASED as the default outcome. +-define(DEFAULT_OUTCOME, #'v1_0.released'{}). -define(DEFAULT_EXCHANGE_NAME, <<>>). -define(PROTOCOL, amqp10). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -88,6 +92,7 @@ -define(CREDIT_REPLY_TIMEOUT, 30_000). %% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client. -define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>). +-define(CAP_VOLATILE_QUEUE, <<"rabbitmq:volatile-queue">>). -export([start_link/9, process_frame/2, @@ -96,7 +101,8 @@ check_resource_access/4, check_read_permitted_on_topic/4, reset_authz/2, - info/1 + info/1, + is_local/1 ]). -export([init/1, @@ -500,19 +506,15 @@ terminate(_Reason, #state{incoming_links = IncomingLinks, maps:foreach( fun (_, Link) -> rabbit_global_counters:publisher_deleted(?PROTOCOL), - maybe_delete_dynamic_queue(Link, Cfg) + maybe_delete_dynamic_classic_queue(Link, Cfg) end, IncomingLinks), maps:foreach( fun (_, Link) -> rabbit_global_counters:consumer_deleted(?PROTOCOL), - maybe_delete_dynamic_queue(Link, Cfg) + maybe_delete_dynamic_classic_queue(Link, Cfg) end, OutgoingLinks), ok = rabbit_queue_type:close(QStates). --spec list_local() -> [pid()]. -list_local() -> - pg:which_groups(pg_scope()). - -spec conserve_resources(pid(), rabbit_alarm:resource_alarm_source(), rabbit_alarm:resource_alert()) -> ok. @@ -523,6 +525,16 @@ conserve_resources(Pid, Source, {_, Conserve, _}) -> reset_authz(Pid, User) -> gen_server:cast(Pid, {reset_authz, User}). +handle_call({has_state, QName, QType}, + _From, + #state{queue_states = QStates} = State) -> + Reply = case rabbit_queue_type:module(QName, QStates) of + {ok, QType} -> + true; + _ -> + false + end, + reply(Reply, State); handle_call(infos, _From, State) -> reply(infos(State), State); handle_call(Msg, _From, State) -> @@ -645,15 +657,25 @@ log_error_and_close_session( WriterPid, Ch, #'v1_0.end'{error = Error}), {stop, {shutdown, Error}, State}. -%% Batch confirms / rejects to publishers. +%% If we receive consecutive confirms/rejects from queues, we batch them +%% to send fewer disposition frames to publishers. noreply_coalesce(#state{stashed_rejected = [], stashed_settled = [], stashed_down = [], stashed_eol = []} = State) -> noreply(State); -noreply_coalesce(State) -> - Timeout = 0, - {noreply, State, Timeout}. +noreply_coalesce(#state{outgoing_pending = Pending} = State) -> + case queue:is_empty(Pending) of + true -> + Timeout = 0, + {noreply, State, Timeout}; + false -> + %% We prioritise processing the Pending queue over batching confirms/rejects + %% because we must ensure to grant the next batch of link credit to the + %% volatile queue before processing the next delivery in our mailbox to + %% avoid the volatile queue dropping messages. + noreply(State) + end. noreply(State0) -> State = send_buffered(State0), @@ -1165,13 +1187,17 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, {OutgoingLinks, Unsettled, Pending, QStates} = case maps:take(HandleInt, OutgoingLinks0) of {#outgoing_link{queue_name = QName, + queue_type = QType, dynamic = Dynamic}, OutgoingLinks1} -> Ctag = handle_to_ctag(HandleInt), {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), case Dynamic of - true -> - delete_dynamic_queue(QName, Cfg), + true when QType =:= rabbit_classic_queue -> + delete_dynamic_classic_queue(QName, Cfg), {OutgoingLinks1, Unsettled1, Pending1, QStates0}; + true when QType =:= rabbit_volatile_queue -> + QStates1 = rabbit_queue_type:remove(QName, QStates0), + {OutgoingLinks1, Unsettled1, Pending1, QStates1}; false -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> @@ -1196,7 +1222,7 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, end, IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of {IncomingLink, IncomingLinks1} -> - maybe_delete_dynamic_queue(IncomingLink, Cfg), + maybe_delete_dynamic_classic_queue(IncomingLink, Cfg), IncomingLinks1; error -> IncomingLinks0 @@ -1410,7 +1436,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, name = LinkName = {utf8, LinkNameBin}, handle = Handle = ?UINT(HandleInt), - source = Source0 = #'v1_0.source'{filter = DesiredFilter}, + source = Source0, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode, max_message_size = MaybeMaxMessageSize, @@ -1433,11 +1459,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% client only for durable messages. {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} end, - case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId, - ReaderPid, PermCache0, TopicPermCache0) of + case ensure_source(Source0, SndSettled, LinkNameBin, + Vhost, User, ContainerId, ReaderPid, + PermCache0, TopicPermCache0) of {error, Reason} -> link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach refused: ~tp", [Reason]); - {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} -> + {ok, QName = #resource{name = QNameBin}, + Source = #'v1_0.source'{filter = DesiredFilter}, + PermCache1, TopicPermCache} -> PermCache = check_resource_access(QName, read, User, PermCache1), case rabbit_amqqueue:with( QName, @@ -1511,14 +1540,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT), snd_settle_mode = EffectiveSndSettleMode, rcv_settle_mode = RcvSettleMode, - %% The queue process monitors our session process. When our session process - %% terminates (abnormally) any messages checked out to our session process - %% will be requeued. That's why the we only support RELEASED as the default outcome. - source = Source#'v1_0.source'{ - default_outcome = #'v1_0.released'{}, - outcomes = outcomes(Source), - %% "the sending endpoint sets the filter actually in place" [3.5.3] - filter = EffectiveFilter}, + %% "the sending endpoint sets the filter actually in place" [3.5.3] + source = Source#'v1_0.source'{filter = EffectiveFilter}, role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize, @@ -2705,6 +2728,7 @@ maybe_grant_mgmt_link_credit(Credit, _, _) -> {Credit, []}. -spec ensure_source(#'v1_0.source'{}, + boolean(), binary(), rabbit_types:vhost(), rabbit_types:user(), @@ -2720,38 +2744,69 @@ maybe_grant_mgmt_link_credit(Credit, _, _) -> {error, term()}. ensure_source(#'v1_0.source'{ address = undefined, + durable = Durable, + expiry_policy = ExpiryPolicy, + timeout = Timeout, dynamic = true, %% We will reply with the actual node properties. dynamic_node_properties = _IgnoreDesiredProperties, capabilities = {array, symbol, Caps} } = Source0, - LinkName, Vhost, User, ContainerId, + SndSettled, LinkName, Vhost, User, ContainerId, ConnPid, PermCache0, TopicPermCache) -> - case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of - true -> - {QNameBin, Address, Props, PermCache} = - declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), - Source = Source0#'v1_0.source'{ - address = {utf8, Address}, - %% While Khepri stores queue records durably, the terminus - %% - i.e. the existence of this receiver - is not stored durably. - durable = ?V_1_0_TERMINUS_DURABILITY_NONE, - expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, - timeout = {uint, 0}, - dynamic_node_properties = Props, - distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE, - capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) - }, + FFEnabled = rabbit_volatile_queue:ff_enabled(), + case maps:from_keys(Caps, true) of + #{{symbol, ?CAP_VOLATILE_QUEUE} := true} + when (Durable =:= undefined orelse Durable =:= ?V_1_0_TERMINUS_DURABILITY_NONE) andalso + ExpiryPolicy =:= ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH andalso + (Timeout =:= undefined orelse Timeout =:= {uint, 0}) andalso + SndSettled andalso + FFEnabled -> + %% create volatile queue + QNameBin = rabbit_volatile_queue:new_name(), + Source = #'v1_0.source'{ + address = {utf8, queue_address(QNameBin)}, + durable = Durable, + expiry_policy = ExpiryPolicy, + timeout = {uint, 0}, + dynamic = true, + dynamic_node_properties = dynamic_node_properties(), + distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE, + capabilities = rabbit_amqp_util:capabilities([?CAP_VOLATILE_QUEUE]) + }, + QName = rabbit_misc:queue_resource(Vhost, QNameBin), + {ok, QName, Source, PermCache0, TopicPermCache}; + #{{symbol, ?CAP_TEMPORARY_QUEUE} := true} -> + %% create exclusive classic queue + {QNameBin, Address, PermCache} = + declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), + Source = #'v1_0.source'{ + address = {utf8, Address}, + %% While Khepri stores queue records durably, the terminus + %% - i.e. the existence of this receiver - is not stored durably. + durable = ?V_1_0_TERMINUS_DURABILITY_NONE, + expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, + timeout = {uint, 0}, + dynamic = true, + dynamic_node_properties = dynamic_node_properties(), + distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE, + default_outcome = ?DEFAULT_OUTCOME, + outcomes = outcomes(Source0), + capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) + }, QName = queue_resource(Vhost, QNameBin), {ok, QName, Source, PermCache, TopicPermCache}; - false -> - exit_not_implemented("Dynamic source not supported: ~p", [Source0]) + _ -> + exit_not_implemented("Dynamic source not supported: ~tp", [Source0]) end; -ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) -> - exit_not_implemented("Dynamic source not supported: ~p", [Source]); -ensure_source(Source = #'v1_0.source'{address = Address, +ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _, _) -> + exit_not_implemented("Dynamic source not supported: ~tp", [Source]); +ensure_source(Source0 = #'v1_0.source'{address = Address, durable = Durable}, - _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) -> + _SndSettle, _LinkName, Vhost, User, _ContainerId, + _ConnPid, PermCache, TopicPermCache) -> + Source = Source0#'v1_0.source'{default_outcome = ?DEFAULT_OUTCOME, + outcomes = outcomes(Source0)}, case Address of {utf8, <<"/queues/", QNameBinQuoted/binary>>} -> %% The only possible v2 source address format is: @@ -2843,9 +2898,9 @@ ensure_target(#'v1_0.target'{ LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) -> case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of true -> - {QNameBin, Address, Props, PermCache1} = - declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), - {ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1), + {ok, Exchange, PermCache1} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache0), + {QNameBin, Address, PermCache} = + declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache1), Target = #'v1_0.target'{ address = {utf8, Address}, %% While Khepri stores queue records durably, @@ -2854,7 +2909,7 @@ ensure_target(#'v1_0.target'{ expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, timeout = {uint, 0}, dynamic = true, - dynamic_node_properties = Props, + dynamic_node_properties = dynamic_node_properties(), capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) }, {ok, Exchange, QNameBin, QNameBin, Target, PermCache}; @@ -2908,8 +2963,7 @@ check_exchange(XNameBin, User, Vhost, PermCache0) -> end, {ok, Exchange, PermCache}; {error, not_found} -> - link_error(?V_1_0_AMQP_ERROR_NOT_FOUND, - "no ~ts", [rabbit_misc:rs(XName)]) + link_error_not_found(XName) end. address_v1_permitted() -> @@ -3467,17 +3521,15 @@ error_if_absent(Kind, Vhost, Name) when is_list(Name) -> error_if_absent(Kind, Vhost, Name) when is_binary(Name) -> error_if_absent(rabbit_misc:r(Vhost, Kind, Name)). -error_if_absent(Resource = #resource{kind = Kind}) -> - Mod = case Kind of - exchange -> rabbit_exchange; - queue -> rabbit_amqqueue - end, - case Mod:exists(Resource) of - true -> - ok; - false -> - link_error(?V_1_0_AMQP_ERROR_NOT_FOUND, - "no ~ts", [rabbit_misc:rs(Resource)]) +error_if_absent(#resource{kind = exchange} = Name) -> + case rabbit_exchange:exists(Name) of + true -> ok; + false -> link_error_not_found(Name) + end; +error_if_absent(#resource{kind = queue} = Name) -> + case rabbit_amqqueue:exists(Name) of + true -> ok; + false -> link_error_not_found(Name) end. generate_queue_name_v1() -> @@ -3533,30 +3585,40 @@ declare_queue(QNameBin, end, {ok, PermCache}. -declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) -> +declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) -> QNameBin = generate_queue_name_dynamic(ContainerId, LinkName), + Address = queue_address(QNameBin), {ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0), - QNameBinQuoted = uri_string:quote(QNameBin), - Address = <<"/queues/", QNameBinQuoted/binary>>, - Props = {map, [{{symbol, <<"lifetime-policy">>}, - {described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}}, - {{symbol, <<"supported-dist-modes">>}, - {array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]}, - {QNameBin, Address, Props, PermCache}. - -maybe_delete_dynamic_queue(#incoming_link{dynamic = true, - queue_name_bin = QNameBin}, - Cfg = #cfg{vhost = Vhost}) -> + {QNameBin, Address, PermCache}. + +-spec queue_address(unicode:unicode_binary()) -> + unicode:unicode_binary(). +queue_address(QueueName) -> + QueueNameQuoted = uri_string:quote(QueueName), + <<"/queues/", QueueNameQuoted/binary>>. + +dynamic_node_properties() -> + {map, [{{symbol, <<"lifetime-policy">>}, + {described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}}, + {{symbol, <<"supported-dist-modes">>}, + {array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]}. + +maybe_delete_dynamic_classic_queue( + #incoming_link{dynamic = true, + queue_name_bin = QNameBin}, + Cfg = #cfg{vhost = Vhost}) -> QName = queue_resource(Vhost, QNameBin), - delete_dynamic_queue(QName, Cfg); -maybe_delete_dynamic_queue(#outgoing_link{dynamic = true, - queue_name = QName}, - Cfg) -> - delete_dynamic_queue(QName, Cfg); -maybe_delete_dynamic_queue(_, _) -> + delete_dynamic_classic_queue(QName, Cfg); +maybe_delete_dynamic_classic_queue( + #outgoing_link{dynamic = true, + queue_type = rabbit_classic_queue, + queue_name = QName}, + Cfg) -> + delete_dynamic_classic_queue(QName, Cfg); +maybe_delete_dynamic_classic_queue(_, _) -> ok. -delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) -> +delete_dynamic_classic_queue(QName, #cfg{user = #user{username = Username}}) -> %% No real need to check for 'configure' access again since this queue is owned by %% this connection and the user had 'configure' access when the queue got declared. _ = rabbit_amqqueue:with( @@ -3888,13 +3950,37 @@ error_not_found(Resource) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, Description}}. +-spec link_error_not_found(rabbit_types:r(exchange | queue)) -> no_return(). +link_error_not_found(Resource) -> + link_error(?V_1_0_AMQP_ERROR_NOT_FOUND, + "no ~ts", + [rabbit_misc:rs(Resource)]). + is_valid_max(Val) -> is_integer(Val) andalso Val > 0 andalso Val =< ?UINT_MAX. +-spec list_local() -> [pid()]. +list_local() -> + pg:which_groups(pg_scope()). + +%% Returns true if Pid is a local AMQP session process. +-spec is_local(pid()) -> boolean(). +is_local(Pid) -> + pg:get_local_members(pg_scope(), Pid) =/= []. + +-spec pg_scope() -> atom(). pg_scope() -> - rabbit:pg_local_scope(amqp_session). + Key = pg_scope_amqp_session, + case persistent_term:get(Key, undefined) of + undefined -> + Val = rabbit:pg_local_scope(amqp_session), + persistent_term:put(Key, Val), + Val; + Val -> + Val + end. -spec cap_credit(rabbit_queue_type:credit(), pos_integer()) -> rabbit_queue_type:credit(). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index b48188e89477..04ce9c93ae3b 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -129,6 +129,11 @@ msg_id }). +-record(direct_reply, { + consumer_tag :: rabbit_types:ctag(), + queue :: rabbit_misc:resource_name() + }). + -record(ch, {cfg :: #conf{}, %% limiter state, see rabbit_limiter limiter, @@ -159,8 +164,7 @@ %% a list of tags for published messages that were %% rejected but are yet to be sent to the client rejected, - %% used by "one shot RPC" (amq. - reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()}, + direct_reply :: none | #direct_reply{}, %% see rabbitmq-server#114 delivery_flow :: flow | noflow, interceptor_state, @@ -297,48 +301,14 @@ shutdown(Pid) -> send_command(Pid, Msg) -> gen_server2:cast(Pid, {command, Msg}). - --spec deliver_reply(binary(), mc:state()) -> 'ok'. -deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> - Nodes = nodes_with_hashes(), - case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of - {ok, Pid, Key} -> - delegate:invoke_no_result( - Pid, {?MODULE, deliver_reply_local, [Key, Message]}); - {error, _} -> - ok - end. - -%% We want to ensure people can't use this mechanism to send a message -%% to an arbitrary process and kill it! - --spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'. - +%% Delete this function when feature flag rabbitmq_4.2.0 becomes required. +-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok. deliver_reply_local(Pid, Key, Message) -> case pg_local:in_group(rabbit_channels, Pid) of true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); false -> ok end. -declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> - exists; -declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> - Nodes = nodes_with_hashes(), - case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of - {ok, Pid, Key} -> - Msg = {declare_fast_reply_to, Key}, - rabbit_misc:with_exit_handler( - rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end); - {error, _} -> - not_found - end; -declare_fast_reply_to(_) -> - not_found. - -nodes_with_hashes() -> - #{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}. - -spec list() -> [pid()]. list() -> @@ -530,7 +500,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, unconfirmed = rabbit_confirms:init(), rejected = [], confirmed = [], - reply_consumer = none, + direct_reply = none, delivery_flow = Flow, interceptor_state = undefined, queue_states = rabbit_queue_type:init() @@ -587,6 +557,29 @@ handle_call({{info, Items}, Deadline}, _From, State) -> reply({error, Error}, State) end; +handle_call({has_state, #resource{virtual_host = Vhost, + name = Name}, rabbit_volatile_queue}, + _From, + #ch{cfg = #conf{virtual_host = Vhost}, + direct_reply = #direct_reply{queue = Name}} = State) -> + reply(true, State); +handle_call({has_state, _QName, _QType}, _From, State) -> + reply(false, State); +%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required. +handle_call({declare_fast_reply_to, Key}, _From, State = #ch{direct_reply = Reply}) -> + Result = case Reply of + none -> + not_found; + #direct_reply{queue = QNameBin} -> + case rabbit_volatile_queue:key_from_name(QNameBin) of + {ok, Key} -> + exists; + _ -> + not_found + end + end, + reply(Result, State); + handle_call(refresh_config, _From, State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) -> reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}}); @@ -595,13 +588,6 @@ handle_call(refresh_interceptors, _From, State) -> IState = rabbit_channel_interceptor:init(State), reply(ok, State#ch{interceptor_state = IState}); -handle_call({declare_fast_reply_to, Key}, _From, - State = #ch{reply_consumer = Consumer}) -> - reply(case Consumer of - {_, _, Key} -> exists; - _ -> not_found - end, State); - handle_call(list_queue_states, _From, State = #ch{queue_states = QueueStates}) -> %% For testing of cleanup only %% HACK @@ -665,29 +651,31 @@ handle_cast({command, Msg}, State) -> ok = send(Msg, State), noreply(State); -handle_cast({deliver_reply, _K, _Del}, - State = #ch{cfg = #conf{state = closing}}) -> - noreply(State); -handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) -> - noreply(State); +%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required. handle_cast({deliver_reply, Key, Mc}, - State = #ch{cfg = #conf{writer_pid = WriterPid, - msg_interceptor_ctx = MsgIcptCtx}, - next_tag = DeliveryTag, - reply_consumer = {ConsumerTag, _Suffix, Key}}) -> - ExchName = mc:exchange(Mc), - [RoutingKey | _] = mc:routing_keys(Mc), - Content = outgoing_content(Mc, MsgIcptCtx), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = false, - exchange = ExchName, - routing_key = RoutingKey}, - Content), + #ch{cfg = #conf{state = ChanState, + writer_pid = WriterPid, + msg_interceptor_ctx = MsgIcptCtx}, + next_tag = DeliveryTag, + direct_reply = #direct_reply{consumer_tag = Ctag, + queue = QNameBin}} = State) + when ChanState =/= closing -> + case rabbit_volatile_queue:key_from_name(QNameBin) of + {ok, Key} -> + ExchName = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Deliver = #'basic.deliver'{consumer_tag = Ctag, + delivery_tag = DeliveryTag, + redelivered = false, + exchange = ExchName, + routing_key = RoutingKey}, + Content = outgoing_content(Mc, MsgIcptCtx), + ok = rabbit_writer:send_command(WriterPid, Deliver, Content); + _ -> + ok + end, noreply(State); -handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> +handle_cast({deliver_reply, _, _}, State) -> noreply(State); % Note: https://www.pivotaltracker.com/story/show/166962656 @@ -1084,20 +1072,19 @@ strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). -maybe_set_fast_reply_to( - C = #content{properties = P = #'P_basic'{reply_to = - <<"amq.rabbitmq.reply-to">>}}, - #ch{reply_consumer = ReplyConsumer}) -> - case ReplyConsumer of - none -> rabbit_misc:protocol_error( - precondition_failed, - "fast reply consumer does not exist", []); - {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>, - rabbit_binary_generator:clear_encoded_content( - C#content{properties = P#'P_basic'{reply_to = Rep}}) +override_reply_to( + C0 = #content{properties = P = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>}}, + #ch{direct_reply = Reply}) -> + case Reply of + #direct_reply{queue = QNameBin} -> + C = C0#content{properties = P#'P_basic'{reply_to = QNameBin}}, + rabbit_binary_generator:clear_encoded_content(C); + none -> + rabbit_misc:protocol_error(precondition_failed, + "fast reply consumer does not exist", []) end; -maybe_set_fast_reply_to(C, _State) -> - C. +override_reply_to(Content, _) -> + Content. record_rejects([], State) -> State; @@ -1198,9 +1185,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = #content {properties = Props} = - maybe_set_fast_reply_to( - rabbit_binary_parser:ensure_content_decoded(Content), State1), + DecContent0 = rabbit_binary_parser:ensure_content_decoded(Content), + DecContent = #content{properties = Props} = override_reply_to(DecContent0, State1), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, {DeliveryOptions, State} = @@ -1217,7 +1203,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, case mc_amqpl:message(ExchangeName, RoutingKey, - DecodedContent) of + DecContent) of {error, Reason} -> rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]); {ok, Message0} -> @@ -1225,7 +1211,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Message0, User), Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), - [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], Queues = rabbit_amqqueue:lookup_many(QNames), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), @@ -1301,30 +1286,46 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, consumer_tag = CTag0, no_ack = NoAck, nowait = NoWait}, - _, State = #ch{reply_consumer = ReplyConsumer, - cfg = #conf{max_consumers = MaxConsumers}, + _, State = #ch{direct_reply = DirectReply0, + cfg = #conf{virtual_host = Vhost, + user = #user{username = Username}, + max_consumers = MaxConsumers}, + queue_states = QStates0, consumer_mapping = ConsumerMapping}) -> CurrentConsumers = maps:size(ConsumerMapping), case maps:find(CTag0, ConsumerMapping) of error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' rabbit_misc:protocol_error( - not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]); + not_allowed, + "reached maximum (~B) of consumers per channel", [MaxConsumers]); error -> - case {ReplyConsumer, NoAck} of + case {DirectReply0, NoAck} of {none, true} -> CTag = case CTag0 of - <<>> -> rabbit_guid:binary( - rabbit_guid:gen_secure(), "amq.ctag"); - Other -> Other + <<>> -> + rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.ctag"); + Other -> + Other end, - %% Precalculate both suffix and key - {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix(self()), - Consumer = {CTag, Suffix, Key}, - State1 = State#ch{reply_consumer = Consumer}, + QNameBin = rabbit_volatile_queue:new_name(), + QName = rabbit_misc:queue_resource(Vhost, QNameBin), + Q = rabbit_volatile_queue:new(QName), + Spec = #{no_ack => true, + channel_pid => self(), + mode => {simple_prefetch, 0}, + consumer_tag => CTag, + ok_msg => undefined, + acting_user => Username}, + {ok, QStates} = rabbit_queue_type:consume(Q, Spec, QStates0), + State1 = State#ch{direct_reply = #direct_reply{consumer_tag = CTag, + queue = QNameBin}, + queue_states = QStates}, case NoWait of - true -> {noreply, State1}; - false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, - {reply, Rep, State1} + true -> + {noreply, State1}; + false -> + Rep = #'basic.consume_ok'{consumer_tag = CTag}, + {reply, Rep, State1} end; {_, false} -> rabbit_misc:protocol_error( @@ -1341,17 +1342,25 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) -> - State1 = State#ch{reply_consumer = none}, + _, State = #ch{cfg = #conf{virtual_host = Vhost}, + direct_reply = #direct_reply{consumer_tag = ConsumerTag, + queue = QNameBin}, + queue_states = QStates}) -> + QName = rabbit_misc:queue_resource(Vhost, QNameBin), + QStates1 = rabbit_queue_type:remove(QName, QStates), + State1 = State#ch{direct_reply = none, + queue_states = QStates1}, case NoWait of - true -> {noreply, State1}; - false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, - {reply, Rep, State1} + true -> + {noreply, State1}; + false -> + Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + {reply, Rep, State1} end; handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, - no_local = _, % FIXME: implement + no_local = _Unsupported, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait, @@ -1873,21 +1882,30 @@ record_sent(Type, QueueType, Tag, AckRequired, next_tag = DeliveryTag }) -> rabbit_global_counters:messages_delivered(amqp091, QueueType, 1), - ?INCR_STATS(queue_stats, QName, 1, - case {Type, AckRequired} of - {get, true} -> - rabbit_global_counters:messages_delivered_get_manual_ack(amqp091, QueueType, 1), - get; - {get, false} -> - rabbit_global_counters:messages_delivered_get_auto_ack(amqp091, QueueType, 1), - get_no_ack; - {deliver, true} -> - rabbit_global_counters:messages_delivered_consume_manual_ack(amqp091, QueueType, 1), - deliver; - {deliver, false} -> - rabbit_global_counters:messages_delivered_consume_auto_ack(amqp091, QueueType, 1), - deliver_no_ack - end, State), + Measure = case {Type, AckRequired} of + {get, true} -> + rabbit_global_counters:messages_delivered_get_manual_ack( + amqp091, QueueType, 1), + get; + {get, false} -> + rabbit_global_counters:messages_delivered_get_auto_ack( + amqp091, QueueType, 1), + get_no_ack; + {deliver, true} -> + rabbit_global_counters:messages_delivered_consume_manual_ack( + amqp091, QueueType, 1), + deliver; + {deliver, false} -> + rabbit_global_counters:messages_delivered_consume_auto_ack( + amqp091, QueueType, 1), + deliver_no_ack + end, + case rabbit_volatile_queue:is(QName#resource.name) of + true -> + ok; + false -> + ?INCR_STATS(queue_stats, QName, 1, Measure, State) + end, case Redelivered of true -> rabbit_global_counters:messages_redelivered(amqp091, QueueType, 1), @@ -2074,9 +2092,16 @@ deliver_to_queues(XName, case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), - lists:foreach(fun(QName) -> - ?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) - end, QueueNames); + lists:foreach( + fun(#resource{name = QNameBin} = QName) -> + case rabbit_volatile_queue:is(QNameBin) of + true -> + ok; + false -> + ?INCR_STATS(queue_exchange_stats, + {QName, XName}, 1, publish) + end + end, QueueNames); _ -> ok end, @@ -2379,14 +2404,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin, RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. -handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", - _/binary>> = QueueNameBin}, +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin0}, _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), - case declare_fast_reply_to(StrippedQueueNameBin) of - exists -> {ok, QueueName, 0, 1}; - not_found -> rabbit_amqqueue:not_found(QueueName) + QueueNameBin = strip_cr_lf(QueueNameBin0), + QueueName = rabbit_misc:queue_resource(VHost, QueueNameBin), + Q = rabbit_volatile_queue:new(QueueName), + case rabbit_queue_type:declare(Q, node()) of + {existing, _} -> + {ok, QueueName, 0, 1}; + {absent, _, _} -> + rabbit_amqqueue:not_found(QueueName) end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, @@ -2489,7 +2517,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, passive = true}, ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), + QueueName = rabbit_misc:queue_resource(VHostPath, StrippedQueueNameBin), Fun = fun (Q0) -> QStat = maybe_stat(NoWait, Q0), {QStat, Q0} @@ -2600,7 +2628,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). -handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) -> +handle_deliver(CTag, Ack, Msgs, State) -> lists:foldl(fun(Msg, S) -> handle_deliver0(CTag, Ack, Msg, S) end, State, Msgs). diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index f0c4bbda347c..0311dd8741a4 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -54,6 +54,7 @@ consume/3, cancel/3, handle_event/3, + supports_stateful_delivery/0, deliver/3, settle/5, credit_v1/5, @@ -62,8 +63,7 @@ info/2, state_info/1, capabilities/0, - notify_decorators/1, - is_stateful/0 + notify_decorators/1 ]). -export([delete_crashed/1, @@ -471,6 +471,8 @@ settlement_action(_Type, _QRef, [], Acc) -> settlement_action(Type, QRef, MsgSeqs, Acc) -> [{Type, QRef, MsgSeqs} | Acc]. +supports_stateful_delivery() -> true. + -spec deliver([{amqqueue:amqqueue(), state()}], Delivery :: mc:state(), rabbit_queue_type:delivery_options()) -> @@ -684,8 +686,6 @@ notify_decorators(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). -is_stateful() -> true. - reject_seq_no(SeqNo, U0) -> reject_seq_no(SeqNo, U0, []). diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index fc255f6a4b0b..98eea8aba260 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -211,3 +211,10 @@ stability => stable, depends_on => ['rabbitmq_4.0.0'] }}). + +-rabbit_feature_flag( + {'rabbitmq_4.2.0', + #{desc => "Allows rolling upgrades to 4.2.x", + stability => stable, + depends_on => ['rabbitmq_4.1.0'] + }}). diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 304b597985b0..4cc90e39ac4d 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -490,26 +490,39 @@ get_many_in_khepri(Names) -> get_many_in_ets(Table, [{Name, RouteInfos}]) when is_map(RouteInfos) -> - case ets:lookup(Table, Name) of + case ets_lookup(Table, Name) of [] -> []; [Q] -> [{Q, RouteInfos}] end; get_many_in_ets(Table, [Name]) -> - ets:lookup(Table, Name); + ets_lookup(Table, Name); get_many_in_ets(Table, Names) when is_list(Names) -> lists:filtermap(fun({Name, RouteInfos}) when is_map(RouteInfos) -> - case ets:lookup(Table, Name) of + case ets_lookup(Table, Name) of [] -> false; [Q] -> {true, {Q, RouteInfos}} end; (Name) -> - case ets:lookup(Table, Name) of + case ets_lookup(Table, Name) of [] -> false; [Q] -> {true, Q} end end, Names). +ets_lookup(Table, QName = #resource{name = QNameBin}) -> + case rabbit_volatile_queue:is(QNameBin) of + true -> + %% This queue record is not stored in the database. + %% We create it on the fly. + case rabbit_volatile_queue:new(QName) of + error -> []; + Q -> [Q] + end; + false -> + ets:lookup(Table, QName) + end. + %% ------------------------------------------------------------------- %% get(). %% ------------------------------------------------------------------- @@ -517,11 +530,19 @@ get_many_in_ets(Table, Names) when is_list(Names) -> -spec get(QName) -> Ret when QName :: rabbit_amqqueue:name(), Ret :: {ok, Queue :: amqqueue:amqqueue()} | {error, not_found}. -get(Name) -> - rabbit_khepri:handle_fallback( - #{mnesia => fun() -> get_in_mnesia(Name) end, - khepri => fun() -> get_in_khepri(Name) end - }). +get(#resource{name = NameBin} = Name) -> + case rabbit_volatile_queue:is(NameBin) of + true -> + case rabbit_volatile_queue:new(Name) of + error -> {error, not_found}; + Q -> {ok, Q} + end; + false -> + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> get_in_mnesia(Name) end, + khepri => fun() -> get_in_khepri(Name) end + }) + end. get_in_mnesia(Name) -> rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}). @@ -776,11 +797,16 @@ update_durable_in_khepri(UpdateFun, FilterFun) -> %% %% @private -exists(QName) -> - rabbit_khepri:handle_fallback( - #{mnesia => fun() -> exists_in_mnesia(QName) end, - khepri => fun() -> exists_in_khepri(QName) end - }). +exists(#resource{name = NameBin} = Name) -> + case rabbit_volatile_queue:is(NameBin) of + true -> + rabbit_volatile_queue:exists(Name); + false -> + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> exists_in_mnesia(Name) end, + khepri => fun() -> exists_in_khepri(Name) end + }) + end. exists_in_mnesia(QName) -> ets:member(?MNESIA_TABLE, QName). diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 0bf0e03a4502..35e6cf59d56c 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -31,7 +31,7 @@ -type match_result() :: [rabbit_types:binding_destination() | {rabbit_amqqueue:name(), rabbit_types:binding_key()}]. --define(COMPILED_TOPIC_SPLIT_PATTERN, dot_binary_pattern). +-define(COMPILED_TOPIC_SPLIT_PATTERN, cp_dot). %% ------------------------------------------------------------------- %% set(). diff --git a/deps/rabbit/src/rabbit_direct_reply_to.erl b/deps/rabbit/src/rabbit_direct_reply_to.erl deleted file mode 100644 index 879fb90bcf22..000000000000 --- a/deps/rabbit/src/rabbit_direct_reply_to.erl +++ /dev/null @@ -1,51 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_direct_reply_to). - --export([compute_key_and_suffix/1, - decode_reply_to/2]). - -%% This pid encoding function produces values that are of mostly fixed size -%% regardless of the node name length. --spec compute_key_and_suffix(pid()) -> - {binary(), binary()}. -compute_key_and_suffix(Pid) -> - Key = base64:encode(rabbit_guid:gen()), - - PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid), - %% Note: we hash the entire node name. This is sufficient for our needs of shortening node name - %% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member - %% in rabbit_channel:nodes_with_hashes/0. - %% - %% We also use a synthetic node prefix because the hash alone will be sufficient to - NodeHash = erlang:phash2(Node), - PidParts = maps:update(node, rabbit_nodes_common:make("reply", integer_to_list(NodeHash)), PidParts0), - RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)), - - Suffix = <>, - {Key, Suffix}. - --spec decode_reply_to(binary(), #{non_neg_integer() => node()}) -> - {ok, pid(), binary()} | {error, any()}. -decode_reply_to(Bin, CandidateNodes) -> - try - [PidEnc, Key] = binary:split(Bin, <<".">>), - RawPidBin = base64:decode(PidEnc), - PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin), - {_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename), - case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of - undefined -> - {error, target_node_not_found}; - Candidate -> - PidParts = maps:update(node, Candidate, PidParts0), - {ok, pid_recomposition:recompose(PidParts), Key} - end - catch - error:_ -> - {error, unrecognized_format} - end. diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index eb7e2b0d9cc3..2667f8681ac5 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -30,8 +30,7 @@ -type route_opts() :: #{return_binding_keys => boolean()}. -type route_infos() :: #{binding_keys => #{rabbit_types:binding_key() => true}}. -type route_return() :: list(rabbit_amqqueue:name() | - {rabbit_amqqueue:name(), route_infos()} | - {virtual_reply_queue, binary()}). + {rabbit_amqqueue:name(), route_infos()}). %%---------------------------------------------------------------------------- @@ -373,7 +372,7 @@ info_all(VHostPath, Items, Ref, AggregatorPid) -> AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)). -spec route(rabbit_types:exchange(), mc:state()) -> - [rabbit_amqqueue:name() | {virtual_reply_queue, binary()}]. + [rabbit_amqqueue:name()]. route(Exchange, Message) -> route(Exchange, Message, #{}). @@ -384,15 +383,7 @@ route(#exchange{name = #resource{name = ?DEFAULT_EXCHANGE_NAME, Message, _Opts) -> RKs0 = mc:routing_keys(Message), RKs = lists:usort(RKs0), - [begin - case virtual_reply_queue(RK) of - false -> - rabbit_misc:r(VHost, queue, RK); - true -> - {virtual_reply_queue, RK} - end - end - || RK <- RKs]; + [rabbit_misc:r(VHost, queue, RK) || RK <- RKs]; route(X = #exchange{name = XName, decorators = Decorators}, Message, Opts) -> @@ -407,9 +398,6 @@ route(X = #exchange{name = XName, maps:keys(QNamesToBKeys) end. -virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true; -virtual_reply_queue(_) -> false. - route1(_, _, _, {[], _, QNames}) -> QNames; route1(Message, Decorators, Opts, diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 32cc8964c037..0dd43c36a4d1 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -132,20 +132,19 @@ boot_step() -> [begin %% Protocol counters - Protocol = #{protocol => Proto}, - init(Protocol), + Labels = #{protocol => Proto}, + init(Labels), rabbit_msg_size_metrics:init(Proto), %% Protocol & Queue Type counters - init(Protocol#{queue_type => rabbit_classic_queue}), - init(Protocol#{queue_type => rabbit_quorum_queue}), - init(Protocol#{queue_type => rabbit_stream_queue}) + init(Labels#{queue_type => rabbit_classic_queue}), + init(Labels#{queue_type => rabbit_quorum_queue}), + init(Labels#{queue_type => rabbit_stream_queue}), + init(Labels#{queue_type => rabbit_volatile_queue}) end || Proto <- [amqp091, amqp10]], - %% Dead Letter counters - %% - %% Streams never dead letter. - %% + init(#{queue_type => rabbit_volatile_queue, dead_letter_strategy => disabled}, + [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]), %% Source classic queue dead letters. init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => disabled}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, @@ -155,7 +154,6 @@ boot_step() -> [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]), - %% %% Source quorum queue dead letters. %% Only quorum queues can dead letter due to delivery-limit exceeded. %% Only quorum queues support dead letter strategy at-least-once. diff --git a/deps/rabbit/src/rabbit_pid_codec.erl b/deps/rabbit/src/rabbit_pid_codec.erl new file mode 100644 index 000000000000..39002e7f9ba0 --- /dev/null +++ b/deps/rabbit/src/rabbit_pid_codec.erl @@ -0,0 +1,64 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_pid_codec). + +-export([decompose/1, + decompose_from_binary/1, + recompose/1, + recompose_to_binary/1]). + +-define(NEW_PID_EXT, 88). +-define(ATOM_UTF8_EXT, 118). +-define(SMALL_ATOM_UTF8_EXT, 119). +-define(TTB_PREFIX, 131). + +-type decomposed_pid() :: #{node := node(), + id := non_neg_integer(), + serial := non_neg_integer(), + creation := non_neg_integer()}. + +-spec decompose(pid()) -> decomposed_pid(). +decompose(Pid) -> + Bin = term_to_binary(Pid, [{minor_version, 2}]), + decompose_from_binary(Bin). + +-spec decompose_from_binary(binary()) -> decomposed_pid(). +decompose_from_binary(Bin) -> + <> = Bin, + {Node, Rest} = case PidData of + <> -> + {Node0, Rest1}; + <> -> + {Node0, Rest1} + end, + <> = Rest, + #{node => binary_to_atom(Node, utf8), + id => ID, + serial => Serial, + creation => Creation}. + +-spec recompose_to_binary(decomposed_pid()) -> binary(). +recompose_to_binary(#{node := Node, + id := ID, + serial := Serial, + creation := Creation}) -> + BinNode = atom_to_binary(Node), + NodeLen = byte_size(BinNode), + <>. + +-spec recompose(decomposed_pid()) -> pid(). +recompose(Map) -> + Bin = recompose_to_binary(Map), + binary_to_term(Bin). diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 936bffc69c23..028dca23f608 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -203,8 +203,6 @@ -callback policy_changed(amqqueue:amqqueue()) -> ok. --callback is_stateful() -> boolean(). - %% intitialise and return a queue type specific session context -callback init(amqqueue:amqqueue()) -> {ok, queue_state()} | {error, Reason :: term()}. @@ -216,7 +214,7 @@ -callback consume(amqqueue:amqqueue(), consume_spec(), queue_state()) -> - {ok, queue_state(), actions()} | + {ok, queue_state()} | {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. -callback cancel(amqqueue:amqqueue(), @@ -232,6 +230,8 @@ {ok, queue_state(), actions()} | {error, term()} | {eol, actions()} | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +-callback supports_stateful_delivery() -> boolean(). + -callback deliver([{amqqueue:amqqueue(), queue_state()}], Message :: mc:state(), Options :: delivery_options()) -> @@ -577,8 +577,10 @@ handle_down(Pid, QName, Info, State0) -> {ok, state(), actions()} | {eol, actions()} | {error, term()} | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. handle_event(QRef, Evt, Ctxs) -> - %% events can arrive after a queue state has been cleared up - %% so need to be defensive here + %% We are defensive here and do not want to crash because + %% 1. Events can arrive after a queue state has been cleared up, and + %% 2. Direct Reply-to responder might send to a non-existing queue name + %% by using correctly encoded channel/session pid but wrong key. case get_ctx(QRef, Ctxs, undefined) of #ctx{module = Mod, state = State0} = Ctx -> @@ -660,7 +662,7 @@ deliver0(Qs, Message0, Options, #?STATE{} = State0) -> fun (Elem, Acc) -> {Q, BKeys} = queue_binding_keys(Elem), Mod = amqqueue:get_type(Q), - QState = case Mod:is_stateful() of + QState = case Mod:supports_stateful_delivery() of true -> #ctx{state = S} = get_ctx(Q, State0), S; @@ -743,7 +745,8 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) -> -spec dequeue(amqqueue:amqqueue(), boolean(), pid(), rabbit_types:ctag(), state()) -> {ok, non_neg_integer(), term(), state()} | - {empty, state()} | rabbit_types:error(term()) | + {empty, state()} | + rabbit_types:error(term()) | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 01b8aac72ba3..3e8a1df80e2a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -29,7 +29,8 @@ -export([settle/5, dequeue/5, consume/3, cancel/3]). -export([credit_v1/5, credit/6]). -export([purge/1]). --export([deliver/3]). +-export([supports_stateful_delivery/0, + deliver/3]). -export([dead_letter_publish/5]). -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). @@ -70,8 +71,7 @@ -export([is_enabled/0, is_compatible/3, - declare/2, - is_stateful/0]). + declare/2]). -export([validate_policy/1, merge_policy_value/3]). -export([force_shrink_member_to_current_member/2, @@ -1109,6 +1109,8 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) -> {queue, QName}, {user_who_performed_action, ActingUser}]). +supports_stateful_delivery() -> true. + deliver0(QName, undefined, Msg, QState0) -> case rabbit_fifo_client:enqueue(QName, Msg, QState0) of {ok, _, _} = Res -> Res; @@ -1123,10 +1125,10 @@ deliver(QSs, Msg0, Options) -> Correlation = maps:get(correlation, Options, undefined), Msg = mc:prepare(store, Msg0), lists:foldl( - fun({Q, stateless}, {Qs, Actions}) -> + fun({Q, stateless}, Acc) -> QRef = amqqueue:get_pid(Q), ok = rabbit_fifo_client:untracked_enqueue([QRef], Msg), - {Qs, Actions}; + Acc; ({Q, S0}, {Qs, Actions}) -> QName = amqqueue:get_name(Q), case deliver0(QName, Correlation, Msg, S0) of @@ -2083,8 +2085,6 @@ notify_decorators(QName, F, A) -> ok end. -is_stateful() -> true. - force_shrink_member_to_current_member(VHost, Name) -> Node = node(), QName = rabbit_misc:r(VHost, queue, Name), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 9368c369f33f..8600c6973287 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -23,6 +23,7 @@ consume/3, cancel/3, handle_event/3, + supports_stateful_delivery/0, deliver/3, settle/5, credit_v1/5, @@ -39,8 +40,7 @@ stat/1, format/2, capabilities/0, - notify_decorators/1, - is_stateful/0]). + notify_decorators/1]). -export([list_with_minimum_quorum/0]). @@ -536,6 +536,8 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, {State, []} end. +supports_stateful_delivery() -> true. + deliver(QSs, Msg, Options) -> lists:foldl( fun({Q, stateless}, {Qs, Actions}) -> @@ -1431,8 +1433,6 @@ list_with_minimum_quorum() -> map_size(RunningMembers) =< map_size(Members) div 2 + 1 end, rabbit_amqqueue:list_local_stream_queues()). -is_stateful() -> true. - get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), Nodes. diff --git a/deps/rabbit/src/rabbit_trace.erl b/deps/rabbit/src/rabbit_trace.erl index 9400cf7dc0c3..234083a17e83 100644 --- a/deps/rabbit/src/rabbit_trace.erl +++ b/deps/rabbit/src/rabbit_trace.erl @@ -62,8 +62,6 @@ tap_in(Msg, QNames, ConnName, ChannelNum, Username, TraceX) -> RoutedQs = lists:map(fun(#resource{kind = queue, name = Name}) -> {longstr, Name}; ({#resource{kind = queue, name = Name}, _}) -> - {longstr, Name}; - ({virtual_reply_queue, Name}) -> {longstr, Name} end, QNames), trace(TraceX, Msg, <<"publish">>, XName, diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl new file mode 100644 index 000000000000..79b82032a6eb --- /dev/null +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -0,0 +1,409 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% This queue type is volatile: +%% * Queue metadata is not stored in the metadata store. +%% * Messages in this queue are effectively transient and delivered at most once. +%% * Messages are not buffered. +%% * Messages are dropped immediately if consumer ran out of link credit. +-module(rabbit_volatile_queue). +-behaviour(rabbit_queue_type). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([new/1, + new_name/0, + is/1, + key_from_name/1, + pid_from_name/2, + exists/1, + ff_enabled/0, + local_cast/2, + local_call/2]). + +%% rabbit_queue_type callbacks +-export([declare/2, + supports_stateful_delivery/0, + deliver/3, + credit/6, + init/1, + close/1, + update/2, + consume/3, + cancel/3, + handle_event/3, + is_enabled/0, + is_compatible/3, + is_recoverable/1, + purge/1, + policy_changed/1, + stat/1, + format/2, + capabilities/0, + notify_decorators/1, + stop/1, + list_with_minimum_quorum/0, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0, + delete/4, + recover/2, + settle/5, + credit_v1/5, + dequeue/5, + state_info/1, + info/2, + policy_apply_to_name/0 + ]). + +-define(STATE, ?MODULE). +-record(?STATE, { + name :: rabbit_amqqueue:name(), + ctag :: undefined | rabbit_types:ctag(), + delivery_count :: undefined | rabbit_queue_type:delivery_count(), + credit :: undefined | rabbit_queue_type:credit(), + dropped = 0 :: non_neg_integer() + }). + +-opaque state() :: #?STATE{}. + +-export_type([state/0]). + +-define(PREFIX, "amq.rabbitmq.reply-to."). +-define(CP_DOT, cp_dot). + +-spec new(rabbit_amqqueue:name()) -> + amqqueue:amqqueue() | error. +new(#resource{virtual_host = Vhost, + name = <<"amq.rabbitmq.reply-to">>} = Name) -> + new0(Name, self(), Vhost); +new(#resource{virtual_host = Vhost, + name = NameBin} = Name) -> + case pid_from_name(NameBin, nodes_with_hashes()) of + {ok, Pid} when is_pid(Pid) -> + new0(Name, Pid, Vhost); + _ -> + error + end. + +new0(Name, Pid, Vhost) -> + amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE). + +-spec is(rabbit_misc:resource_name()) -> + boolean(). +is(<>) -> + true; +is(Name) when is_binary(Name) -> + false. + +init(Q) -> + {ok, #?STATE{name = amqqueue:get_name(Q)}}. + +consume(_Q, Spec, State) -> + #{no_ack := true, + consumer_tag := Ctag, + mode := Mode} = Spec, + {DeliveryCount, Credit} = case Mode of + {credited, InitialDC} -> + {InitialDC, 0}; + {simple_prefetch, 0} -> + {undefined, undefined} + end, + {ok, State#?STATE{ctag = Ctag, + delivery_count = DeliveryCount, + credit = Credit}}. + +declare(Q, _Node) -> + #resource{name = NameBin} = Name = amqqueue:get_name(Q), + case NameBin of + <<"amq.rabbitmq.reply-to">> -> + {existing, Q}; + _ -> + case exists(Name) of + true -> + {existing, Q}; + false -> + {absent, Q, stopped} + end + end. + +-spec exists(rabbit_amqqueue:name()) -> boolean(). +exists(#resource{kind = queue, + name = QNameBin} = QName) -> + case pid_from_name(QNameBin, nodes_with_hashes()) of + {ok, Pid} when is_pid(Pid) -> + case ff_enabled() of + true -> + Request = {has_state, QName, ?MODULE}, + MFA = {?MODULE, local_call, [Request]}, + try delegate:invoke(Pid, MFA) + catch _:_ -> false + end; + false -> + case key_from_name(QNameBin) of + {ok, Key} -> + Msg = {declare_fast_reply_to, Key}, + try gen_server:call(Pid, Msg, infinity) of + exists -> true; + _ -> false + catch exit:_ -> false + end; + error -> + false + end + end; + _ -> + false + end. + +supports_stateful_delivery() -> + false. + +deliver(Qs, Msg, #{correlation := Corr}) + when Corr =/= undefined -> + Corrs = [Corr], + Actions = lists:map(fun({Q, stateless}) -> + deliver0(Q, Msg), + {settled, amqqueue:get_name(Q), Corrs} + end, Qs), + {[], Actions}; +deliver(Qs, Msg, #{}) -> + lists:foreach(fun({Q, stateless}) -> + deliver0(Q, Msg) + end, Qs), + {[], []}. + +deliver0(Q, Msg) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + case ff_enabled() of + true -> + Request = {queue_event, QName, {deliver, Msg}}, + MFA = {?MODULE, local_cast, [Request]}, + delegate:invoke_no_result(QPid, MFA); + false -> + case key_from_name(QName#resource.name) of + {ok, Key} -> + MFA = {rabbit_channel, deliver_reply_local, [Key, Msg]}, + delegate:invoke_no_result(QPid, MFA); + error -> + ok + end + end. + +-spec local_cast(pid(), term()) -> ok. +local_cast(Pid, Request) -> + %% Ensure clients can't send a message to an arbitrary process and kill it. + case is_local(Pid) of + true -> gen_server:cast(Pid, Request); + false -> ok + end. + +-spec local_call(pid(), term()) -> term(). +local_call(Pid, Request) -> + %% Ensure clients can't send a message to an arbitrary process and kill it. + case is_local(Pid) of + true -> gen_server:call(Pid, Request); + false -> exit({unknown_pid, Pid}) + end. + +is_local(Pid) -> + rabbit_amqp_session:is_local(Pid) orelse + pg_local:in_group(rabbit_channels, Pid). + +handle_event(QName, {deliver, Msg}, #?STATE{name = QName, + ctag = Ctag, + credit = undefined} = State) -> + {ok, State, deliver_actions(QName, Ctag, Msg)}; +handle_event(QName, {deliver, Msg}, #?STATE{name = QName, + ctag = Ctag, + delivery_count = DeliveryCount, + credit = Credit} = State0) + when Credit > 0 -> + State = State0#?STATE{delivery_count = serial_number:add(DeliveryCount, 1), + credit = Credit - 1}, + {ok, State, deliver_actions(QName, Ctag, Msg)}; +handle_event(QName, {deliver, _Msg}, #?STATE{name = QName, + dropped = Dropped} = State) -> + rabbit_global_counters:messages_dead_lettered(maxlen, ?MODULE, disabled, 1), + {ok, State#?STATE{dropped = Dropped + 1}, []}. + +deliver_actions(QName, Ctag, Mc) -> + Msgs = [{QName, self(), undefined, _Redelivered = false, Mc}], + [{deliver, Ctag, _AckRequired = false, Msgs}]. + +credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, + #?STATE{delivery_count = DeliveryCountSnd} = State) -> + LinkCreditSnd = amqp10_util:link_credit_snd( + DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), + {DeliveryCount, Credit} = case Drain of + true -> + {serial_number:add(DeliveryCountSnd, LinkCreditSnd), 0}; + false -> + {DeliveryCountSnd, LinkCreditSnd} + end, + {State#?STATE{delivery_count = DeliveryCount, + credit = Credit}, + [{credit_reply, CTag, DeliveryCount, Credit, _Available = 0, Drain}]}. + +close(#?STATE{}) -> + ok. + +update(_, #?STATE{} = State) -> + State. + +cancel(_, _, #?STATE{} = State) -> + {ok, State}. + +is_enabled() -> + true. + +ff_enabled() -> + rabbit_feature_flags:is_enabled('rabbitmq_4.2.0'). + +is_compatible(_, _, _) -> + true. + +is_recoverable(_) -> + false. + +purge(_) -> + {ok, 0}. + +policy_changed(_) -> + ok. + +notify_decorators(_) -> + ok. + +stat(_) -> + {ok, 0, 1}. + +format(_, _) -> + []. + +capabilities() -> + #{unsupported_policies => [], + queue_arguments => [], + consumer_arguments => [], + amqp_capabilities => [], + server_named => false, + rebalance_module => undefined, + can_redeliver => false , + is_replicable => false}. + +stop(_) -> + ok. + +list_with_minimum_quorum() -> + []. + +drain(_) -> + ok. + +revive() -> + ok. + +queue_vm_stats_sups() -> + {[], []}. + +queue_vm_ets() -> + {[], []}. + +delete(_, _, _, _) -> + {ok, 0}. + +recover(_, _) -> + {[], []}. + +settle(_, _, _, _, #?STATE{} = State) -> + {State, []}. + +credit_v1(_, _, _, _, #?STATE{} = State) -> + {State, []}. + +dequeue(_, _, _, _, #?STATE{name = Name}) -> + {protocol_error, not_implemented, + "basic.get not supported by volatile ~ts", + [rabbit_misc:rs(Name)]}. + +state_info(#?STATE{}) -> + #{}. + +info(_, _) -> + []. + +policy_apply_to_name() -> + <<>>. + +-spec new_name() -> + rabbit_misc:resource_name(). +new_name() -> + EncodedPid = encode_pid(self()), + EncodedKey = base64:encode(rabbit_guid:gen()), + <>. + +%% This pid encoding function produces values that are of mostly fixed size +%% regardless of the node name length. +encode_pid(Pid) -> + PidParts0 = #{node := Node} = rabbit_pid_codec:decompose(Pid), + %% Note: we hash the entire node name. This is sufficient for our needs of shortening node name + %% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member + %% in rabbit_nodes:all_running_with_hashes/0. + %% + %% We also use a synthetic node prefix because the hash alone will be sufficient to + NodeHash = erlang:phash2(Node), + PidParts = maps:update(node, + rabbit_nodes_common:make("reply", integer_to_list(NodeHash)), + PidParts0), + base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)). + +-spec pid_from_name(rabbit_misc:resource_name(), + #{non_neg_integer() => node()}) -> + {ok, pid()} | error. +pid_from_name(<>, CandidateNodes) -> + Cp = case persistent_term:get(?CP_DOT, undefined) of + undefined -> + P = binary:compile_pattern(<<".">>), + persistent_term:put(?CP_DOT, P), + P; + P -> + P + end, + try + [PidBase64, _KeyBase64] = binary:split(Bin, Cp), + PidBin = base64:decode(PidBase64), + PidParts0 = #{node := ShortenedNodename} = rabbit_pid_codec:decompose_from_binary(PidBin), + {_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename), + case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of + undefined -> + error; + Candidate -> + PidParts = maps:update(node, Candidate, PidParts0), + {ok, rabbit_pid_codec:recompose(PidParts)} + end + catch error:_ -> error + end; +pid_from_name(_, _) -> + error. + +%% Returns the base 64 encoded key. +-spec key_from_name(rabbit_misc:resource_name()) -> + {ok, binary()} | error. +key_from_name(<>) -> + case binary:split(Suffix, <<".">>) of + [_Pid, Key] -> + {ok, Key}; + _ -> + error + end; +key_from_name(_) -> + error. + +nodes_with_hashes() -> + #{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}. diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 49f5b0c40d2c..dbb326c8aee5 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -56,7 +56,8 @@ groups() -> [ %% authz attach_source_queue, - attach_source_queue_dynamic, + attach_source_queue_dynamic_exclusive, + attach_source_queue_dynamic_volatile, attach_target_exchange, attach_target_topic_exchange, attach_target_queue, @@ -447,7 +448,7 @@ attach_source_queue(Config) -> end, ok = close_connection_sync(Conn). -attach_source_queue_dynamic(Config) -> +attach_source_queue_dynamic_exclusive(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), @@ -480,6 +481,41 @@ attach_source_queue_dynamic(Config) -> end, ok = close_connection_sync(Connection). +attach_source_queue_dynamic_volatile(Config) -> + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing read permission on volatile queue + ok = set_permissions(Config, <<".*">>, <<".*">>, <<>>), + + Source = #{address => undefined, + durable => none, + expiry_policy => <<"link-detach">>, + dynamic => true, + capabilities => [<<"rabbitmq:volatile-queue">>]}, + AttachArgs = #{name => <<"receiver">>, + role => {receiver, Source, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + receive {amqp10_event, + {session, Session, + {ended, Error}}} -> + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description}} = Error, + ?assertEqual( + match, + re:run(Description, + <<"^read access to queue 'amq\.rabbitmq\.reply-to\..*' in vhost " + "'test vhost' refused for user 'test user'$">>, + [{capture, none}])) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + attach_target_exchange(Config) -> XName = <<"amq.fanout">>, Address1 = rabbitmq_amqp_address:exchange(XName), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 1fb8f3fc74e6..39680de6706d 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -37,6 +37,7 @@ wait_for_credit/1, wait_for_accepts/1, send_messages/3, send_messages/4, + receive_messages/2, detach_link_sync/1, end_session_sync/1, wait_for_session_end/1, @@ -3428,8 +3429,7 @@ max_message_size_server_to_client(Config) -> role => {receiver, #{address => Address, durable => configuration}, self()}, snd_settle_mode => unsettled, - rcv_settle_mode => first, - filter => #{}}, + rcv_settle_mode => first}, {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), {ok, Msg} = amqp10_client:get_msg(Receiver), ?assertEqual([PayloadSmallEnough], amqp10_msg:body(Msg)), @@ -5009,8 +5009,7 @@ dynamic_source_rpc(Config) -> AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>, role => {receiver, Source, self()}, snd_settle_mode => unsettled, - rcv_settle_mode => first, - filter => #{}}, + rcv_settle_mode => first}, {ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs), RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} -> #'v1_0.attach'{ @@ -5081,8 +5080,7 @@ dynamic_terminus_delete(Config) -> durable => none}, RcvAttachArgs = #{role => {receiver, Terminus, self()}, snd_settle_mode => unsettled, - rcv_settle_mode => first, - filter => #{}}, + rcv_settle_mode => first}, SndAttachArgs = #{role => {sender, Terminus}, snd_settle_mode => mixed, rcv_settle_mode => first}, @@ -5768,7 +5766,6 @@ footer_checksum(FooterOpt, Config) -> durable => configuration}, self()}, snd_settle_mode => settled, rcv_settle_mode => first, - filter => #{}, footer_opt => FooterOpt}, SndAttachArgs = #{name => <<"my sender">>, role => {sender, #{address => Addr, @@ -6873,19 +6870,6 @@ drain_queue(Session, Address, N) -> ok = amqp10_client:detach_link(Receiver), {ok, Msgs}. -receive_messages(Receiver, N) -> - receive_messages0(Receiver, N, []). - -receive_messages0(_Receiver, 0, Acc) -> - lists:reverse(Acc); -receive_messages0(Receiver, N, Acc) -> - receive - {amqp10_msg, Receiver, Msg} -> - receive_messages0(Receiver, N - 1, [Msg | Acc]) - after 30000 -> - ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) - end. - count_received_messages(Receiver) -> count_received_messages0(Receiver, 0). diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index df6599c4ca07..73273443ee94 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -18,6 +18,7 @@ wait_for_accepts/1, send_message/2, send_messages/3, send_messages/4, + receive_messages/2, detach_link_sync/1, end_session_sync/1, wait_for_session_end/1, @@ -118,6 +119,19 @@ send_messages(Sender, Left, Settled, BodySuffix) -> ok = send_message(Sender, Msg), send_messages(Sender, Left - 1, Settled, BodySuffix). +receive_messages(Receiver, Num) -> + receive_messages0(Receiver, Num, []). + +receive_messages0(_Receiver, 0, Acc) -> + lists:reverse(Acc); +receive_messages0(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, Msg} -> + receive_messages0(Receiver, N - 1, [Msg | Acc]) + after 20_000 -> + ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) + end. + detach_link_sync(Link) -> ok = amqp10_client:detach_link(Link), ok = wait_for_link_detach(Link). diff --git a/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl b/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl new file mode 100644 index 000000000000..bc4c18290e4f --- /dev/null +++ b/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl @@ -0,0 +1,631 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(direct_reply_to_amqp_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-compile([nowarn_export_all, + export_all]). + +-import(amqp_utils, + [init/1, init/2, + connection_config/1, connection_config/2, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, + receive_messages/2, + detach_link_sync/1, + end_session_sync/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1}, + {group, cluster_size_3} + ]. + +groups() -> + [ + {cluster_size_1, [], + [ + responder_attaches_queue_target, + many_replies, + many_volatile_queues_same_session + ]}, + {cluster_size_3, [shuffle], + [ + rpc_new_to_old_node, + rpc_old_to_new_node + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(Group, Config) -> + Nodes = case Group of + cluster_size_1 -> 1; + cluster_size_3 -> 3 + end, + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodes_count, Nodes}, + {rmq_nodename_suffix, Suffix}]), + Config2 = rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, 'rabbitmq_4.2.0') of + ok -> + Config2; + {skip, _} = Skip -> + Skip + end. + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% Responder attaches to a "/queues/amq.rabbitmq.reply-to." target. +responder_attaches_queue_target(Config) -> + RequestQueue = atom_to_binary(?FUNCTION_NAME), + AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue), + + {ConnResponder, SessionResponder, LinkPairResponder} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPairResponder, RequestQueue, #{}), + + OpnConfRequester0 = connection_config(Config), + OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>, + notify_with_performative => true}, + {ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester), + {ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester), + {ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()), + AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, AddressVolatileQueue}, + dynamic = true}} = Attach, + AddressVolatileQueue + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(ReceiverRequester, 3, never), + + {ok, SenderRequester} = amqp10_client:attach_sender_link_sync( + SessionRequester, <<"sender requester">>, AddrRequestQueue), + ok = wait_for_credit(SenderRequester), + + RpcId = <<"RPC message ID">>, + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{message_id => RpcId, + reply_to => AddrVolQ}, + amqp10_msg:new(<<"t1">>, <<"request-1">>))), + receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, ReceiverResponder} = amqp10_client:attach_receiver_link( + SessionResponder, <<"receiver responder">>, + AddrRequestQueue, unsettled), + {ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder), + ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg), + ?assertEqual(<<"request-1">>, amqp10_msg:body_bin(RequestMsg)), + #{message_id := RpcId, + reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg), + ?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr), + + %% The metadata store should store only the request queue. + ?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])), + + {ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr), + ?assertMatch({ok, #{vhost := <<"/">>, + durable := false, + type := <<"rabbit_volatile_queue">>, + message_count := 0, + consumer_count := 1}}, + rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ)), + + {ok, SenderResponder1} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder unsettled">>, + ReplyToAddr, unsettled), + {ok, SenderResponder2} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder settled">>, + ReplyToAddr, settled), + ok = wait_for_credit(SenderResponder1), + ok = wait_for_credit(SenderResponder2), + flush(attached), + ?assertMatch(#{publishers := 3, + consumers := 2}, + maps:get(#{protocol => amqp10}, get_global_counters(Config))), + + %% Multiple responders stream back multiple replies for the single request. + %% "AMQP is commonly used in publish/subscribe systems where copies of a single + %% original message are distributed to zero or many subscribers. AMQP permits + %% zero or multiple responses to a message with the reply-to property set, + %% which can be correlated using the correlation-id property." + %% [http-over-amqp-v1.0-wd06] + ok = amqp10_client:send_msg( + SenderResponder1, + amqp10_msg:set_properties( + #{message_id => <<"reply 1">>, + correlation_id => RpcId}, + amqp10_msg:new(<<1>>, <<"reply-1">>))), + receive {amqp10_disposition, {accepted, <<1>>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + SenderResponder1, + amqp10_msg:set_properties( + #{message_id => <<"reply 2">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<2>>, <<"reply-2">>))), + receive {amqp10_disposition, {accepted, <<2>>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + SenderResponder2, + amqp10_msg:set_properties( + #{message_id => <<"reply 3">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<3>>, <<"reply-3">>, true))), + + receive {amqp10_msg, ReceiverRequester, ReplyMsg1} -> + ?assertEqual(<<"reply-1">>, + amqp10_msg:body_bin(ReplyMsg1)), + ?assertMatch(#{message_id := <<"reply 1">>, + correlation_id := RpcId}, + amqp10_msg:properties(ReplyMsg1)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, ReceiverRequester, ReplyMsg2} -> + ?assertEqual(<<"reply-2">>, + amqp10_msg:body_bin(ReplyMsg2)), + ?assertMatch(#{message_id := <<"reply 2">>, + correlation_id := RpcId}, + amqp10_msg:properties(ReplyMsg2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, ReceiverRequester, ReplyMsg3} -> + ?assertEqual(<<"reply-3">>, + amqp10_msg:body_bin(ReplyMsg3)), + ?assertMatch(#{message_id := <<"reply 3">>, + correlation_id := RpcId}, + amqp10_msg:properties(ReplyMsg3)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + %% RabbitMQ should drop the 4th reply due to insufficient link credit. + ok = amqp10_client:send_msg( + SenderResponder2, + amqp10_msg:set_properties( + #{message_id => <<"reply 4">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<4>>, <<"reply-4">>, true))), + receive {amqp10_msg, _, _} -> + ct:fail({unxpected_msg, ?LINE}) + after 5 -> ok + end, + + %% Test drain + flush(pre_drain), + ok = amqp10_client:flow_link_credit(ReceiverRequester, 100_000, never, true), + receive {amqp10_event, {link, ReceiverRequester, credit_exhausted}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + %% RabbitMQ should also drop the 5th reply due to insufficient link credit. + ok = amqp10_client:send_msg( + SenderResponder2, + amqp10_msg:set_properties( + #{message_id => <<"reply 5">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<5>>, <<"reply-5">>, true))), + receive {amqp10_msg, _, _} -> + ct:fail({unxpected_msg, ?LINE}) + after 5 -> ok + end, + + %% When the requester detaches, the volatile queue is gone. + ok = detach_link_sync(ReceiverRequester), + flush(detached), + ?assertMatch(#{publishers := 3, + consumers := 1}, + maps:get(#{protocol => amqp10}, get_global_counters(Config))), + %% Therefore, HTTP GET on that queue should return 404. + {error, Resp} = rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ), + ?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)), + %% Also, RabbitMQ should refuse attaching to the volatile queue target. + {ok, SenderResponder3} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder 3">>, + ReplyToAddr), + receive {amqp10_event, {link, SenderResponder3, {detached, Error1}}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no queue 'amq.rabbitmq.reply-to.", _/binary>>}}, + Error1) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + %% RabbitMQ should also refuse attaching to the volatile queue target + %% when the requester ends the session. + ok = end_session_sync(SessionRequester), + {ok, SenderResponder4} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder 4">>, + ReplyToAddr), + receive {amqp10_event, {link, SenderResponder4, {detached, Error2}}} -> + ?assertMatch( + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND}, + Error2) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPairResponder, RequestQueue), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPairResponder), + + ok = close_connection_sync(ConnResponder), + ok = close_connection_sync(ConnRequester), + + Counters = get_global_counters(Config), + ?assertMatch(#{messages_delivered_total := 3}, + maps:get(#{protocol => amqp10, + queue_type => rabbit_volatile_queue}, Counters)), + ?assertMatch(#{messages_dead_lettered_maxlen_total := 2}, + maps:get(#{dead_letter_strategy => disabled, + queue_type => rabbit_volatile_queue}, Counters)), + ?assertMatch(#{publishers := 0, + consumers := 0, + %% RabbitMQ received 6 msgs in total (1 request + 5 replies) + messages_received_total := 6}, + maps:get(#{protocol => amqp10}, Counters)). + +%% Test that responder can send many messages to requester. +%% Load test the volatile queue. +many_replies(Config) -> + Num = 3000, + RequestQueue = atom_to_binary(?FUNCTION_NAME), + AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue), + + {ConnResponder, SessionResponder, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}), + + OpnConfRequester0 = connection_config(Config), + OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>, + notify_with_performative => true}, + {ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester), + {ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester), + {ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()), + AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, AddressVolatileQueue}, + dynamic = true}} = Attach, + AddressVolatileQueue + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(ReceiverRequester, Num, never), + + {ok, SenderRequester} = amqp10_client:attach_sender_link_sync( + SessionRequester, <<"sender requester">>, AddrRequestQueue), + ok = wait_for_credit(SenderRequester), + + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{reply_to => AddrVolQ}, + amqp10_msg:new(<<"t1">>, <<"request-1">>))), + receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, ReceiverResponder} = amqp10_client:attach_receiver_link( + SessionResponder, <<"receiver responder">>, + AddrRequestQueue, unsettled), + {ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder), + ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg), + #{reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg), + + {ok, SenderResponder} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder">>, ReplyToAddr), + ok = wait_for_credit(SenderResponder), + flush(attached), + + ok = send_messages(SenderResponder, Num, true), + Msgs = receive_messages(ReceiverRequester, Num), + receive {amqp10_event, {link, ReceiverRequester, credit_exhausted}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + lists:foldl(fun(Msg, N) -> + Bin = integer_to_binary(N), + ?assertEqual(Bin, amqp10_msg:body_bin(Msg)), + N - 1 + end, Num, Msgs), + + [ok = detach_link_sync(R) || R <- [ReceiverRequester, SenderRequester, + ReceiverResponder, SenderResponder]], + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = close_connection_sync(ConnResponder), + ok = close_connection_sync(ConnRequester). + +%% In contrast to AMQP 0.9.1, we expect RabbitMQ to allow for multiple volatile queues +%% on the same AMQP 1.0 session because for example a JMS app can create multiple +%% temporary queues on the same session: +%% https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue() +%% https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue() +many_volatile_queues_same_session(Config) -> + RequestQueue = atom_to_binary(?FUNCTION_NAME), + AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue), + + {ConnResponder, SessionResponder, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}), + + OpnConfRequester0 = connection_config(Config), + OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>, + notify_with_performative => true}, + {ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester), + {ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester), + {ok, Receiver1Requester} = amqp10_client:attach_link(SessionRequester, attach_args(<<"r1">>)), + {ok, Receiver2Requester} = amqp10_client:attach_link(SessionRequester, attach_args(<<"r2">>)), + AddrVolQ1 = receive {amqp10_event, {link, Receiver1Requester, {attached, Attach1}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, AddressVolatileQueue1}, + dynamic = true}} = Attach1, + AddressVolatileQueue1 + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + AddrVolQ2 = receive {amqp10_event, {link, Receiver2Requester, {attached, Attach2}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, AddressVolatileQueue2}, + dynamic = true}} = Attach2, + AddressVolatileQueue2 + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver1Requester, 1, never), + ok = amqp10_client:flow_link_credit(Receiver2Requester, 1, never), + + {ok, SenderRequester} = amqp10_client:attach_sender_link_sync( + SessionRequester, <<"sender requester">>, AddrRequestQueue), + ok = wait_for_credit(SenderRequester), + + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{message_id => <<"RPC receiver 1">>, + reply_to => AddrVolQ1}, + amqp10_msg:new(<<"t1">>, <<"request-1">>))), + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{message_id => <<"RPC receiver 2">>, + reply_to => AddrVolQ2}, + amqp10_msg:new(<<"t2">>, <<"request-2">>))), + ok = wait_for_accepts(2), + + {ok, ReceiverResponder} = amqp10_client:attach_receiver_link( + SessionResponder, <<"receiver responder">>, + AddrRequestQueue, settled), + {ok, RequestMsg1} = amqp10_client:get_msg(ReceiverResponder), + {ok, RequestMsg2} = amqp10_client:get_msg(ReceiverResponder), + #{message_id := Id1, + reply_to := ReplyToAddr1} = amqp10_msg:properties(RequestMsg1), + #{message_id := Id2, + reply_to := ReplyToAddr2} = amqp10_msg:properties(RequestMsg2), + + ?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr1), + ?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr2), + ?assertNotEqual(ReplyToAddr1, ReplyToAddr2), + %% The metadata store should store only the request queue. + ?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])), + + {ok, SenderResponder} = amqp10_client:attach_sender_link_sync( + SessionResponder, <<"sender responder">>, + null, mixed), + ok = wait_for_credit(SenderResponder), + flush(attached), + + ok = amqp10_client:send_msg( + SenderResponder, + amqp10_msg:set_properties( + #{message_id => <<"reply 1">>, + to => ReplyToAddr1, + correlation_id => Id1}, + amqp10_msg:new(<<1>>, <<"reply-1">>, true))), + ok = amqp10_client:send_msg( + SenderResponder, + amqp10_msg:set_properties( + #{message_id => <<"reply 2">>, + to => ReplyToAddr2, + correlation_id => Id2}, + amqp10_msg:new(<<2>>, <<"reply-2">>, false))), + ok = wait_for_accepts(1), + + receive {amqp10_msg, Receiver2Requester, ReplyMsg2} -> + ?assertEqual(<<"reply-2">>, + amqp10_msg:body_bin(ReplyMsg2)), + ?assertMatch(#{message_id := <<"reply 2">>, + correlation_id := <<"RPC receiver 2">>}, + amqp10_msg:properties(ReplyMsg2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver1Requester, ReplyMsg1} -> + ?assertEqual(<<"reply-1">>, + amqp10_msg:body_bin(ReplyMsg1)), + ?assertMatch(#{message_id := <<"reply 1">>, + correlation_id := <<"RPC receiver 1">>}, + amqp10_msg:properties(ReplyMsg1)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + [ok = detach_link_sync(R) || R <- [Receiver1Requester, Receiver2Requester, SenderRequester, + ReceiverResponder, SenderResponder]], + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = close_connection_sync(ConnResponder), + ok = close_connection_sync(ConnRequester). + +%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests. +rpc_new_to_old_node(Config) -> + rpc(0, 1, Config). + +rpc_old_to_new_node(Config) -> + rpc(1, 0, Config). + +rpc(RequesterNode, ResponderNode, Config) -> + RequestQueue = atom_to_binary(?FUNCTION_NAME), + AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue), + + {ConnResponder, SessionResponder, _} = init(ResponderNode, Config), + + OpnConfRequester0 = connection_config(RequesterNode, Config), + OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>, + notify_with_performative => true}, + {ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester), + {ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + SessionRequester, <<"link pair requester">>), + + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}), + + {ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()), + AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, AddressVolatileQueue}, + dynamic = true}} = Attach, + AddressVolatileQueue + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(ReceiverRequester, 2, never), + + {ok, SenderRequester} = amqp10_client:attach_sender_link_sync( + SessionRequester, <<"sender requester">>, AddrRequestQueue), + ok = wait_for_credit(SenderRequester), + + RpcId = <<"RPC message ID">>, + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{message_id => RpcId, + reply_to => AddrVolQ}, + amqp10_msg:new(<<"t1">>, <<"request-1">>))), + receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = rabbit_ct_broker_helpers:await_metadata_store_consistent(Config, ResponderNode), + {ok, ReceiverResponder} = amqp10_client:attach_receiver_link( + SessionResponder, <<"receiver responder">>, + RequestQueue, unsettled), + {ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder), + ?assertEqual(<<"request-1">>, amqp10_msg:body_bin(RequestMsg)), + + #{message_id := RpcId, + reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg), + + ?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr), + + {ok, SenderResponderAnon} = amqp10_client:attach_sender_link_sync( + SessionResponder, + <<"sender responder anonymous terminus">>, + null, unsettled), + ok = wait_for_credit(SenderResponderAnon), + flush(attached), + + %% Responder streams back two replies for the single request. + ok = amqp10_client:send_msg( + SenderResponderAnon, + amqp10_msg:set_properties( + #{message_id => <<"reply 1">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<1>>, <<"reply-1">>))), + receive {amqp10_disposition, {accepted, <<1>>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + SenderResponderAnon, + amqp10_msg:set_properties( + #{message_id => <<"reply 2">>, + to => ReplyToAddr, + correlation_id => RpcId}, + amqp10_msg:new(<<2>>, <<"reply-2">>))), + receive {amqp10_disposition, {accepted, <<2>>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg), + + %% The metadata store should store only the request queue. + ?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])), + + receive {amqp10_msg, ReceiverRequester, ReplyMsg1} -> + ?assertEqual(<<"reply-1">>, + amqp10_msg:body_bin(ReplyMsg1)), + ?assertMatch(#{message_id := <<"reply 1">>, + correlation_id := RpcId}, + amqp10_msg:properties(ReplyMsg1)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, ReceiverRequester, ReplyMsg2} -> + ?assertEqual(<<"reply-2">>, + amqp10_msg:body_bin(ReplyMsg2)), + ?assertMatch(#{message_id := <<"reply 2">>, + correlation_id := RpcId}, + amqp10_msg:properties(ReplyMsg2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + + ok = close_connection_sync(ConnResponder), + ok = close_connection_sync(ConnRequester). + +attach_args() -> + attach_args(<<"receiver requester">>). + +attach_args(Name) -> + Source = #{address => undefined, + durable => none, + expiry_policy => <<"link-detach">>, + dynamic => true, + capabilities => [<<"rabbitmq:volatile-queue">>]}, + #{name => Name, + role => {receiver, Source, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first}. + +get_global_counters(Config) -> + rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []). diff --git a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl b/deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl similarity index 51% rename from deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl rename to deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl index 86dc12c4bfc5..2798e1f8ba35 100644 --- a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl +++ b/deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl @@ -5,18 +5,27 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% --module(amqpl_direct_reply_to_SUITE). +-module(direct_reply_to_amqpl_SUITE). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -compile([nowarn_export_all, export_all]). --import(rabbit_ct_helpers, [eventually/1]). +-import(rabbit_ct_helpers, + [eventually/1]). +-import(amqp_utils, + [init/1, + close/1, + connection_config/1, + wait_for_credit/1, + end_session_sync/1, + close_connection_sync/1]). --define(TIMEOUT, 30_000). +-define(TIMEOUT, 9000). %% This is the pseudo queue that is specially interpreted by RabbitMQ. -define(REPLY_QUEUE, <<"amq.rabbitmq.reply-to">>). @@ -24,19 +33,26 @@ all() -> [ {group, cluster_size_1}, + {group, cluster_size_1_ff_disabled}, {group, cluster_size_3} ]. groups() -> [ {cluster_size_1, [shuffle], + cluster_size_1_common() ++ [ - trace, - failure_ack_mode, - failure_multiple_consumers, - failure_reuse_consumer_tag, - failure_publish + amqpl_amqp_amqpl, + amqp_amqpl_amqp ]}, + + %% Delete this group when feature flag rabbitmq_4.2.0 becomes required. + {cluster_size_1_ff_disabled, [], + cluster_size_1_common() ++ + [ + enable_ff % must run last + ]}, + {cluster_size_3, [shuffle], [ rpc_new_to_old_node, @@ -44,11 +60,21 @@ groups() -> ]} ]. +cluster_size_1_common() -> + [ + trace, + failure_ack_mode, + failure_multiple_consumers, + failure_reuse_consumer_tag, + failure_publish + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), Config. @@ -57,15 +83,27 @@ end_per_suite(Config) -> init_per_group(Group, Config) -> Nodes = case Group of - cluster_size_1 -> 1; - cluster_size_3 -> 3 + cluster_size_3 -> + 3; + _ -> + 1 end, - Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), - Config1 = rabbit_ct_helpers:set_config( - Config, [{rmq_nodes_count, Nodes}, - {rmq_nodename_suffix, Suffix}]), + Config1 = case Group of + cluster_size_1_ff_disabled -> + rabbit_ct_helpers:merge_app_env( + Config, + {rabbit, + [{forced_feature_flags_on_init, + {rel, [], ['rabbitmq_4.2.0']}}]}); + _ -> + Config + end, + Suffix = rabbit_ct_helpers:testcase_absname(Config1, "", "-"), + Config2 = rabbit_ct_helpers:set_config( + Config1, [{rmq_nodes_count, Nodes}, + {rmq_nodename_suffix, Suffix}]), rabbit_ct_helpers:run_setup_steps( - Config1, + Config2, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). @@ -81,21 +119,129 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +%% Test enabling the feature flag while a client consumes from the volatile queue. +%% Delete this test case when feature flag rabbitmq_4.2.0 becomes required. +enable_ff(Config) -> + RequestQueue = <<"request queue">>, + RequestPayload = <<"my request">>, + CorrelationId = <<"my correlation ID">>, + RequesterCh = rabbit_ct_client_helpers:open_channel(Config), + ResponderCh = rabbit_ct_client_helpers:open_channel(Config), + + amqp_channel:subscribe(RequesterCh, + #'basic.consume'{queue = ?REPLY_QUEUE, + no_ack = true}, + self()), + CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0 + end, + + #'queue.declare_ok'{} = amqp_channel:call( + RequesterCh, + #'queue.declare'{queue = RequestQueue}), + #'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}), + amqp_channel:register_confirm_handler(RequesterCh, self()), + + %% Send the request. + amqp_channel:cast( + RequesterCh, + #'basic.publish'{routing_key = RequestQueue}, + #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE, + correlation_id = CorrelationId}, + payload = RequestPayload}), + receive #'basic.ack'{} -> ok + end, + + %% Receive the request. + {#'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{reply_to = ReplyTo, + correlation_id = CorrelationId}, + payload = RequestPayload} + } = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}), + + ?assertEqual(#'queue.declare_ok'{queue = ReplyTo, + message_count = 0, + consumer_count = 1}, + amqp_channel:call(ResponderCh, + #'queue.declare'{queue = ReplyTo})), + + %% Send the first reply. + amqp_channel:cast( + ResponderCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = <<"reply 1">>}), + + %% Receive the frst reply. + receive {#'basic.deliver'{consumer_tag = CTag, + redelivered = false, + exchange = <<>>, + routing_key = ReplyTo}, + #amqp_msg{payload = P1, + props = #'P_basic'{correlation_id = CorrelationId}}} -> + ?assertEqual(<<"reply 1">>, P1) + after ?TIMEOUT -> ct:fail({missing_reply, ?LINE}) + end, + + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'), + + ?assertEqual(#'queue.declare_ok'{queue = ReplyTo, + message_count = 0, + consumer_count = 1}, + amqp_channel:call(ResponderCh, + #'queue.declare'{queue = ReplyTo})), + + %% Send the second reply. + amqp_channel:cast( + ResponderCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = <<"reply 2">>}), + + %% Receive the second reply. + receive {#'basic.deliver'{consumer_tag = CTag}, + #amqp_msg{payload = P2, + props = #'P_basic'{correlation_id = CorrelationId}}} -> + ?assertEqual(<<"reply 2">>, P2) + after ?TIMEOUT -> ct:fail({missing_reply, ?LINE}) + end, + + %% Requester cancels consumption. + ?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag}, + amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})), + + %% Responder checks again if the requester is still there. + %% This time, the requester and its queue should be gone. + try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of + _ -> + ct:fail("expected queue.declare to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {_, _, <<"NOT_FOUND - no queue '", + ReplyTo:(byte_size(ReplyTo))/binary, + "' in vhost '/'">>}}, _}, + Reason) + end, + + %% Clean up. + #'queue.delete_ok'{} = amqp_channel:call(RequesterCh, + #'queue.delete'{queue = RequestQueue}), + ok = rabbit_ct_client_helpers:close_channel(RequesterCh). + %% Test case for %% https://github.com/rabbitmq/rabbitmq-server/discussions/11662 trace(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]), Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), - TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>, - RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>, - RequestPayload = <<"my request">>, - ReplyPayload = <<"my reply">>, - CorrelationId = <<"my correlation ID">>, + TraceQueue = <<"trace-queue">>, + RequestQueue = <<"request-queue">>, + RequestPayload = <<"my-request">>, + ReplyPayload = <<"my-reply">>, + CorrelationId = <<"my-correlation-id">>, Qs = [RequestQueue, TraceQueue], Ch = rabbit_ct_client_helpers:open_channel(Config), - RequesterCh = rabbit_ct_client_helpers:open_channel(Config, 0), - ResponderCh = rabbit_ct_client_helpers:open_channel(Config, 0), + RequesterCh = rabbit_ct_client_helpers:open_channel(Config), + ResponderCh = rabbit_ct_client_helpers:open_channel(Config), [#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs], #'queue.bind_ok'{} = amqp_channel:call( @@ -266,6 +412,201 @@ failure_publish(Config) -> end, ok = rabbit_ct_client_helpers:close_connection(Conn). +%% Test that Direct Reply-To works when the requester is an AMQP 0.9.1 client +%% and the responder is an AMQP 1.0 client. +amqpl_amqp_amqpl(Config) -> + RequestQ = atom_to_binary(?FUNCTION_NAME), + AddrRequestQ = rabbitmq_amqp_address:queue(RequestQ), + Id = <<"🥕"/utf8>>, + RequestPayload = <<"request payload">>, + ReplyPayload = <<"reply payload">>, + + Chan = rabbit_ct_client_helpers:open_channel(Config), + amqp_channel:subscribe(Chan, #'basic.consume'{queue = ?REPLY_QUEUE, + no_ack = true}, self()), + CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0 + end, + + %% Send the request via AMQP 0.9.1 + #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = RequestQ}), + amqp_channel:cast(Chan, + #'basic.publish'{routing_key = RequestQ}, + #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE, + message_id = Id}, + payload = RequestPayload}), + + %% Receive the request via AMQP 1.0. + {_, Session, LinkPair} = Init = init(Config), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, AddrRequestQ), + {ok, RequestMsg} = amqp10_client:get_msg(Receiver, ?TIMEOUT), + ?assertEqual(RequestPayload, amqp10_msg:body_bin(RequestMsg)), + #{message_id := Id, + reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg), + + %% AMQP 1.0 responder checks whether the AMQP 0.9.1 requester is still there. + {ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr), + ?assertMatch({ok, #{vhost := <<"/">>, + durable := false, + type := <<"rabbit_volatile_queue">>, + message_count := 0, + consumer_count := 1}}, + rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ)), + + %% Send the reply via AMQP 1.0. + {ok, Sender} = amqp10_client:attach_sender_link_sync( + Session, <<"sender">>, ReplyToAddr), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_headers( + #{priority => 3, + durable => true}, + amqp10_msg:set_properties( + #{message_id => <<"reply ID">>, + correlation_id => Id}, + amqp10_msg:set_application_properties( + #{<<"my key">> => <<"my val">>}, + amqp10_msg:new(<<1>>, ReplyPayload, true))))), + + %% Receive the reply via AMQP 0.9.1 + receive {Deliver, #amqp_msg{payload = ReplyPayload, + props = #'P_basic'{headers = Headers} = Props}} -> + ?assertMatch(#'basic.deliver'{ + consumer_tag = CTag, + redelivered = false, + exchange = <<>>, + routing_key = <<"amq.rabbitmq.reply-to.", _/binary>>}, + Deliver), + ?assertMatch(#'P_basic'{ + message_id = <<"reply ID">>, + correlation_id = Id, + priority = 3, + delivery_mode = 2}, + Props), + ?assertEqual({value, {<<"my key">>, longstr, <<"my val">>}}, + lists:keysearch(<<"my key">>, 1, Headers)) + after ?TIMEOUT -> ct:fail(missing_reply) + end, + + %% AMQP 0.9.1 requester cancels consumption. + ?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag}, + amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = CTag})), + + %% This time, when the AMQP 1.0 responder checks whether the AMQP 0.9.1 requester + %% is still there, an error should be returned. + {error, Resp} = rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ), + ?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)), + ?assertEqual(#'v1_0.amqp_value'{content = {utf8, <<"queue '", ReplyQ/binary, "' in vhost '/' not found">>}}, + amqp10_msg:body(Resp)), + + #'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = RequestQ}), + ok = close(Init), + ok = rabbit_ct_client_helpers:close_channel(Chan). + +%% Test that Direct Reply-To works when the requester is an AMQP 1.0 client +%% and the responder is an AMQP 0.9.1 client. +amqp_amqpl_amqp(Config) -> + RequestQ = atom_to_binary(?FUNCTION_NAME), + AddrRequestQ = rabbitmq_amqp_address:queue(RequestQ), + Id = <<"🥕"/utf8>>, + RequestPayload = <<"request payload">>, + ReplyPayload = <<"reply payload">>, + + Chan = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = RequestQ}), + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"requester">>, + notify_with_performative => true}, + {ok, Conn} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Conn), + Source = #{address => undefined, + durable => none, + expiry_policy => <<"link-detach">>, + dynamic => true, + capabilities => [<<"rabbitmq:volatile-queue">>]}, + AttachArgs = #{name => <<"receiver">>, + role => {receiver, Source, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first}, + {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), + AddrReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, Addr}, + dynamic = true}} = Attach, + Addr + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver, 1, never), + + %% Send the request via AMQP 1.0 + {ok, SenderRequester} = amqp10_client:attach_sender_link_sync( + Session, <<"sender">>, AddrRequestQ), + ok = wait_for_credit(SenderRequester), + ok = amqp10_client:send_msg( + SenderRequester, + amqp10_msg:set_properties( + #{message_id => Id, + reply_to => AddrReplyQ}, + amqp10_msg:new(<<"t1">>, RequestPayload))), + receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Receive the request via AMQP 0.9.1 + {#'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{reply_to = ReplyQ, + message_id = Id}, + payload = RequestPayload} + } = amqp_channel:call(Chan, #'basic.get'{queue = RequestQ}), + + %% Test what the docs state: + %% "If the RPC server is going to perform some expensive computation it might wish + %% to check if the client has gone away. To do this the server can declare the + %% generated reply name first on a disposable channel in order to determine whether + %% it still exists." + ?assertEqual(#'queue.declare_ok'{queue = ReplyQ, + message_count = 0, + consumer_count = 1}, + amqp_channel:call(Chan, #'queue.declare'{queue = ReplyQ})), + + %% Send the reply via AMQP 0.9.1 + amqp_channel:cast( + Chan, + #'basic.publish'{routing_key = ReplyQ}, + #amqp_msg{props = #'P_basic'{message_id = <<"msg ID reply">>, + correlation_id = Id}, + payload = ReplyPayload}), + + %% Receive the reply via AMQP 1.0 + receive {amqp10_msg, Receiver, Reply} -> + ?assertEqual(ReplyPayload, + amqp10_msg:body_bin(Reply)), + ?assertMatch(#{message_id := <<"msg ID reply">>, + correlation_id := Id}, + amqp10_msg:properties(Reply)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + ok = end_session_sync(Session), + ok = close_connection_sync(Conn), + #'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = RequestQ}), + + %% AMQP 0.9.1 responder checks again if the AMQP 1.0 requester is still there. + %% This time, the requester and its queue should be gone. + try amqp_channel:call(Chan, #'queue.declare'{queue = ReplyQ}) of + _ -> + ct:fail("expected queue.declare to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {_, _, <<"NOT_FOUND - no queue '", + ReplyQ:(byte_size(ReplyQ))/binary, + "' in vhost '/'">>}}, _}, + Reason) + end. + %% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests. rpc_new_to_old_node(Config) -> rpc(0, 1, Config). @@ -337,7 +678,6 @@ rpc(RequesterNode, ResponderNode, Config) -> payload = <<"reply 1">>}), %% Let's assume the RPC server sends multiple replies for a single request. - %% (This is a bit unusual but should work.) %% Setting the reply address in CC should work. amqp_channel:cast( ResponderCh, @@ -366,7 +706,7 @@ rpc(RequesterNode, ResponderNode, Config) -> end, %% The requester sends a reply to itself. - %% (Really odd, but should work.) + %% (Odd, but should work.) amqp_channel:cast( RequesterCh, #'basic.publish'{routing_key = ReplyTo}, diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 3d9c9954cb78..41531eb22a7b 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -335,7 +335,7 @@ amqpl_amqp_bin_amqpl(_Config) -> delivery_mode = 2, priority = 98, correlation_id = <<"corr">> , - reply_to = <<"reply-to">>, + reply_to = <<"reply/to">>, expiration = <<"1">>, message_id = <<"msg-id">>, timestamp = 99, @@ -407,7 +407,7 @@ amqpl_amqp_bin_amqpl(_Config) -> Hdr10), ?assertMatch(#'v1_0.properties'{content_encoding = {symbol, <<"gzip">>}, content_type = {symbol, <<"text/plain">>}, - reply_to = {utf8, <<"reply-to">>}, + reply_to = {utf8, <<"/queues/reply%2Fto">>}, creation_time = {timestamp, 99000}, user_id = {binary, <<"banana">>}, group_id = {utf8, <<"rmq">>} @@ -452,6 +452,9 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual(RoutingHeaders, maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)), + #content{properties = #'P_basic'{reply_to = ReplyTo}} = mc:protocol_state(MsgL2), + ?assertEqual(<<"reply/to">>, ReplyTo), + ok = persistent_term:put(message_interceptors, []). amqpl_cc_amqp_bin_amqpl(_Config) -> diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 9519dec56f86..6268a473edcb 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -182,9 +182,9 @@ smoke(Config) -> ?assertEqual(#{ messages_acknowledged_total => 3, messages_delivered_consume_auto_ack_total => 0, - messages_delivered_consume_manual_ack_total => 0, + messages_delivered_consume_manual_ack_total => 2, messages_delivered_get_auto_ack_total => 0, - messages_delivered_get_manual_ack_total => 0, + messages_delivered_get_manual_ack_total => 2, messages_delivered_total => 4, messages_get_empty_total => 2, messages_redelivered_total => 1 diff --git a/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl b/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl index 7ae0c4d568ab..748aace92ef0 100644 --- a/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl @@ -51,8 +51,8 @@ prop_decode_reply_to(_) -> PidParts = #{node => Node, id => 0, serial => 0, creation => 0}, IxParts = PidParts#{node := rabbit_nodes_common:make("banana", Ix)}, - IxPartsEnc = base64:encode(pid_recomposition:to_binary(IxParts)), - IxBin = <>, + IxPartsEnc = base64:encode(rabbit_pid_codec:recompose_to_binary(IxParts)), + QNameBin = <<"amq.rabbitmq.reply-to.", IxPartsEnc/binary, ".", Key/binary>>, NodeMap = maps:from_list(NodeList), NoNodeMap = maps:from_list(NoNodeList), @@ -60,10 +60,12 @@ prop_decode_reply_to(_) -> %% There is non-zero chance Random is a valid encoded Pid. NonB64 = <<0, Random/binary>>, - {ok, pid_recomposition:recompose(PidParts), Key} =:= - rabbit_direct_reply_to:decode_reply_to(IxBin, NodeMap) - andalso {error, target_node_not_found} =:= - rabbit_direct_reply_to:decode_reply_to(IxBin, NoNodeMap) - andalso {error, unrecognized_format} =:= - rabbit_direct_reply_to:decode_reply_to(NonB64, NodeMap) + {ok, rabbit_pid_codec:recompose(PidParts)} =:= + rabbit_volatile_queue:pid_from_name(QNameBin, NodeMap) + andalso {ok, Key} =:= + rabbit_volatile_queue:key_from_name(QNameBin) + andalso error =:= + rabbit_volatile_queue:pid_from_name(QNameBin, NoNodeMap) + andalso error =:= + rabbit_volatile_queue:pid_from_name(NonB64, NodeMap) end). diff --git a/deps/rabbit_common/src/delegate.erl b/deps/rabbit_common/src/delegate.erl index 6dd456c27bf8..bbd6313f7588 100644 --- a/deps/rabbit_common/src/delegate.erl +++ b/deps/rabbit_common/src/delegate.erl @@ -18,9 +18,8 @@ %% consistent route, to prevent them being reordered. In fact all %% AMQP-ish things (such as queue declaration results and basic.get) %% must take the same route as well, to ensure that clients see causal -%% ordering correctly. Therefore we have a rather generic mechanism -%% here rather than just a message-reflector. That's also why we pick -%% the delegate process to use based on a hash of the source pid. +%% ordering correctly. Therefore we can't use erpc. That's also why we +%% pick the delegate process to use based on a hash of the source pid. %% %% When a function is invoked using delegate:invoke/2, %% or delegate:invoke_no_result/2 on a group of pids, the pids are first split diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl index 8bb531c048ba..60cb7fabdce4 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl @@ -8,7 +8,13 @@ -export([exchange/1, exchange/2, - queue/1]). + queue/1, + from_map/1, + to_map/1]). + +-type address_map() :: #{queue := unicode:unicode_binary()} | + #{exchange := unicode:unicode_binary(), + routing_key => unicode:unicode_binary()}. -spec exchange(unicode:unicode_binary()) -> unicode:unicode_binary(). @@ -28,3 +34,57 @@ exchange(ExchangeName, RoutingKey) -> queue(QueueName) -> QueueNameQuoted = uri_string:quote(QueueName), <<"/queues/", QueueNameQuoted/binary>>. + +-spec from_map(address_map()) -> + unicode:unicode_binary(). +from_map(#{exchange := Exchange, routing_key := RoutingKey}) -> + exchange(Exchange, RoutingKey); +from_map(#{exchange := Exchange}) -> + exchange(Exchange); +from_map(#{queue := Queue}) -> + queue(Queue). + +-spec to_map(unicode:unicode_binary()) -> + {ok, address_map()} | error. +to_map(<<"/exchanges/", Rest/binary>>) -> + case binary:split(Rest, <<"/">>, [global]) of + [ExchangeQuoted] + when ExchangeQuoted =/= <<>> -> + Exchange = uri_string:unquote(ExchangeQuoted), + {ok, #{exchange => Exchange}}; + [ExchangeQuoted, RoutingKeyQuoted] + when ExchangeQuoted =/= <<>> -> + Exchange = uri_string:unquote(ExchangeQuoted), + RoutingKey = uri_string:unquote(RoutingKeyQuoted), + {ok, #{exchange => Exchange, + routing_key => RoutingKey}}; + _ -> + error + end; +to_map(<<"/queues/">>) -> + error; +to_map(<<"/queues/", QueueQuoted/binary>>) -> + Queue = uri_string:unquote(QueueQuoted), + {ok, #{queue => Queue}}; +to_map(_) -> + error. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +address_test() -> + M1 = #{queue => <<"my queue">>}, + M2 = #{queue => <<"🥕"/utf8>>}, + M3 = #{exchange => <<"my exchange">>}, + M4 = #{exchange => <<"🥕"/utf8>>}, + M5 = #{exchange => <<"my exchange">>, + routing_key => <<"my routing key">>}, + M6 = #{exchange => <<"🥕"/utf8>>, + routing_key => <<"🍰"/utf8>>}, + lists:foreach(fun(Map) -> + {ok, Map} = to_map(from_map(Map)) + end, [M1, M2, M3, M4, M5, M6]), + + error = to_map(<<"/queues/">>), + error = to_map(<<"/exchanges/">>), + error = to_map(<<"/exchanges//key">>). +-endif. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 3407a238a930..21b9cfe74081 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -26,9 +26,9 @@ %% Stateless rabbit_queue_type callbacks. -export([ - is_stateful/0, declare/2, delete/4, + supports_stateful_delivery/0, deliver/3, is_enabled/0, is_compatible/3, @@ -75,11 +75,6 @@ -define(INFO_KEYS, [type, name, durable, auto_delete, arguments, pid, owner_pid, state, messages]). --spec is_stateful() -> - boolean(). -is_stateful() -> - false. - -spec declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | {'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} | @@ -135,6 +130,9 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) -> Err end. +supports_stateful_delivery() -> + false. + -spec deliver([{amqqueue:amqqueue(), stateless}], Msg :: mc:state(), rabbit_queue_type:delivery_options()) -> diff --git a/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl index 376ff483d50f..e18fc70cc5da 100644 --- a/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl @@ -121,13 +121,14 @@ amqp10_destination(Config, AckMode) -> {amqp10_msg, Receiver, InMsg} -> [<<42>>] = amqp10_msg:body(InMsg), Ts = Timestamp * 1000, + ReplyTo = <<"/queues/", ?UNSHOVELLED/binary>>, ?assertMatch( #{content_type := ?UNSHOVELLED, content_encoding := ?UNSHOVELLED, correlation_id := ?UNSHOVELLED, user_id := <<"guest">>, message_id := ?UNSHOVELLED, - reply_to := ?UNSHOVELLED, + reply_to := ReplyTo, %% Message timestamp is no longer overwritten creation_time := Ts}, amqp10_msg:properties(InMsg)), diff --git a/deps/rabbitmq_shovel/test/local_static_SUITE.erl b/deps/rabbitmq_shovel/test/local_static_SUITE.erl index d83a50d73f6c..4ab9dd23a6c7 100644 --- a/deps/rabbitmq_shovel/test/local_static_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_static_SUITE.erl @@ -137,13 +137,14 @@ local_destination(Config, AckMode) -> receive {amqp10_msg, Receiver, InMsg} -> + ReplyTo = <<"/queues/", ?UNSHOVELLED/binary>>, [<<42>>] = amqp10_msg:body(InMsg), #{content_type := ?UNSHOVELLED, content_encoding := ?UNSHOVELLED, correlation_id := ?UNSHOVELLED, user_id := <<"guest">>, message_id := ?UNSHOVELLED, - reply_to := ?UNSHOVELLED + reply_to := ReplyTo } = amqp10_msg:properties(InMsg), #{<<"header1">> := 1, <<"header2">> := <<"h2">>