@@ -2895,28 +2895,51 @@ available_messages(QType, Config) ->
28952895
28962896incoming_message_interceptors (Config ) ->
28972897 Key = {rabbit , ? FUNCTION_NAME },
2898- ok = rpc (Config , persistent_term , put , [Key , [{set_header_routing_node , false }]]),
2899-
2900- OpnConf = connection_config (Config ),
2901- {ok , Connection } = amqp10_client :open_connection (OpnConf ),
2902- {ok , Session } = amqp10_client :begin_session_sync (Connection ),
2903- QName = atom_to_binary (? FUNCTION_NAME ),
2904- Address = <<" /queue/" , QName /binary >>,
2905- {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" test-receiver" >>, Address ),
2906- Address = <<" /queue/" , QName /binary >>,
2907- {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" test-sender" >>, Address , settled ),
2898+ ok = rpc (Config , persistent_term , put , [Key , [{set_header_routing_node , false },
2899+ {set_header_timestamp , false }]]),
2900+ Stream = <<" my stream" >>,
2901+ QQName = <<" my quorum queue" >>,
2902+ {Connection , Session , LinkPair } = init (Config ),
2903+ {ok , #{type := <<" stream" >>}} = rabbitmq_amqp_client :declare_queue (
2904+ LinkPair ,
2905+ Stream ,
2906+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" stream" >>}}}),
2907+ {ok , #{type := <<" quorum" >>}} = rabbitmq_amqp_client :declare_queue (
2908+ LinkPair ,
2909+ QQName ,
2910+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>}}}),
2911+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , Stream , <<" amq.fanout" >>, <<" ignored" >>, #{}),
2912+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , QQName , <<" amq.fanout" >>, <<" ignored" >>, #{}),
2913+ {ok , StreamReceiver } = amqp10_client :attach_receiver_link (Session , <<" stream receiver" >>, <<" /queue/" , Stream /binary >>),
2914+ {ok , QQReceiver } = amqp10_client :attach_receiver_link (Session , <<" qq receiver" >>, <<" /queue/" , QQName /binary >>),
2915+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, <<" /exchange/amq.fanout" >>),
29082916 ok = wait_for_credit (Sender ),
2909- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag" >>, <<" body" >>, true )),
29102917
2911- {ok , Msg } = amqp10_client :get_msg (Receiver ),
2912- ? assertEqual ([<<" body" >>], amqp10_msg :body (Msg )),
2918+ NowMillis = os :system_time (millisecond ),
2919+ Tag = <<" tag" >>,
2920+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (Tag , <<" body" >>)),
2921+ ok = wait_for_settlement (Tag ),
2922+
2923+ {ok , Msg1 } = amqp10_client :get_msg (StreamReceiver ),
2924+ {ok , Msg2 } = amqp10_client :get_msg (QQReceiver ),
2925+ ? assertEqual ([<<" body" >>], amqp10_msg :body (Msg1 )),
2926+ ? assertEqual ([<<" body" >>], amqp10_msg :body (Msg2 )),
2927+
29132928 Node = atom_to_binary (get_node_config (Config , 0 , nodename )),
2914- ? assertMatch (#{<<" x-routed-by" >> := Node },
2915- amqp10_msg :message_annotations (Msg )),
2929+ #{<<" x-routed-by" >> := Node ,
2930+ <<" x-opt-rabbitmq-received-time" >> := Millis } = amqp10_msg :message_annotations (Msg1 ),
2931+ ? assertMatch (
2932+ #{<<" x-routed-by" >> := Node ,
2933+ <<" x-opt-rabbitmq-received-time" >> := Millis }, amqp10_msg :message_annotations (Msg2 )),
2934+ ? assert (Millis < NowMillis + 4000 ),
2935+ ? assert (Millis > NowMillis - 4000 ),
29162936
29172937 ok = amqp10_client :detach_link (Sender ),
2918- ok = amqp10_client :detach_link (Receiver ),
2919- ok = delete_queue (Session , QName ),
2938+ ok = amqp10_client :detach_link (StreamReceiver ),
2939+ ok = amqp10_client :detach_link (QQReceiver ),
2940+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , Stream ),
2941+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QQName ),
2942+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
29202943 ok = end_session_sync (Session ),
29212944 ok = amqp10_client :close_connection (Connection ),
29222945 true = rpc (Config , persistent_term , erase , [Key ]).
0 commit comments