Skip to content

Commit 0110742

Browse files
committed
AMQP10 shovel: make bare message inmutable
1 parent 9540814 commit 0110742

File tree

4 files changed

+55
-46
lines changed

4 files changed

+55
-46
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ init_source(State = #{source := #{current := #{link := Link},
155155
init_dest(#{name := Name,
156156
shovel_type := Type,
157157
dest := #{add_forward_headers := true} = Dst} = State) ->
158-
Props = #{<<"shovelled-by">> => rabbit_nodes:cluster_name(),
159-
<<"shovel-type">> => rabbit_data_coercion:to_binary(Type),
160-
<<"shovel-name">> => rabbit_data_coercion:to_binary(Name)},
158+
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
159+
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
160+
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
161161
State#{dest => Dst#{cached_forward_headers => Props}};
162162
init_dest(State) ->
163163
State.
@@ -337,7 +337,7 @@ forward(Tag, Msg0,
337337
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
338338
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
339339
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
340-
Msg = update_amqp10_message(Msg2, mc:exchange(Msg0), mc:routing_keys(Msg0), State),
340+
Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
341341
case send_msg(Link, Msg) of
342342
ok ->
343343
rabbit_shovel_behaviour:decr_remaining_unacked(
@@ -366,23 +366,13 @@ send_msg(Link, Msg) ->
366366
end
367367
end.
368368

369-
update_amqp10_message(Msg0, Exchange, RK, #{dest := #{properties := Props,
370-
application_properties := AppProps0,
371-
message_annotations := MsgAnns}} = State) ->
372-
Msg1 = amqp10_msg:set_properties(Props, Msg0),
373-
Msg2 = amqp10_msg:set_message_annotations(MsgAnns, Msg1),
374-
AppProps = AppProps0#{<<"exchange">> => Exchange,
375-
<<"routing_key">> => RK},
376-
Msg = amqp10_msg:set_application_properties(AppProps, Msg2),
377-
add_timestamp_header(State, add_forward_headers(State, Msg)).
378-
379369
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
380-
P =#{creation_time => os:system_time(milli_seconds)},
381-
amqp10_msg:set_properties(P, Msg);
370+
Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
371+
amqp10_msg:set_message_annotations(Anns, Msg);
382372
add_timestamp_header(_, Msg) -> Msg.
383373

384-
add_forward_headers(#{dest := #{cached_forward_headers := Props}}, Msg) ->
385-
amqp10_msg:set_application_properties(Props, Msg);
374+
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
375+
amqp10_msg:set_message_annotations(Anns, Msg);
386376
add_forward_headers(_, Msg) -> Msg.
387377

388378
gen_unique_name(Pre0, Post0) ->

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,16 @@ amqp10_dest_validation(_Def, User) ->
165165
{<<"dest-address">>, fun rabbit_parameter_validation:binary/2, mandatory},
166166
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2, optional},
167167
{<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2, optional},
168+
%% The bare message should be inmutable in the AMQP network.
169+
%% Before RabbitMQ 4.2, we allowed to set application properties, message
170+
%% annotations and any property. This is wrong.
171+
%% From 4.2, the only message modification allowed is the optional
172+
%% addition of forward headers and shovelled timestamp inside message
173+
%% annotations.
174+
%% To avoid breaking existing deployments, the following configuration
175+
%% keys are still accepted but will be ignored.
168176
{<<"dest-application-properties">>, fun validate_amqp10_map/2, optional},
169177
{<<"dest-message-annotations">>, fun validate_amqp10_map/2, optional},
170-
% TODO: restrict to allowed fields
171178
{<<"dest-properties">>, fun validate_amqp10_map/2, optional}
172179
].
173180

deps/rabbitmq_shovel/test/amqp10_SUITE.erl

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(amqp10_SUITE).
99

1010
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("amqp_client/include/amqp_client.hrl").
1213

1314
-compile(export_all).
@@ -116,27 +117,36 @@ amqp10_destination(Config, AckMode) ->
116117
}},
117118
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
118119

120+
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
121+
Node = atom_to_binary(NodeA),
122+
119123
receive
120124
{amqp10_msg, Receiver, InMsg} ->
121125
ct:pal("GOT ~p", [InMsg]),
122126
[<<42>>] = amqp10_msg:body(InMsg),
123-
#{content_type := ?UNSHOVELLED,
124-
content_encoding := ?UNSHOVELLED,
125-
correlation_id := ?UNSHOVELLED,
126-
user_id := <<"guest">>,
127-
message_id := ?UNSHOVELLED,
128-
reply_to := ?UNSHOVELLED
129-
%% timestamp gets overwritten
130-
% creation_time := Timestamp
131-
} = amqp10_msg:properties(InMsg),
132-
#{<<"routing_key">> := ?TO_SHOVEL,
133-
<<"exchange">> := ?EXCHANGE,
134-
<<"header1">> := 1,
135-
<<"header2">> := <<"h2">>
136-
} = amqp10_msg:application_properties(InMsg),
137-
#{<<"x-basic-type">> := ?UNSHOVELLED
138-
} = amqp10_msg:message_annotations(InMsg),
139-
#{durable := true} = amqp10_msg:headers(InMsg),
127+
Ts = Timestamp * 1000,
128+
?assertMatch(
129+
#{content_type := ?UNSHOVELLED,
130+
content_encoding := ?UNSHOVELLED,
131+
correlation_id := ?UNSHOVELLED,
132+
user_id := <<"guest">>,
133+
message_id := ?UNSHOVELLED,
134+
reply_to := ?UNSHOVELLED,
135+
%% Message timestamp is no longer overwritten
136+
creation_time := Ts},
137+
amqp10_msg:properties(InMsg)),
138+
?assertMatch(
139+
#{<<"header1">> := 1,
140+
<<"header2">> := <<"h2">>},
141+
amqp10_msg:application_properties(InMsg)),
142+
?assertMatch(
143+
#{<<"x-basic-type">> := ?UNSHOVELLED,
144+
<<"x-opt-shovel-type">> := <<"static">>,
145+
<<"x-opt-shovel-name">> := <<"test_shovel">>,
146+
<<"x-opt-shovelled-by">> := Node,
147+
<<"x-opt-shovelled-timestamp">> := _},
148+
amqp10_msg:message_annotations(InMsg)),
149+
?assertMatch(#{durable := true}, amqp10_msg:headers(InMsg)),
140150
ok
141151
after ?TIMEOUT ->
142152
throw(timeout_waiting_for_deliver1)

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,18 +154,20 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
154154
<<"message-ann-value">>}]
155155
end}]),
156156
Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
157-
ct:pal("GOT ~p", [Msg]),
158157
AppProps = amqp10_msg:application_properties(Msg),
159-
160-
?assertMatch((#{user_id := <<"guest">>, creation_time := _}),
161-
(amqp10_msg:properties(Msg))),
162-
?assertMatch((#{<<"shovel-name">> := <<"test">>,
163-
<<"shovel-type">> := <<"dynamic">>, <<"shovelled-by">> := _,
164-
<<"app-prop-key">> := <<"app-prop-value">>}),
165-
(AppProps)),
158+
Anns = amqp10_msg:message_annotations(Msg),
159+
%% We no longer add/override properties, application properties or
160+
%% message annotations. Just the forward headers and timestamp as
161+
%% message annotations. The AMQP 1.0 message is inmutable
162+
?assertNot(maps:is_key(user_id, amqp10_msg:properties(Msg))),
163+
?assertNot(maps:is_key(<<"app-prop-key">>, AppProps)),
166164
?assertEqual(undefined, maps:get(<<"delivery_mode">>, AppProps, undefined)),
167-
?assertMatch((#{<<"x-message-ann-key">> := <<"message-ann-value">>}),
168-
(amqp10_msg:message_annotations(Msg))).
165+
?assertNot(maps:is_key(<<"x-message-ann-key">>, Anns)),
166+
?assertMatch(#{<<"x-opt-shovel-name">> := <<"test">>,
167+
<<"x-opt-shovel-type">> := <<"dynamic">>,
168+
<<"x-opt-shovelled-by">> := _,
169+
<<"x-opt-shovelled-timestamp">> := _
170+
}, Anns).
169171

170172
simple_amqp10_src(Config) ->
171173
MapConfig = ?config(map_config, Config),

0 commit comments

Comments
 (0)