diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 2cc387b1f2a6..9dec628b7091 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -160,7 +160,7 @@ init(Proto, Data, Anns0, Env) -> false -> Anns0#{env => Env} end, Anns2 = maps:merge(ProtoAnns, Anns1), - Anns = set_received_at_timestamp(Anns2), + Anns = ensure_received_at_timestamp(Anns2), #?MODULE{protocol = Proto, data = ProtoData, annotations = Anns}. @@ -527,6 +527,9 @@ is_cycle_v1(Queue, [{Queue, Reason} | _]) is_cycle_v1(Queue, [_ | Rem]) -> is_cycle_v1(Queue, Rem). -set_received_at_timestamp(Anns) -> +ensure_received_at_timestamp(Anns) + when is_map_key(?ANN_RECEIVED_AT_TIMESTAMP, Anns) -> + Anns; +ensure_received_at_timestamp(Anns) -> Millis = os:system_time(millisecond), Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}. diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 9e3ac9a74aec..0975f65c57be 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -677,6 +677,9 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) ({{symbol, <<"x-exchange">>}, {utf8, Exchange}}, Acc) -> Acc#{?ANN_EXCHANGE => Exchange}; + ({{symbol, <<"x-opt-rabbitmq-received-time">>}, + {timestamp, Ts}}, Acc) -> + Acc#{?ANN_RECEIVED_AT_TIMESTAMP => Ts}; (_, Acc) -> Acc end, Anns, MA) diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 1949763c5c76..4b5feddb509d 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -100,7 +100,7 @@ amqpl_compat(_Config) -> Content = #content{properties = Props, payload_fragments_rev = Payload}, - XName= <<"exch">>, + XName = <<"exch">>, RoutingKey = <<"apple">>, {ok, Msg00} = rabbit_basic:message_no_id(XName, RoutingKey, Content), @@ -148,7 +148,6 @@ amqpl_compat(_Config) -> <<"x-stream-filter">> := <<"apple">>}, RoutingHeadersX), ok. - amqpl_table_x_header(_Config) -> Tbl = [{<<"type">>, longstr, <<"apple">>}, {<<"count">>, long, 99}], @@ -346,7 +345,11 @@ amqpl_amqp_bin_amqpl(_Config) -> }, Content = #content{properties = Props, payload_fragments_rev = [<<"data">>]}, - Msg = mc:init(mc_amqpl, Content, annotations()), + Msg0 = mc:init(mc_amqpl, Content, annotations()), + + ok = persistent_term:put(incoming_message_interceptors, + [{set_header_timestamp, false}]), + Msg = rabbit_message_interceptor:intercept(Msg0), ?assertEqual(<<"exch">>, mc:exchange(Msg)), ?assertEqual([<<"apple">>], mc:routing_keys(Msg)), @@ -357,7 +360,8 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual({utf8, <<"msg-id">>}, mc:message_id(Msg)), ?assertEqual(1, mc:ttl(Msg)), ?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, Msg)), - ?assert(is_integer(mc:get_annotation(rts, Msg))), + ReceivedTs = mc:get_annotation(rts, Msg), + ?assert(is_integer(ReceivedTs)), %% array type non x-headers cannot be converted into amqp RoutingHeaders = maps:remove(<<"a-array">>, mc:routing_headers(Msg, [])), @@ -365,9 +369,16 @@ amqpl_amqp_bin_amqpl(_Config) -> %% roundtrip to binary Msg10Pre = mc:convert(mc_amqp, Msg), Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)), - Msg10 = mc:init(mc_amqp, Payload, #{}), + Msg10 = mc_amqp:init_from_stream(Payload, #{}), + + %% mc annotations should be recovered when reading from a stream. + ?assertEqual(<<"exch">>, mc:exchange(Msg10)), + ?assertEqual([<<"apple">>], mc:routing_keys(Msg10)), + ?assertEqual(ReceivedTs, mc:get_annotation(rts, Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, - <<"x-routing-key">> := {utf8, <<"apple">>}}, + <<"x-routing-key">> := {utf8, <<"apple">>}, + <<"x-opt-rabbitmq-received-time">> := {timestamp, ReceivedTs}}, mc:x_headers(Msg10)), ?assertEqual(98, mc:priority(Msg10)), ?assertEqual(true, mc:is_persistent(Msg10)), @@ -379,7 +390,6 @@ amqpl_amqp_bin_amqpl(_Config) -> %% at this point the type is now present as a message annotation ?assertEqual({utf8, <<"45">>}, mc:x_header(<<"x-basic-type">>, Msg10)), ?assertEqual(RoutingHeaders, mc:routing_headers(Msg10, [])), - ?assert(is_integer(mc:get_annotation(rts, Msg10))), Sections = amqp10_framing:decode_bin(Payload), [ @@ -435,9 +445,12 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual({utf8, <<"msg-id">>}, mc:message_id(MsgL2)), ?assertEqual(1, mc:ttl(MsgL2)), ?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)), - ?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])), - ?assert(is_integer(mc:get_annotation(rts, MsgL2))), - ok. + ?assertEqual(ReceivedTs, mc:get_annotation(rts, MsgL2)), + RoutingHeaders2 = mc:routing_headers(MsgL2, []), + ?assertEqual(RoutingHeaders, + maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)), + + true = persistent_term:erase(incoming_message_interceptors). amqpl_cc_amqp_bin_amqpl(_Config) -> Headers = [{<<"CC">>, array, [{longstr, <<"q1">>},