From e6818f0040bb09cafe33da50d68909205408460c Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 09:35:46 +0200 Subject: [PATCH 1/4] Track requeue history Support tracking the requeue history as described in https://github.com/rabbitmq/rabbitmq-website/pull/2095 This commit: 1. adds a test case tracing the requeue history via AMQP 1.0 using the modified outcome and 2. fixes bugs in the broker which crashed if a modified message annotation value is an AMQP 1.0 list, map, or array. Complex modified annotation values (list, map, array) are stored as tagged values from now on. This means AMQP 0.9.1 consumers will not receive modified annotations of type list, map, or array (which is okay). --- .../src/amqp10_client_session.erl | 8 +- deps/rabbit/src/mc.erl | 2 +- deps/rabbit/src/mc_util.erl | 5 +- deps/rabbit/src/rabbit_amqp_session.erl | 13 ++- deps/rabbit/test/amqp_client_SUITE.erl | 85 +++++++++++++------ 5 files changed, 78 insertions(+), 35 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 981e291a3853..5be222c8b499 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1178,14 +1178,16 @@ wrap_map_value(true) -> {boolean, true}; wrap_map_value(false) -> {boolean, false}; -wrap_map_value(V) when is_integer(V) -> - {uint, V}; +wrap_map_value(V) when is_integer(V) andalso V >= 0 -> + uint(V); wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> utf8(list_to_binary(V)); wrap_map_value(V) when is_atom(V) -> - utf8(atom_to_list(V)). + utf8(atom_to_list(V)); +wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8(V) -> amqp10_client_types:utf8(V). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 9c23ac13daf8..3352f26185de 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -44,7 +44,7 @@ -type str() :: atom() | string() | binary(). -type internal_ann_key() :: atom(). -type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt- --type x_ann_value() :: str() | integer() | float() | [x_ann_value()]. +-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()]. -type protocol() :: module(). -type annotations() :: #{internal_ann_key() => term(), x_ann_key() => x_ann_value()}. diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 669dace41f45..1f20d15699db 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) -> {long, V}; infer_type(V) when is_boolean(V) -> {boolean, V}; -infer_type({T, _} = V) when is_atom(T) -> - %% looks like a pre-tagged type - V. +infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8_string_is_ascii(UTF8String) -> utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end). diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 3be9ea2b00fc..71759d4e84cc 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1938,7 +1938,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed, Anns1 = lists:map( %% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10] fun({{symbol, <<"x-", _/binary>> = K}, V}) -> - {K, unwrap(V)} + {K, unwrap_simple_type(V)} end, KVList), maps:from_list(Anns1) end, @@ -3624,7 +3624,14 @@ format_status( topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). -unwrap({_Tag, V}) -> + +unwrap_simple_type(V = {list, _}) -> + V; +unwrap_simple_type(V = {map, _}) -> + V; +unwrap_simple_type(V = {array, _, _}) -> + V; +unwrap_simple_type({_SimpleType, V}) -> V; -unwrap(V) -> +unwrap_simple_type(V) -> V. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index e8c64690a012..15e508962ca4 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -501,61 +501,96 @@ modified_quorum_queue(Config) -> ok = amqp10_client:send_msg(Sender, Msg2), ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + Receiver1Name = <<"receiver 1">>, + Receiver2Name = <<"receiver 2">>, + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled), - {ok, M1} = amqp10_client:get_msg(Receiver), + {ok, M1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(M1)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M1)), - ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}), - {ok, M2a} = amqp10_client:get_msg(Receiver), + {ok, M2a} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2a)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M2a)), - ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}), - {ok, M2b} = amqp10_client:get_msg(Receiver), + {ok, M2b} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2b)), ?assertMatch(#{delivery_count := 0, first_acquirer := false}, amqp10_msg:headers(M2b)), - ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}), - {ok, M2c} = amqp10_client:get_msg(Receiver), + {ok, M2c} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2c)), ?assertMatch(#{delivery_count := 1, first_acquirer := false}, amqp10_msg:headers(M2c)), - ok = amqp10_client:settle_msg(Receiver, M2c, - {modified, true, false, - #{<<"x-opt-key">> => <<"val 1">>}}), - - {ok, M2d} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg( + Receiver1, M2c, + {modified, true, false, + %% Test that a history of requeue events can be tracked as described in + %% https://rabbitmq.com/blog/2024/10/11/modified-outcome + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]}, + <<"x-opt-my-map">> => {map, [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}} + ]}}}), + + {ok, M2d} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m2">>], amqp10_msg:body(M2d)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2d)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)), - ok = amqp10_client:settle_msg(Receiver, M2d, - {modified, false, false, - #{<<"x-opt-key">> => <<"val 2">>, - <<"x-other">> => 99}}), - - {ok, M2e} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-requeued-by">> := {array, utf8, L0}, + <<"x-opt-requeue-reason">> := L1, + <<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d), + ok = amqp10_client:settle_msg( + Receiver1, M2d, + {modified, false, false, + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]}, + <<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]}, + <<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]}, + <<"x-other">> => 99}}), + + {ok, M2e} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2e)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2e)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 2">>, + ?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}], + <<"x-opt-my-map">> := [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}}, + {{symbol, <<"k3">>}, {symbol, <<"val 3">>}} + ], <<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)), - ok = amqp10_client:settle_msg(Receiver, M2e, modified), + ok = amqp10_client:settle_msg(Receiver1, M2e, modified), - ok = amqp10_client:detach_link(Receiver), - ?assertMatch({ok, #{message_count := 1}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + %% Test that we can consume via AMQP 0.9.1 + Ch = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, + #amqp_msg{payload = <<"m2">>, + props = #'P_basic'{headers = Headers}} + } = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}), + %% We expect to receive only modified AMQP 1.0 message annotations that are of simple types + %% (i.e. excluding list, map, array). + ?assertEqual({value, {<<"x-other">>, long, 99}}, + lists:keysearch(<<"x-other">>, 1, Headers)), + ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, + lists:keysearch(<<"x-delivery-count">>, 1, Headers)), + ok = rabbit_ct_client_helpers:close_channel(Ch), + + ok = amqp10_client:detach_link(Receiver1), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). From 855a32ab28ea7c33813f1e845f85bbb7f95b26d4 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 12:12:09 +0200 Subject: [PATCH 2/4] Add alternate exchange test assertion Test the use case described in https://github.com/rabbitmq/rabbitmq-website/pull/2095 --- deps/rabbit/test/amqp_client_SUITE.erl | 41 ++++++++++++++++--- .../src/rabbitmq_amqp_client.erl | 6 +-- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 15e508962ca4..64226a6e4b33 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -596,19 +596,32 @@ modified_quorum_queue(Config) -> ok = amqp10_client:close_connection(Connection). %% Test that a message can be routed based on the message-annotations -%% provided in the modified outcome. +%% provided in the modified outcome as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome modified_dead_letter_headers_exchange(Config) -> {Connection, Session, LinkPair} = init(Config), + HeadersXName = <<"my headers exchange">>, + AlternateXName = <<"my alternate exchange">>, SourceQName = <<"source quorum queue">>, AppleQName = <<"dead letter classic queue receiving apples">>, BananaQName = <<"dead letter quorum queue receiving bananas">>, + TrashQName = <<"trash queue receiving anything that doesn't match">>, + + ok = rabbitmq_amqp_client:declare_exchange( + LinkPair, + HeadersXName, + #{type => <<"headers">>, + arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}), + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, SourceQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, <<"x-overflow">> => {utf8, <<"reject-publish">>}, <<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>}, - <<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}), + <<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}), {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, AppleQName, @@ -617,14 +630,16 @@ modified_dead_letter_headers_exchange(Config) -> LinkPair, BananaQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, AppleQName, <<"amq.headers">>, <<>>, + LinkPair, AppleQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"apple">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, BananaQName, <<"amq.headers">>, <<>>, + LinkPair, BananaQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"banana">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}), {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)), @@ -635,6 +650,8 @@ modified_dead_letter_headers_exchange(Config) -> Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled), {ok, ReceiverBanana} = amqp10_client:attach_receiver_link( Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled), + {ok, ReceiverTrash} = amqp10_client:attach_receiver_link( + Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)), @@ -644,7 +661,8 @@ modified_dead_letter_headers_exchange(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( #{"x-fruit" => <<"apple">>}, amqp10_msg:new(<<"t4">>, <<"m4">>))), - ok = wait_for_accepts(3), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)), + ok = wait_for_accepts(5), {ok, Msg1} = amqp10_client:get_msg(Receiver), ?assertMatch(#{delivery_count := 0, @@ -685,6 +703,16 @@ modified_dead_letter_headers_exchange(Config) -> amqp10_msg:headers(MsgBanana2)), ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2), + {ok, Msg5} = amqp10_client:get_msg(Receiver), + %% This message should be routed via the alternate exchange to the trash queue. + ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}), + {ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash), + ?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)), + ?assertMatch(#{delivery_count := 0, + first_acquirer := false}, + amqp10_msg:headers(MsgTrash)), + ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash), + ok = detach_link_sync(Sender), ok = detach_link_sync(Receiver), ok = detach_link_sync(ReceiverApple), @@ -692,6 +720,9 @@ modified_dead_letter_headers_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index fc5da6c7b4e4..ce38b0241d10 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -48,14 +48,12 @@ replicas => [binary()], leader => binary()}. --type queue_properties() :: #{name := binary(), - durable => boolean(), +-type queue_properties() :: #{durable => boolean(), exclusive => boolean(), auto_delete => boolean(), arguments => arguments()}. --type exchange_properties() :: #{name := binary(), - type => binary(), +-type exchange_properties() :: #{type => binary(), durable => boolean(), auto_delete => boolean(), internal => boolean(), From 2e90619a6285262dad81006ff659dda866b43eba Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 12:57:33 +0200 Subject: [PATCH 3/4] Add custom dead letter history test Test the use case described in https://github.com/rabbitmq/rabbitmq-website/pull/2095: > Rather than relying solely on RabbitMQ's built-in dead lettering tracking via x-opt-deaths, consumers can customise dead lettering event tracking. --- deps/rabbit/test/amqp_client_SUITE.erl | 83 +++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 64226a6e4b33..cd3a484e04ee 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -129,6 +129,7 @@ groups() -> modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, + modified_dead_letter_history, dead_letter_headers_exchange, dead_letter_reject, dead_letter_reject_message_order_classic_queue, @@ -264,7 +265,8 @@ init_per_testcase(T, Config) end; init_per_testcase(T, Config) when T =:= modified_quorum_queue orelse - T =:= modified_dead_letter_headers_exchange -> + T =:= modified_dead_letter_headers_exchange orelse + T =:= modified_dead_letter_history -> case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of true -> rabbit_ct_helpers:testcase_started(Config, T); @@ -727,6 +729,85 @@ modified_dead_letter_headers_exchange(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +%% Test that custom dead lettering event tracking works as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome +modified_dead_letter_history(Config) -> + {Connection, Session, LinkPair} = init(Config), + Q1 = <<"qq 1">>, + Q2 = <<"qq 2">>, + + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q2, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)), + wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)), + ok = wait_for_accepts(1), + ok = detach_link_sync(Sender), + + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + ?assertMatch(#{delivery_count := 0, + first_acquirer := true}, + amqp10_msg:headers(Msg1)), + ok = amqp10_client:settle_msg( + Receiver1, Msg1, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}} + }), + + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertMatch(#{delivery_count := 1, + first_acquirer := false}, + amqp10_msg:headers(Msg2)), + #{<<"x-opt-history-list">> := L1, + <<"x-opt-history-map">> := L2, + <<"x-opt-history-array">> := {array, utf8, L0} + } = amqp10_msg:message_annotations(Msg2), + ok = amqp10_client:settle_msg( + Receiver2, Msg2, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, + <<"x-other">> => 99}}), + + {ok, Msg3} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), + ?assertMatch(#{delivery_count := 2, + first_acquirer := false}, + amqp10_msg:headers(Msg3)), + ?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]}, + <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], + <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, + {{symbol, <<"k1">>}, {byte, -1}}], + <<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)), + ok = amqp10_client:accept_msg(Receiver1, Msg3), + + ok = detach_link_sync(Receiver1), + ok = detach_link_sync(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. sender_settle_mode_unsettled(Config) -> From b1064fddba5de487577f62a6ee807482e7830ab7 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 14:43:31 +0200 Subject: [PATCH 4/4] Support negative integers in modified annotations --- deps/amqp10_client/src/amqp10_client_session.erl | 9 +++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 5be222c8b499..911886ce4143 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1178,8 +1178,13 @@ wrap_map_value(true) -> {boolean, true}; wrap_map_value(false) -> {boolean, false}; -wrap_map_value(V) when is_integer(V) andalso V >= 0 -> - uint(V); +wrap_map_value(V) when is_integer(V) -> + case V < 0 of + true -> + {int, V}; + false -> + uint(V) + end; wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index cd3a484e04ee..8af01fe7ff36 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -786,7 +786,7 @@ modified_dead_letter_history(Config) -> #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, - <<"x-other">> => 99}}), + <<"x-other">> => -99}}), {ok, Msg3} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), @@ -797,7 +797,7 @@ modified_dead_letter_history(Config) -> <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, {{symbol, <<"k1">>}, {byte, -1}}], - <<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)), + <<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)), ok = amqp10_client:accept_msg(Receiver1, Msg3), ok = detach_link_sync(Receiver1),