@@ -100,7 +100,7 @@ amqpl_compat(_Config) ->
100100 Content = # content {properties = Props ,
101101 payload_fragments_rev = Payload },
102102
103- XName = <<" exch" >>,
103+ XName = <<" exch" >>,
104104 RoutingKey = <<" apple" >>,
105105 {ok , Msg00 } = rabbit_basic :message_no_id (XName , RoutingKey , Content ),
106106
@@ -148,7 +148,6 @@ amqpl_compat(_Config) ->
148148 <<" x-stream-filter" >> := <<" apple" >>}, RoutingHeadersX ),
149149 ok .
150150
151-
152151amqpl_table_x_header (_Config ) ->
153152 Tbl = [{<<" type" >>, longstr , <<" apple" >>},
154153 {<<" count" >>, long , 99 }],
@@ -346,7 +345,11 @@ amqpl_amqp_bin_amqpl(_Config) ->
346345 },
347346 Content = # content {properties = Props ,
348347 payload_fragments_rev = [<<" data" >>]},
349- Msg = mc :init (mc_amqpl , Content , annotations ()),
348+ Msg0 = mc :init (mc_amqpl , Content , annotations ()),
349+
350+ ok = persistent_term :put (incoming_message_interceptors ,
351+ [{set_header_timestamp , false }]),
352+ Msg = rabbit_message_interceptor :intercept (Msg0 ),
350353
351354 ? assertEqual (<<" exch" >>, mc :exchange (Msg )),
352355 ? assertEqual ([<<" apple" >>], mc :routing_keys (Msg )),
@@ -357,17 +360,25 @@ amqpl_amqp_bin_amqpl(_Config) ->
357360 ? assertEqual ({utf8 , <<" msg-id" >>}, mc :message_id (Msg )),
358361 ? assertEqual (1 , mc :ttl (Msg )),
359362 ? assertEqual ({utf8 , <<" apple" >>}, mc :x_header (<<" x-stream-filter" >>, Msg )),
360- ? assert (is_integer (mc :get_annotation (rts , Msg ))),
363+ ReceivedTs = mc :get_annotation (rts , Msg ),
364+ ? assert (is_integer (ReceivedTs )),
361365
362366 % % array type non x-headers cannot be converted into amqp
363367 RoutingHeaders = maps :remove (<<" a-array" >>, mc :routing_headers (Msg , [])),
364368
365369 % % roundtrip to binary
366370 Msg10Pre = mc :convert (mc_amqp , Msg ),
367371 Payload = iolist_to_binary (mc :protocol_state (Msg10Pre )),
368- Msg10 = mc :init (mc_amqp , Payload , #{}),
372+ Msg10 = mc_amqp :init_from_stream (Payload , #{}),
373+
374+ % % mc annotations should be recovered when reading from a stream.
375+ ? assertEqual (<<" exch" >>, mc :exchange (Msg10 )),
376+ ? assertEqual ([<<" apple" >>], mc :routing_keys (Msg10 )),
377+ ? assertEqual (ReceivedTs , mc :get_annotation (rts , Msg10 )),
378+
369379 ? assertMatch (#{<<" x-exchange" >> := {utf8 , <<" exch" >>},
370- <<" x-routing-key" >> := {utf8 , <<" apple" >>}},
380+ <<" x-routing-key" >> := {utf8 , <<" apple" >>},
381+ <<" x-opt-rabbitmq-received-time" >> := {timestamp , ReceivedTs }},
371382 mc :x_headers (Msg10 )),
372383 ? assertEqual (98 , mc :priority (Msg10 )),
373384 ? assertEqual (true , mc :is_persistent (Msg10 )),
@@ -379,7 +390,6 @@ amqpl_amqp_bin_amqpl(_Config) ->
379390 % % at this point the type is now present as a message annotation
380391 ? assertEqual ({utf8 , <<" 45" >>}, mc :x_header (<<" x-basic-type" >>, Msg10 )),
381392 ? assertEqual (RoutingHeaders , mc :routing_headers (Msg10 , [])),
382- ? assert (is_integer (mc :get_annotation (rts , Msg10 ))),
383393
384394 Sections = amqp10_framing :decode_bin (Payload ),
385395 [
@@ -435,9 +445,12 @@ amqpl_amqp_bin_amqpl(_Config) ->
435445 ? assertEqual ({utf8 , <<" msg-id" >>}, mc :message_id (MsgL2 )),
436446 ? assertEqual (1 , mc :ttl (MsgL2 )),
437447 ? assertEqual ({utf8 , <<" apple" >>}, mc :x_header (<<" x-stream-filter" >>, MsgL2 )),
438- ? assertEqual (RoutingHeaders , mc :routing_headers (MsgL2 , [])),
439- ? assert (is_integer (mc :get_annotation (rts , MsgL2 ))),
440- ok .
448+ ? assertEqual (ReceivedTs , mc :get_annotation (rts , MsgL2 )),
449+ RoutingHeaders2 = mc :routing_headers (MsgL2 , []),
450+ ? assertEqual (RoutingHeaders ,
451+ maps :remove (<<" timestamp_in_ms" >>, RoutingHeaders2 )),
452+
453+ true = persistent_term :erase (incoming_message_interceptors ).
441454
442455amqpl_cc_amqp_bin_amqpl (_Config ) ->
443456 Headers = [{<<" CC" >>, array , [{longstr , <<" q1" >>},
0 commit comments