Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/amqp10_common/src/amqp10_binary_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ mapify([Key, Value | Rest]) ->
%% We re-use the match context avoiding creation of sub binaries.
-spec parse_many(binary(), opts()) ->
[amqp10_binary_generator:amqp10_type() |
{{pos, non_neg_integer()}, amqp10_binary_generator:amqp10_type() | body}].
{{pos, non_neg_integer()},
amqp10_binary_generator:amqp10_type() | {body, pos_integer()}}].
parse_many(Binary, Opts) ->
OptionServerMode = lists:member(server_mode, Opts),
pm(Binary, OptionServerMode, 0).
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ msg_body_encoded([{{pos, Pos}, #'v1_0.application_properties'{content = APC}} |
application_properties = APC,
bare_and_footer_application_properties_pos = Pos - BarePos
});
%% Base case: we assert the last part contains the mandatory body:
%% Base case: The last part must contain the mandatory body:
msg_body_encoded([{{pos, Pos}, {body, Code}}], Payload, Msg)
when is_binary(Payload) ->
%% AMQP sender omitted properties and application-properties sections.
Expand All @@ -648,7 +648,9 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], Payload, Msg)
msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
when is_integer(BarePos) ->
Msg#msg_body_encoded{bare_and_footer_body_pos = Pos - BarePos,
body_code = Code}.
body_code = Code};
msg_body_encoded([], _, #msg_body_encoded{body_code = uninit}) ->
throw(missing_amqp_message_body).

%% We extract the binary part of the payload exactly once when the bare message starts.
binary_part_bare_and_footer(Payload, Start) ->
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,11 @@ incoming_link_transfer(
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
messages_received(Settled),
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
Mc0 = try mc:init(mc_amqp, PayloadBin, #{})
catch missing_amqp_message_body ->
link_error(?V_1_0_AMQP_ERROR_DECODE_ERROR,
"message has no body", [])
end,
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKeys, Mc1, PermCache} ->
check_user_id(Mc1, User),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/test/amqp_dotnet_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ groups() ->
auth_failure,
access_failure_not_allowed,
access_failure_send,
streams
streams,
message_without_body
]
}].

Expand Down Expand Up @@ -215,6 +216,9 @@ streams(Config) ->
declare_queue(Config, ?FUNCTION_NAME, "stream"),
run(?FUNCTION_NAME, Config).

message_without_body(Config) ->
run(?FUNCTION_NAME, Config).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
Expand Down
34 changes: 34 additions & 0 deletions deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,37 @@ module Test =
printfn "Exception %A" ex
()

// Test sending a message without a body section.
// Although supported by this lib and Azure Service Bus, we expect RabbitMQ
// to return a descriptive error since a body is mandatory according to the AMQP spec.
let messageWithoutBody uri =
use ac = connectAnon uri
let sender = SenderLink(ac.Session, "sender", "/exchange/amq.fanout")

use detachEvent = new AutoResetEvent(false)
let mutable linkError : Error = null

sender.add_Closed(new ClosedCallback(fun _ err ->
linkError <- err
detachEvent.Set() |> ignore))

// Create a message with Properties but no body section
let message = new Message(Properties = new Properties())

try
sender.Send(message, TimeSpan.FromSeconds 5.)
failwith "Expected exception not received"
with
| :? Amqp.AmqpException as ex ->
printfn "Got expected exception: %A" ex

assertTrue (detachEvent.WaitOne(9000))
assertNotNull linkError
assertEqual (Symbol "amqp:decode-error") linkError.Condition
assertNotNull linkError.Description
assertTrue (linkError.Description.Contains("message has no body"))
assertTrue (not ac.Session.IsClosed)

let (|AsLower|) (s: string) =
match s with
| null -> null
Expand Down Expand Up @@ -645,6 +676,9 @@ let main argv =
| [AsLower "no_routes_is_released"; uri] ->
no_routes_is_released uri
0
| [AsLower "message_without_body"; uri] ->
messageWithoutBody uri
0
| _ ->
printfn "test %A not found. usage: <test> <uri>" argv
1
2 changes: 2 additions & 0 deletions release-notes/4.0.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ Starting with RabbitMQ 4.0, RabbitMQ strictly validates that
[non-reserved annotation keys](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations).
As a result, clients can only send symbolic keys that begin with `x-`.

Starting with RabbitMQ 4.0, RabbitMQ also strictly validates that an AMQP message contains the mandatory body.

### MQTT

RabbitMQ 3.13 [rabbitmq.conf](https://www.rabbitmq.com/docs/configure#config-file) settings `mqtt.default_user`, `mqtt.default_password`,
Expand Down
Loading