diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl index 13f616ff57c3..525297dfd2da 100644 --- a/deps/amqp10_common/src/amqp10_binary_parser.erl +++ b/deps/amqp10_common/src/amqp10_binary_parser.erl @@ -191,7 +191,8 @@ mapify([Key, Value | Rest]) -> %% We re-use the match context avoiding creation of sub binaries. -spec parse_many(binary(), opts()) -> [amqp10_binary_generator:amqp10_type() | - {{pos, non_neg_integer()}, amqp10_binary_generator:amqp10_type() | body}]. + {{pos, non_neg_integer()}, + amqp10_binary_generator:amqp10_type() | {body, pos_integer()}}]. parse_many(Binary, Opts) -> OptionServerMode = lists:member(server_mode, Opts), pm(Binary, OptionServerMode, 0). diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 00a696f7cb71..6cf1de47c005 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -636,7 +636,7 @@ msg_body_encoded([{{pos, Pos}, #'v1_0.application_properties'{content = APC}} | application_properties = APC, bare_and_footer_application_properties_pos = Pos - BarePos }); -%% Base case: we assert the last part contains the mandatory body: +%% Base case: The last part must contain the mandatory body: msg_body_encoded([{{pos, Pos}, {body, Code}}], Payload, Msg) when is_binary(Payload) -> %% AMQP sender omitted properties and application-properties sections. @@ -648,7 +648,9 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], Payload, Msg) msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg) when is_integer(BarePos) -> Msg#msg_body_encoded{bare_and_footer_body_pos = Pos - BarePos, - body_code = Code}. + body_code = Code}; +msg_body_encoded([], _, #msg_body_encoded{body_code = uninit}) -> + throw(missing_amqp_message_body). %% We extract the binary part of the payload exactly once when the bare message starts. binary_part_bare_and_footer(Payload, Start) -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 90853185a437..7eb1fe71f5a7 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2524,7 +2524,11 @@ incoming_link_transfer( validate_message_size(PayloadSize, MaxMessageSize), rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize), messages_received(Settled), - Mc0 = mc:init(mc_amqp, PayloadBin, #{}), + Mc0 = try mc:init(mc_amqp, PayloadBin, #{}) + catch missing_amqp_message_body -> + link_error(?V_1_0_AMQP_ERROR_DECODE_ERROR, + "message has no body", []) + end, case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> check_user_id(Mc1, User), diff --git a/deps/rabbit/test/amqp_dotnet_SUITE.erl b/deps/rabbit/test/amqp_dotnet_SUITE.erl index 32ab6039269e..d539f2df4101 100644 --- a/deps/rabbit/test/amqp_dotnet_SUITE.erl +++ b/deps/rabbit/test/amqp_dotnet_SUITE.erl @@ -42,7 +42,8 @@ groups() -> auth_failure, access_failure_not_allowed, access_failure_send, - streams + streams, + message_without_body ] }]. @@ -215,6 +216,9 @@ streams(Config) -> declare_queue(Config, ?FUNCTION_NAME, "stream"), run(?FUNCTION_NAME, Config). +message_without_body(Config) -> + run(?FUNCTION_NAME, Config). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- diff --git a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs index f71431e97e41..72fa811371b5 100755 --- a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs @@ -574,6 +574,37 @@ module Test = printfn "Exception %A" ex () + // Test sending a message without a body section. + // Although supported by this lib and Azure Service Bus, we expect RabbitMQ + // to return a descriptive error since a body is mandatory according to the AMQP spec. + let messageWithoutBody uri = + use ac = connectAnon uri + let sender = SenderLink(ac.Session, "sender", "/exchange/amq.fanout") + + use detachEvent = new AutoResetEvent(false) + let mutable linkError : Error = null + + sender.add_Closed(new ClosedCallback(fun _ err -> + linkError <- err + detachEvent.Set() |> ignore)) + + // Create a message with Properties but no body section + let message = new Message(Properties = new Properties()) + + try + sender.Send(message, TimeSpan.FromSeconds 5.) + failwith "Expected exception not received" + with + | :? Amqp.AmqpException as ex -> + printfn "Got expected exception: %A" ex + + assertTrue (detachEvent.WaitOne(9000)) + assertNotNull linkError + assertEqual (Symbol "amqp:decode-error") linkError.Condition + assertNotNull linkError.Description + assertTrue (linkError.Description.Contains("message has no body")) + assertTrue (not ac.Session.IsClosed) + let (|AsLower|) (s: string) = match s with | null -> null @@ -645,6 +676,9 @@ let main argv = | [AsLower "no_routes_is_released"; uri] -> no_routes_is_released uri 0 + | [AsLower "message_without_body"; uri] -> + messageWithoutBody uri + 0 | _ -> printfn "test %A not found. usage: " argv 1 diff --git a/release-notes/4.0.1.md b/release-notes/4.0.1.md index f403b012de00..f45ea36caeef 100644 --- a/release-notes/4.0.1.md +++ b/release-notes/4.0.1.md @@ -136,6 +136,8 @@ Starting with RabbitMQ 4.0, RabbitMQ strictly validates that [non-reserved annotation keys](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations). As a result, clients can only send symbolic keys that begin with `x-`. +Starting with RabbitMQ 4.0, RabbitMQ also strictly validates that an AMQP message contains the mandatory body. + ### MQTT RabbitMQ 3.13 [rabbitmq.conf](https://www.rabbitmq.com/docs/configure#config-file) settings `mqtt.default_user`, `mqtt.default_password`,