Skip to content

Commit 71d1b3b

Browse files
committed
Respect message_interceptors.incoming.set_header_timestamp
When feature flag message_containers is enabled, setting ``` message_interceptors.incoming.set_header_timestamp ``` wasn't respected anymore when a message is published via MQTT to a stream and subsequently consumed via AMQP 0.9.1. This commit ensures that AMQP 0.9.1 header timestamp_in_ms will be set. Note that we must not modify the AMQP 1.0 properties section when messages are received via AMQP 1.0 and consumed via AMQP 1.0. Also, message annoation keys not starting with "x-" are reserved.
1 parent 78c842e commit 71d1b3b

File tree

4 files changed

+88
-77
lines changed

4 files changed

+88
-77
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,13 @@ get_property(priority, Msg) ->
187187
convert_to(?MODULE, Msg, _Env) ->
188188
Msg;
189189
convert_to(TargetProto, Msg, Env) ->
190-
TargetProto:convert_from(?MODULE,
191-
msg_to_sections(Msg, fun (X) -> X end),
192-
Env).
190+
TargetProto:convert_from(?MODULE, msg_to_sections(Msg), Env).
193191

194192
serialize(Sections) ->
195193
encode_bin(Sections).
196194

197-
protocol_state(Msg0 = #msg{header = Header0}, Anns) ->
195+
protocol_state(Msg0 = #msg{header = Header0,
196+
message_annotations = MA0}, Anns) ->
198197
Redelivered = maps:get(redelivered, Anns, false),
199198
FirstAcquirer = not Redelivered,
200199
Header = case Header0 of
@@ -203,20 +202,25 @@ protocol_state(Msg0 = #msg{header = Header0}, Anns) ->
203202
#'v1_0.header'{} ->
204203
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
205204
end,
206-
Msg = Msg0#msg{header = Header},
207205

208-
#{?ANN_EXCHANGE := Exchange,
209-
?ANN_ROUTING_KEYS := [RKey | _]} = Anns,
210-
%% any x-* annotations get added as message annotations
211-
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
212-
213-
MACFun = fun(MAC) ->
214-
add_message_annotations(
215-
AnnsToAdd#{<<"x-exchange">> => wrap(utf8, Exchange),
216-
<<"x-routing-key">> => wrap(utf8, RKey)}, MAC)
217-
end,
218-
219-
msg_to_sections(Msg, MACFun).
206+
MA = maps:fold(fun(?ANN_EXCHANGE, Exchange, L) ->
207+
maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L);
208+
(?ANN_ROUTING_KEYS, RKeys, L) ->
209+
RKey = hd(RKeys),
210+
maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L);
211+
(<<"x-", _/binary>> = K, V, L)
212+
when V =/= undefined ->
213+
%% any x-* annotations get added as message annotations
214+
maps_upsert(K, mc_util:infer_type(V), L);
215+
(<<"timestamp_in_ms">>, V, L) ->
216+
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
217+
(_, _, Acc) ->
218+
Acc
219+
end, MA0, Anns),
220+
221+
Msg = Msg0#msg{header = Header,
222+
message_annotations = MA},
223+
msg_to_sections(Msg).
220224

221225
prepare(_For, Msg) ->
222226
Msg.
@@ -225,13 +229,14 @@ prepare(_For, Msg) ->
225229

226230
msg_to_sections(#msg{header = H,
227231
delivery_annotations = DAC,
228-
message_annotations = MAC0,
232+
message_annotations = MAC,
229233
properties = P,
230234
application_properties = APC,
231235
data = Data,
232-
footer = FC}, MacFun) ->
236+
footer = FC}) ->
233237
Tail = case FC of
234-
[] -> [];
238+
[] ->
239+
[];
235240
_ ->
236241
[#'v1_0.footer'{content = FC}]
237242
end,
@@ -242,34 +247,40 @@ msg_to_sections(#msg{header = H,
242247
Data ++ Tail
243248
end,
244249
S1 = case APC of
245-
[] -> S0;
250+
[] ->
251+
S0;
246252
_ ->
247253
[#'v1_0.application_properties'{content = APC} | S0]
248254
end,
249255
S2 = case P of
250-
undefined -> S1;
256+
undefined ->
257+
S1;
251258
_ ->
252259
[P | S1]
253260
end,
254-
S3 = case MacFun(MAC0) of
255-
[] -> S2;
256-
MAC ->
261+
S3 = case MAC of
262+
[] ->
263+
S2;
264+
_ ->
257265
[#'v1_0.message_annotations'{content = MAC} | S2]
258266
end,
259267
S4 = case DAC of
260-
[] -> S3;
268+
[] ->
269+
S3;
261270
_ ->
262271
[#'v1_0.delivery_annotations'{content = DAC} | S3]
263272
end,
264273
case H of
265-
undefined -> S4;
274+
undefined ->
275+
S4;
266276
_ ->
267277
[H | S4]
268278
end.
269279

270-
271-
272-
280+
maps_upsert(Key, TaggedVal, KVList) ->
281+
TaggedKey = {symbol, Key},
282+
Elem = {TaggedKey, TaggedVal},
283+
lists:keystore(TaggedKey, 1, KVList, Elem).
273284

274285
encode_bin(undefined) ->
275286
<<>>;
@@ -377,22 +388,6 @@ decode([#'v1_0.amqp_value'{} = B | Rem], #msg{} = Msg) ->
377388
%% an amqp value can only be a singleton
378389
decode(Rem, Msg#msg{data = B}).
379390

380-
add_message_annotations(Anns, MA0) ->
381-
maps:fold(fun (K, V, Acc) ->
382-
map_add(symbol, K, mc_util:infer_type(V), Acc)
383-
end, MA0, Anns).
384-
385-
map_add(_T, _Key, undefined, Acc) ->
386-
Acc;
387-
map_add(KeyType, Key, TaggedValue, Acc0) ->
388-
TaggedKey = wrap(KeyType, Key),
389-
lists_upsert({TaggedKey, TaggedValue}, Acc0).
390-
391-
wrap(_Type, undefined) ->
392-
undefined;
393-
wrap(Type, Val) ->
394-
{Type, Val}.
395-
396391
key_find(K, [{{_, K}, {_, V}} | _]) ->
397392
V;
398393
key_find(K, [_ | Rem]) ->
@@ -478,13 +473,3 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
478473
Acc
479474
end, Anns, MA)
480475
end.
481-
482-
lists_upsert(New, L) ->
483-
lists_upsert(New, L, [], L).
484-
485-
lists_upsert({Key, _} = New, [{Key, _} | Rem], Pref, _All) ->
486-
lists:reverse(Pref, [New | Rem]);
487-
lists_upsert(New, [Item | Rem], Pref, All) ->
488-
lists_upsert(New, Rem, [Item | Pref], All);
489-
lists_upsert(New, [], _Pref, All) ->
490-
[New | All].

deps/rabbit/src/mc_amqpl.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ convert_from(mc_amqp, Sections, _Env) ->
149149
%% Add remaining x- message annotations as headers
150150
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
151151
{true, to_091(<<"CC">>, V)};
152+
({{symbol, <<"x-opt-rabbitmq-received-time">>}, {timestamp, Ts}}) ->
153+
{true, {<<"timestamp_in_ms">>, long, Ts}};
152154
({{symbol, <<"x-", _/binary>> = K}, V})
153155
when ?IS_SHORTSTR_LEN(K) ->
154156
case is_internal_header(K) of

deps/rabbitmq_ct_helpers/include/rabbit_assert.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
AwaitMatchResult = Expr,
77
case (AwaitMatchResult) of
88
Guard -> AwaitMatchResult;
9-
__V -> case erlang:system_time(millisecond) of
9+
__V -> case erlang:monotonic_time(millisecond) of
1010
AwaitMatchNow when AwaitMatchNow < AwaitMatchHorizon ->
1111
timer:sleep(
1212
min(PollingInterval,
@@ -21,7 +21,7 @@
2121
{value, __V}]})
2222
end
2323
end
24-
end)(erlang:system_time(millisecond) + Timeout))
24+
end)(erlang:monotonic_time(millisecond) + Timeout))
2525
end).
2626

2727
-define(awaitMatch(Guard, Expr, Timeout),

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,28 +1795,52 @@ incoming_message_interceptors(Config) ->
17951795
Key = {rabbit, ?FUNCTION_NAME},
17961796
ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
17971797
Ch = rabbit_ct_client_helpers:open_channel(Config),
1798-
Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME),
1799-
declare_queue(Ch, QName, []),
1800-
bind(Ch, QName, Topic),
1798+
Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
1799+
CQName = <<"my classic queue">>,
1800+
Stream = <<"my stream">>,
1801+
declare_queue(Ch, CQName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
1802+
declare_queue(Ch, Stream, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
1803+
bind(Ch, CQName, Topic),
1804+
bind(Ch, Stream, Topic),
18011805
C = connect(ClientId, Config),
1802-
ok = emqtt:publish(C, Topic, Payload),
1806+
18031807
NowSecs = os:system_time(second),
1804-
NowMs = os:system_time(millisecond),
1805-
eventually(
1806-
?_assertMatch(
1807-
{#'basic.get_ok'{},
1808-
#amqp_msg{payload = Payload,
1809-
props = #'P_basic'{
1810-
timestamp = Secs,
1811-
headers = [{<<"timestamp_in_ms">>, long, Ms} | _XHeaders]
1812-
}}}
1813-
when Ms < NowMs + 4000 andalso
1814-
Ms > NowMs - 4000 andalso
1815-
Secs < NowSecs + 4 andalso
1816-
Secs > NowSecs - 4,
1817-
amqp_channel:call(Ch, #'basic.get'{queue = QName}))),
1808+
NowMillis = os:system_time(millisecond),
1809+
{ok, _} = emqtt:publish(C, Topic, Payload, qos1),
18181810

1819-
delete_queue(Ch, QName),
1811+
{#'basic.get_ok'{},
1812+
#amqp_msg{payload = Payload,
1813+
props = #'P_basic'{
1814+
timestamp = Secs,
1815+
headers = [{<<"timestamp_in_ms">>, long, Millis} | _]
1816+
}}
1817+
} = amqp_channel:call(Ch, #'basic.get'{queue = CQName}),
1818+
1819+
?assert(Secs < NowSecs + 4),
1820+
?assert(Secs > NowSecs - 4),
1821+
?assert(Millis < NowMillis + 4000),
1822+
?assert(Millis > NowMillis - 4000),
1823+
1824+
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 1}),
1825+
CTag = <<"my ctag">>,
1826+
#'basic.consume_ok'{} = amqp_channel:subscribe(
1827+
Ch,
1828+
#'basic.consume'{
1829+
queue = Stream,
1830+
consumer_tag = CTag,
1831+
arguments = [{<<"x-stream-offset">>, longstr, <<"first">>}]},
1832+
self()),
1833+
receive {#'basic.deliver'{consumer_tag = CTag},
1834+
#amqp_msg{payload = Payload,
1835+
props = #'P_basic'{
1836+
headers = [{<<"timestamp_in_ms">>, long, Millis} | _XHeaders]
1837+
}}} ->
1838+
ok
1839+
after 5000 -> ct:fail(missing_deliver)
1840+
end,
1841+
1842+
delete_queue(Ch, Stream),
1843+
delete_queue(Ch, CQName),
18201844
true = rpc(Config, persistent_term, erase, [Key]),
18211845
ok = emqtt:disconnect(C).
18221846

0 commit comments

Comments
 (0)