From 985bc0e96c2f8ff9bf2c9a69a652aeba54d5bb39 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 17 Oct 2024 18:36:04 +0200 Subject: [PATCH 1/2] Optionally notify client app with AMQP 1.0 performative This commit notifies the client app with the AMQP performative if connection config `notify_with_performative` is set to `true`. This allows the client app to learn about all fields including properties and capabilities returned by the AMQP server. --- deps/amqp10_client/src/amqp10_client.erl | 4 ++ .../src/amqp10_client_connection.erl | 57 +++++++++++----- .../src/amqp10_client_session.erl | 67 +++++++++++++------ deps/amqp10_client/test/system_SUITE.erl | 65 ++++++++++++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 4 +- .../src/rabbitmq_amqp_client.erl | 4 ++ 6 files changed, 153 insertions(+), 48 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index c5ebc7ba123f..e296d3ff8533 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -144,6 +144,8 @@ begin_session_sync(Connection, Timeout) when is_pid(Connection) -> receive {amqp10_event, {session, Session, begun}} -> {ok, Session}; + {amqp10_event, {session, Session, {begun, #'v1_0.begin'{}}}} -> + {ok, Session}; {amqp10_event, {session, Session, {ended, Err}}} -> {error, Err} after Timeout -> session_timeout @@ -186,6 +188,8 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) -> receive {amqp10_event, {link, Ref, attached}} -> {ok, Ref}; + {amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} -> + {ok, Ref}; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT -> link_timeout diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index df0548aa9ef1..8fbcb22f3d1b 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -63,6 +63,7 @@ notify => pid() | none, % the pid to send connection events to notify_when_opened => pid() | none, notify_when_closed => pid() | none, + notify_with_performative => boolean(), %% incoming maximum frame size set by our client application max_frame_size => pos_integer(), % TODO: constrain to large than 512 %% outgoing maximum frame size set by AMQP peer in OPEN performative @@ -253,7 +254,7 @@ hdr_sent({call, From}, begin_session, {keep_state, State1}. open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, - idle_time_out = Timeout}, + idle_time_out = Timeout} = Open, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> State = case Timeout of @@ -278,7 +279,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, _ = gen_statem:reply(From, Ret), S2 end, State1, PendingSessionReqs), - ok = notify_opened(Config), + ok = notify_opened(Config, Open), {next_state, opened, State2#state{pending_session_reqs = []}}; open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> @@ -292,19 +293,18 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; -opened(_EvtType, {close, Reason}, State = #state{config = Config}) -> +opened(_EvtType, {close, Reason}, State) -> %% We send the first close frame and wait for the reply. %% TODO: stop all sessions writing %% We could still accept incoming frames (See: 2.4.6) - ok = notify_closed(Config, Reason), case send_close(State, Reason) of ok -> {next_state, close_sent, State}; {error, closed} -> {stop, normal, State}; Error -> {stop, Error, State} end; -opened(_EvtType, #'v1_0.close'{error = Error}, State = #state{config = Config}) -> +opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> %% We receive the first close frame, reply and terminate. - ok = notify_closed(Config, translate_err(Error)), + ok = notify_closed(Config, Close), _ = send_close(State, none), {stop, normal, State}; opened({call, From}, begin_session, State) -> @@ -329,7 +329,8 @@ close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, #state{reader = ReaderPid} = State) -> %% if the reader exits we probably wont receive a close frame {stop, normal, State}; -close_sent(_EvtType, #'v1_0.close'{}, State) -> +close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> + ok = notify_closed(Config, Close), %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received {stop, normal, State}. @@ -489,25 +490,45 @@ socket_shutdown({tcp, Socket}, How) -> socket_shutdown({ssl, Socket}, How) -> ssl:shutdown(Socket, How). -notify_opened(#{notify_when_opened := none}) -> - ok; -notify_opened(#{notify_when_opened := Pid}) when is_pid(Pid) -> - Pid ! amqp10_event(opened), +notify_opened(#{notify_when_opened := none}, _) -> ok; -notify_opened(#{notify := Pid}) when is_pid(Pid) -> - Pid ! amqp10_event(opened), - ok; -notify_opened(_) -> +notify_opened(#{notify_when_opened := Pid} = Config, Perf) + when is_pid(Pid) -> + notify_opened0(Config, Pid, Perf); +notify_opened(#{notify := Pid} = Config, Perf) + when is_pid(Pid) -> + notify_opened0(Config, Pid, Perf); +notify_opened(_, _) -> + ok. + +notify_opened0(Config, Pid, Perf) -> + Evt = case Config of + #{notify_with_performative := true} -> + {opened, Perf}; + _ -> + opened + end, + Pid ! amqp10_event(Evt), ok. notify_closed(#{notify_when_closed := none}, _Reason) -> ok; notify_closed(#{notify := none}, _Reason) -> ok; -notify_closed(#{notify_when_closed := Pid}, Reason) when is_pid(Pid) -> - Pid ! amqp10_event({closed, Reason}), +notify_closed(#{notify_when_closed := Pid} = Config, Reason) + when is_pid(Pid) -> + notify_closed0(Config, Pid, Reason); +notify_closed(#{notify := Pid} = Config, Reason) + when is_pid(Pid) -> + notify_closed0(Config, Pid, Reason). + +notify_closed0(#{notify_with_performative := true}, Pid, Perf = #'v1_0.close'{}) -> + Pid ! amqp10_event({closed, Perf}), + ok; +notify_closed0(_, Pid, #'v1_0.close'{error = Error}) -> + Pid ! amqp10_event({closed, translate_err(Error)}), ok; -notify_closed(#{notify := Pid}, Reason) when is_pid(Pid) -> +notify_closed0(_, Pid, Reason) -> Pid ! amqp10_event({closed, Reason}), ok. diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 911886ce4143..7e2c82560398 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -254,7 +254,7 @@ unmapped({call, From}, {attach, Attach}, begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, next_outgoing_id = {uint, NOI}, incoming_window = {uint, InWindow}, - outgoing_window = {uint, OutWindow}}, + outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> State1 = State#state{remote_channel = RemoteChannel}, @@ -264,7 +264,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, S2 end, State1, EARs), - ok = notify_session_begun(State2), + ok = notify_session_begun(Begin, State2), {next_state, mapped, State2#state{early_attach_requests = [], next_incoming_id = NOI, @@ -291,18 +291,17 @@ mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, Incomi outgoing_window = ?UINT_OUTGOING_WINDOW}, ok = send(Flow, State), {keep_state, State#state{incoming_window = IncomingWindow}}; -mapped(cast, #'v1_0.end'{error = Err}, State) -> +mapped(cast, #'v1_0.end'{} = End, State) -> %% We receive the first end frame, reply and terminate. _ = send_end(State), % TODO: send notifications for links? - Reason = reason(Err), - ok = notify_session_ended(State, Reason), + ok = notify_session_ended(End, State), {stop, normal, State}; mapped(cast, #'v1_0.attach'{name = {utf8, Name}, initial_delivery_count = IDC, handle = {uint, InHandle}, role = PeerRoleBool, - max_message_size = MaybeMaxMessageSize}, + max_message_size = MaybeMaxMessageSize} = Attach, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> @@ -311,7 +310,7 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, LinkIndexKey = {OurRole, Name}, #{LinkIndexKey := OutHandle} = LinkIndex, #{OutHandle := Link0} = Links, - ok = notify_link_attached(Link0), + ok = notify_link_attached(Link0, Attach, State0), {DeliveryCount, MaxMessageSize} = case Link0 of @@ -334,13 +333,11 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, link_index = maps:remove(LinkIndexKey, LinkIndex), link_handle_index = LHI#{InHandle => OutHandle}}, {keep_state, State}; -mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}, - error = Err}, +mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}} = Detach, #state{links = Links, link_handle_index = LHI} = State0) -> with_link(InHandle, State0, fun (#link{output_handle = OutHandle} = Link, State) -> - Reason = reason(Err), - ok = notify_link_detached(Link, Reason), + ok = notify_link_detached(Link, Detach, State), {keep_state, State#state{links = maps:remove(OutHandle, Links), link_handle_index = maps:remove(InHandle, LHI)}} @@ -552,9 +549,8 @@ mapped(_EvtType, Msg, _State) -> [Msg, 10]), keep_state_and_data. -end_sent(_EvtType, #'v1_0.end'{error = Err}, State) -> - Reason = reason(Err), - ok = notify_session_ended(State, Reason), +end_sent(_EvtType, #'v1_0.end'{} = End, State) -> + ok = notify_session_ended(End, State), {stop, normal, State}; end_sent(_EvtType, _Frame, _State) -> % just drop frames here @@ -989,10 +985,24 @@ maybe_notify_link_credit(#link{role = sender, maybe_notify_link_credit(_Old, _New) -> ok. -notify_link_attached(Link) -> - notify_link(Link, attached). - -notify_link_detached(Link, Reason) -> +notify_link_attached(Link, Perf, #state{connection_config = Cfg}) -> + What = case Cfg of + #{notify_with_performative := true} -> + {attached, Perf}; + _ -> + attached + end, + notify_link(Link, What). + +notify_link_detached(Link, + Perf = #'v1_0.detach'{error = Err}, + #state{connection_config = Cfg}) -> + Reason = case Cfg of + #{notify_with_performative := true} -> + Perf; + _ -> + reason(Err) + end, notify_link(Link, {detached, Reason}). notify_link(#link{notify = Pid, ref = Ref}, What) -> @@ -1000,11 +1010,26 @@ notify_link(#link{notify = Pid, ref = Ref}, What) -> Pid ! Evt, ok. -notify_session_begun(#state{notify = Pid}) -> - Pid ! amqp10_session_event(begun), +notify_session_begun(Perf, #state{notify = Pid, + connection_config = Cfg}) -> + Evt = case Cfg of + #{notify_with_performative := true} -> + {begun, Perf}; + _ -> + begun + end, + Pid ! amqp10_session_event(Evt), ok. -notify_session_ended(#state{notify = Pid}, Reason) -> +notify_session_ended(Perf = #'v1_0.end'{error = Err}, + #state{notify = Pid, + connection_config = Cfg}) -> + Reason = case Cfg of + #{notify_with_performative := true} -> + Perf; + _ -> + reason(Err) + end, Pid ! amqp10_session_event({ended, Reason}), ok. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 7a64425c7583..27a59d5efef8 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -30,7 +30,7 @@ all() -> groups() -> [ - {rabbitmq, [], shared()}, + {rabbitmq, [], shared() ++ [notify_with_performative]}, {activemq, [], shared()}, {rabbitmq_strict, [], [ basic_roundtrip_tls, @@ -458,6 +458,52 @@ transfer_id_vs_delivery_id(Config) -> ?assertEqual(serial_number:add(amqp10_msg:delivery_id(RcvMsg1), 1), amqp10_msg:delivery_id(RcvMsg2)). +notify_with_performative(Config) -> + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + + OpenConf = #{?FUNCTION_NAME => true, + address => Hostname, + port => Port, + sasl => anon}, + + {ok, Connection} = amqp10_client:open_connection(OpenConf), + receive {amqp10_event, {connection, Connection, {opened, #'v1_0.open'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Session1} = amqp10_client:begin_session(Connection), + receive {amqp10_event, {session, Session1, {begun, #'v1_0.begin'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender1} = amqp10_client:attach_sender_link(Session1, <<"sender 1">>, <<"/exchanges/amq.fanout">>), + receive {amqp10_event, {link, Sender1, {attached, #'v1_0.attach'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:detach_link(Sender1), + receive {amqp10_event, {link, Sender1, {detached, #'v1_0.detach'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:end_session(Session1), + receive {amqp10_event, {session, Session1, {ended, #'v1_0.end'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Test that the amqp10_client:*_sync functions work. + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link_sync(Session2, <<"sender 2">>, <<"/exchanges/amq.fanout">>), + ok = amqp10_client:detach_link(Sender2), + ok = amqp10_client:end_session(Session2), + flush(), + + ok = amqp10_client:close_connection(Connection), + receive {amqp10_event, {connection, Connection, {closed, #'v1_0.close'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end. + % a message is sent before the link attach is guaranteed to % have completed and link credit granted % also queue a link detached immediately after transfer @@ -832,8 +878,10 @@ incoming_heartbeat(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> - {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}, - idle_time_out = {uint, 0}}]} + {Ch, [#'v1_0.open'{ + container_id = {utf8, <<"mock">>}, + %% The server doesn't expect any heartbeats from us (client). + idle_time_out = {uint, 0}}]} end, CloseStep = fun({0 = Ch, #'v1_0.close'{error = _TODO}, _Pay}) -> @@ -847,12 +895,18 @@ incoming_heartbeat(Config) -> MockRef = monitor(process, MockPid), ok = mock_server:set_steps(Mock, Steps), CConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config), - idle_time_out => 1000, notify => self()}, + %% If the server does not send any traffic to us (client), we will expect + %% our client to close the connection after 1 second because + %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold." + idle_time_out => 500, + notify => self()}, {ok, Connection} = amqp10_client:open_connection(CConf), + %% We expect our client to initiate closing the connection + %% and the server to reply with a close frame. receive {amqp10_event, {connection, Connection0, - {closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} + {closed, _}}} when Connection0 =:= Connection -> ok after 5000 -> @@ -860,7 +914,6 @@ incoming_heartbeat(Config) -> end, demonitor(MockRef). - %%% HELPERS %%% diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 79f57bdc7e2d..dd641328601b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4611,9 +4611,7 @@ idle_time_out_on_client(Config) -> receive {amqp10_event, {connection, Connection, - {closed, - {resource_limit_exceeded, - <<"remote idle-time-out">>}}}} -> ok + {closed, _}}} -> ok after 5000 -> ct:fail({missing_event, ?LINE}) end, diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index ce38b0241d10..0fde808151d8 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -97,6 +97,8 @@ await_attached(Ref) -> receive {amqp10_event, {link, Ref, attached}} -> ok; + {amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} -> + ok; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT -> @@ -129,6 +131,8 @@ await_detached(Ref) -> receive {amqp10_event, {link, Ref, {detached, normal}}} -> ok; + {amqp10_event, {link, Ref, {detached, #'v1_0.detach'{}}}} -> + ok; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT -> From 56bc881b2c4ae84a801b1eec978722705063fbce Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 17 Oct 2024 18:39:10 +0200 Subject: [PATCH 2/2] Return queue-prefix in open properties The `queue-prefix` property of the `open` frame is not part of the AMQP spec. However, let's include this property because it is also returned by ActiveMQ and Solace and understood by some client libs, e.g. ActiveMQ NMS.AMQP and Qpid JMS. Note that we do not set `topic-prefix` here because `/exchanges/amq.topic/` is a valid target address prefix but not a valid source address prefix. Closes #12531 --- deps/rabbit/src/rabbit_amqp_reader.erl | 10 +++++- deps/rabbit/test/amqp_client_SUITE.erl | 44 ++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index bcfa6a1dcc8c..54c9ce514c68 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -136,7 +136,15 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> server_properties() -> Props0 = rabbit_reader:server_properties(amqp_1_0), Props1 = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0], - Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1], + Props = [ + {{symbol, <<"node">>}, {utf8, atom_to_binary(node())}}, + %% queue-prefix is not part of the AMQP spec. + %% However, we include this property because it is also returned by + %% ActiveMQ and Solace and understood by some client libs, + %% e.g. ActiveMQ NMS.AMQP and Qpid JMS. + %% https://github.com/rabbitmq/rabbitmq-server/issues/12531 + {{symbol, <<"queue-prefix">>}, {utf8, <<"/queues/">>}} + ] ++ Props1, {map, Props}. %%-------------------------------------------------------------------------- diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index dd641328601b..e4d0f2b93db7 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -156,7 +156,8 @@ groups() -> tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, session_max_per_connection, link_max_per_session, - reserved_annotation + reserved_annotation, + open_properties_queue_prefix ]}, {cluster_size_3, [shuffle], @@ -4760,7 +4761,7 @@ dead_letter_headers_exchange(Config) -> #{arguments => #{<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}, <<"x-message-ttl">> => {ulong, 0}}}), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), - ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>, + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>, #{<<"my key">> => {uint, 5}, <<"x-my key">> => {uint, 6}, <<"x-match">> => {utf8, <<"all-with-x">>}}), @@ -5942,6 +5943,45 @@ reserved_annotation(Config) -> end, ok = close_connection_sync(Connection). +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/12531. +%% We pretend here to be unaware of RabbitMQ's target and source address format. +%% We learn the address format from the properties field in the open frame. +open_properties_queue_prefix(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + QueuePrefix = receive {amqp10_event, {connection, Connection, + {opened, #'v1_0.open'{properties = {map, KVList}}}}} -> + {_, {utf8, QPref}} = proplists:lookup({symbol, <<"queue-prefix">>}, KVList), + QPref + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + Address = <>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, unsettled), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + wait_for_credit(Sender), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>)), + ok = wait_for_accepted(<<"tag">>), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<"msg">>], amqp10_msg:body(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %%