Skip to content

Commit 9d7ebf3

Browse files
committed
Enforce correct transfer settled flag
For messages published to RabbitMQ, RabbitMQ honors the transfer `settled` field, no matter what value the sender settle mode was set to in the attach frame. Therefore, prior to this commit, a client could send a transfer with `settled=true` even though sender settle mode was set to `unsettled` in the attach frame. This commit enforces that the publisher sets only transfer `settled` fields that are valid with the spec. If sender settle mode is: * `unsettled`, the transfer `settled` flag must be `false`. * `settled`, the transfer `settled` flag must be `true`. * `mixed`, the transfer `settled` flag can be `true` or `false`.
1 parent 1245119 commit 9d7ebf3

File tree

4 files changed

+87
-9
lines changed

4 files changed

+87
-9
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@
6969
%% "The remotely chosen handle is referred to as the input handle." [2.6.2]
7070
-type input_handle() :: link_handle().
7171

72-
-type snd_settle_mode() :: unsettled | settled | mixed.
73-
-type rcv_settle_mode() :: first | second.
74-
7572
-type terminus_durability() :: none | configuration | unsettled_state.
7673

7774
-type target_def() :: #{address => link_address(),

deps/amqp10_common/include/amqp10_types.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,10 @@
1515
-define(AMQP_ROLE_SENDER, false).
1616
-define(AMQP_ROLE_RECEIVER, true).
1717

18+
% [2.8.2]
19+
-type snd_settle_mode() :: unsettled | settled | mixed.
20+
% [2.8.3]
21+
-type rcv_settle_mode() :: first | second.
22+
1823
% [3.2.16]
1924
-define(MESSAGE_FORMAT, 0).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
}).
141141

142142
-record(incoming_link, {
143+
snd_settle_mode :: snd_settle_mode(),
143144
%% The exchange is either defined in the ATTACH frame and static for
144145
%% the life time of the link or dynamically provided in each message's
145146
%% "to" field (address v2).
@@ -1232,7 +1233,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
12321233
name = LinkName,
12331234
handle = Handle = ?UINT(HandleInt),
12341235
source = Source,
1235-
snd_settle_mode = SndSettleMode,
1236+
snd_settle_mode = MaybeSndSettleMode,
12361237
target = Target,
12371238
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
12381239
},
@@ -1243,8 +1244,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
12431244
user = User}}) ->
12441245
case ensure_target(Target, Vhost, User, PermCache0) of
12451246
{ok, Exchange, RoutingKey, QNameBin, PermCache} ->
1247+
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
12461248
MaxMessageSize = persistent_term:get(max_message_size),
12471249
IncomingLink = #incoming_link{
1250+
snd_settle_mode = SndSettleMode,
12481251
exchange = Exchange,
12491252
routing_key = RoutingKey,
12501253
queue_name_bin = QNameBin,
@@ -1256,7 +1259,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
12561259
name = LinkName,
12571260
handle = Handle,
12581261
source = Source,
1259-
snd_settle_mode = SndSettleMode,
1262+
snd_settle_mode = MaybeSndSettleMode,
12601263
rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_FIRST,
12611264
target = Target,
12621265
%% We are the receiver.
@@ -2304,7 +2307,8 @@ incoming_link_transfer(
23042307
rcv_settle_mode = RcvSettleMode,
23052308
handle = Handle = ?UINT(HandleInt)},
23062309
MsgPart,
2307-
#incoming_link{exchange = LinkExchange,
2310+
#incoming_link{snd_settle_mode = SndSettleMode,
2311+
exchange = LinkExchange,
23082312
routing_key = LinkRKey,
23092313
max_message_size = MaxMessageSize,
23102314
delivery_count = DeliveryCount0,
@@ -2335,6 +2339,7 @@ incoming_link_transfer(
23352339
ok = validate_multi_transfer_settled(MaybeSettled, FirstSettled),
23362340
{MsgBin0, FirstDeliveryId, FirstSettled}
23372341
end,
2342+
validate_transfer_snd_settle_mode(SndSettleMode, Settled),
23382343
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
23392344
PayloadSize = iolist_size(PayloadBin),
23402345
validate_message_size(PayloadSize, MaxMessageSize),
@@ -2914,6 +2919,15 @@ credit_reply_timeout(QType, QName) ->
29142919
default(undefined, Default) -> Default;
29152920
default(Thing, _Default) -> Thing.
29162921

2922+
snd_settle_mode({ubyte, Val}) ->
2923+
case Val of
2924+
0 -> unsettled;
2925+
1 -> settled;
2926+
2 -> mixed
2927+
end;
2928+
snd_settle_mode(undefined) ->
2929+
mixed.
2930+
29172931
transfer_frames(Transfer, Sections, unlimited) ->
29182932
[[Transfer, Sections]];
29192933
transfer_frames(Transfer, Sections, MaxFrameSize) ->
@@ -3059,6 +3073,22 @@ validate_multi_transfer_settled(Other, First)
30593073
"(interpreted) field 'settled' on first transfer (~p)",
30603074
[Other, First]).
30613075

3076+
validate_transfer_snd_settle_mode(mixed, _Settled) ->
3077+
ok;
3078+
validate_transfer_snd_settle_mode(unsettled, false) ->
3079+
%% "If the negotiated value for snd-settle-mode at attachment is unsettled,
3080+
%% then this field MUST be false (or unset) on every transfer frame for a delivery" [2.7.5]
3081+
ok;
3082+
validate_transfer_snd_settle_mode(settled, true) ->
3083+
%% "If the negotiated value for snd-settle-mode at attachment is settled,
3084+
%% then this field MUST be true on at least one transfer frame for a delivery" [2.7.5]
3085+
ok;
3086+
validate_transfer_snd_settle_mode(SndSettleMode, Settled) ->
3087+
protocol_error(
3088+
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
3089+
"sender settle mode is '~s' but transfer settled flag is interpreted as being '~s'",
3090+
[SndSettleMode, Settled]).
3091+
30623092
%% "If the message is being sent settled by the sender,
30633093
%% the value of this field [rcv-settle-mode] is ignored." [2.7.5]
30643094
validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled = false) ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ groups() ->
4444
sender_settle_mode_unsettled,
4545
sender_settle_mode_unsettled_fanout,
4646
sender_settle_mode_mixed,
47+
invalid_transfer_settled_flag,
4748
quorum_queue_rejects,
4849
receiver_settle_mode_first,
4950
publishing_to_non_existing_queue_should_settle_with_released,
@@ -757,6 +758,51 @@ sender_settle_mode_mixed(Config) ->
757758
ok = end_session_sync(Session),
758759
ok = amqp10_client:close_connection(Connection).
759760

761+
invalid_transfer_settled_flag(Config) ->
762+
OpnConf = connection_config(Config),
763+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
764+
{ok, Session1} = amqp10_client:begin_session(Connection),
765+
{ok, Session2} = amqp10_client:begin_session(Connection),
766+
TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
767+
{ok, SenderSettled} = amqp10_client:attach_sender_link_sync(
768+
Session1, <<"link 1">>, TargetAddr, settled),
769+
{ok, SenderUnsettled} = amqp10_client:attach_sender_link_sync(
770+
Session2, <<"link 2">>, TargetAddr, unsettled),
771+
ok = wait_for_credit(SenderSettled),
772+
ok = wait_for_credit(SenderUnsettled),
773+
774+
ok = amqp10_client:send_msg(SenderSettled, amqp10_msg:new(<<"tag1">>, <<"m1">>, false)),
775+
receive
776+
{amqp10_event,
777+
{session, Session1,
778+
{ended,
779+
#'v1_0.error'{
780+
condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
781+
description = {utf8, Description1}}}}} ->
782+
?assertEqual(
783+
<<"sender settle mode is 'settled' but transfer settled flag is interpreted as being 'false'">>,
784+
Description1)
785+
after 5000 -> flush(missing_ended),
786+
ct:fail({missing_event, ?LINE})
787+
end,
788+
789+
ok = amqp10_client:send_msg(SenderUnsettled, amqp10_msg:new(<<"tag2">>, <<"m2">>, true)),
790+
receive
791+
{amqp10_event,
792+
{session, Session2,
793+
{ended,
794+
#'v1_0.error'{
795+
condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
796+
description = {utf8, Description2}}}}} ->
797+
?assertEqual(
798+
<<"sender settle mode is 'unsettled' but transfer settled flag is interpreted as being 'true'">>,
799+
Description2)
800+
after 5000 -> flush(missing_ended),
801+
ct:fail({missing_event, ?LINE})
802+
end,
803+
804+
ok = amqp10_client:close_connection(Connection).
805+
760806
quorum_queue_rejects(Config) ->
761807
{Connection, Session, LinkPair} = init(Config),
762808
QName = atom_to_binary(?FUNCTION_NAME),
@@ -4761,7 +4807,7 @@ dead_letter_reject_message_order(QType, Config) ->
47614807
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
47624808

47634809
{ok, Sender} = amqp10_client:attach_sender_link(
4764-
Session, <<"sender">>, rabbitmq_amqp_address:queue(QName1), unsettled),
4810+
Session, <<"sender">>, rabbitmq_amqp_address:queue(QName1), settled),
47654811
wait_for_credit(Sender),
47664812
{ok, Receiver1} = amqp10_client:attach_receiver_link(
47674813
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), unsettled),
@@ -4852,7 +4898,7 @@ dead_letter_reject_many_message_order(QType, Config) ->
48524898
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
48534899

48544900
{ok, Sender} = amqp10_client:attach_sender_link(
4855-
Session, <<"sender">>, rabbitmq_amqp_address:queue(QName1), unsettled),
4901+
Session, <<"sender">>, rabbitmq_amqp_address:queue(QName1), settled),
48564902
wait_for_credit(Sender),
48574903
{ok, Receiver1} = amqp10_client:attach_receiver_link(
48584904
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), unsettled),
@@ -5141,7 +5187,7 @@ footer_checksum(FooterOpt, Config) ->
51415187
SndAttachArgs = #{name => <<"my sender">>,
51425188
role => {sender, #{address => Addr,
51435189
durable => configuration}},
5144-
snd_settle_mode => settled,
5190+
snd_settle_mode => mixed,
51455191
rcv_settle_mode => first,
51465192
footer_opt => FooterOpt},
51475193
{ok, Receiver} = amqp10_client:attach_link(Session, RecvAttachArgs),

0 commit comments

Comments
 (0)