@@ -172,7 +172,8 @@ groups() ->
172172 x_cc_annotation_exchange_routing_key_empty ,
173173 x_cc_annotation_queue ,
174174 x_cc_annotation_null ,
175- bad_x_cc_annotation_exchange
175+ bad_x_cc_annotation_exchange ,
176+ decimal_types
176177 ]},
177178
178179 {cluster_size_3 , [shuffle ],
@@ -6589,6 +6590,69 @@ bad_x_cc_annotation_exchange(Config) ->
65896590 ok = end_session_sync (Session ),
65906591 ok = close_connection_sync (Connection ).
65916592
6593+ % % Test that RabbitMQ can store and forward AMQP decimal types.
6594+ decimal_types (Config ) ->
6595+ QName = atom_to_binary (? FUNCTION_NAME ),
6596+ Address = rabbitmq_amqp_address :queue (QName ),
6597+ {_ , Session , LinkPair } = Init = init (Config ),
6598+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
6599+ LinkPair , QName ,
6600+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>}}}),
6601+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, Address ),
6602+ ok = wait_for_credit (Sender ),
6603+
6604+ Decimal32Zero = <<16#22 , 16#50 , 0 , 0 >>,
6605+ Decimal64Zero = <<16#22 , 16#34 , 0 , 0 , 0 , 0 , 0 , 0 >>,
6606+ Decimal128Zero = <<16#22 , 16#08 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 >>,
6607+ Decimal3242 = <<16#22 , 16#50 , 16#00 , 16#2A >>, % 42
6608+ Decimal32NaN = <<16#7C , 0 , 0 , 0 >>,
6609+ Body = # 'v1_0.amqp_value' {content = {list , [{as_is , 16#74 , Decimal32Zero },
6610+ {as_is , 16#84 , Decimal64Zero },
6611+ {as_is , 16#94 , Decimal128Zero }]}},
6612+ MsgAnns = #{<<" x-decimal-32" >> => {as_is , 16#74 , Decimal3242 },
6613+ <<" x-decimal-64" >> => {as_is , 16#84 , Decimal64Zero },
6614+ <<" x-decimal-128" >> => {as_is , 16#94 , Decimal128Zero },
6615+ <<" x-list" >> => {list , [{as_is , 16#94 , Decimal128Zero }]},
6616+ <<" x-map" >> => {map , [{{utf8 , <<" key-1" >>},
6617+ {as_is , 16#94 , Decimal128Zero }}]}},
6618+ AppProps = #{<<" decimal-32" >> => {as_is , 16#74 , Decimal32NaN }},
6619+ Msg0 = amqp10_msg :set_message_annotations (
6620+ MsgAnns ,
6621+ amqp10_msg :set_application_properties (
6622+ AppProps ,
6623+ amqp10_msg :new (<<" tag" >>, Body ))),
6624+ ok = amqp10_client :send_msg (Sender , Msg0 ),
6625+ ok = wait_for_accepted (<<" tag" >>),
6626+ ok = amqp10_client :send_msg (Sender , Msg0 ),
6627+ ok = wait_for_accepted (<<" tag" >>),
6628+ ok = detach_link_sync (Sender ),
6629+
6630+ % % Consume the first message via AMQP 1.0
6631+ {ok , Receiver } = amqp10_client :attach_receiver_link (
6632+ Session , <<" receiver" >>, Address , unsettled ),
6633+ {ok , Msg } = amqp10_client :get_msg (Receiver ),
6634+ ? assertEqual (Body , amqp10_msg :body (Msg )),
6635+ ? assertMatch (#{<<" x-decimal-32" >> := {as_is , 16#74 , Decimal3242 },
6636+ <<" x-decimal-64" >> := {as_is , 16#84 , Decimal64Zero },
6637+ <<" x-decimal-128" >> := {as_is , 16#94 , Decimal128Zero },
6638+ <<" x-list" >> := [{as_is , 16#94 , Decimal128Zero }],
6639+ <<" x-map" >> := [{{utf8 , <<" key-1" >>},
6640+ {as_is , 16#94 , Decimal128Zero }}]},
6641+ amqp10_msg :message_annotations (Msg )),
6642+ ? assertEqual (AppProps , amqp10_msg :application_properties (Msg )),
6643+ ok = amqp10_client :accept_msg (Receiver , Msg ),
6644+ ok = detach_link_sync (Receiver ),
6645+
6646+ % % Consume the second message via AMQP 0.9.1
6647+ % % We expect to receive the message without any crashes.
6648+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
6649+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {}},
6650+ amqp_channel :call (Ch , # 'basic.get' {queue = QName , no_ack = true })),
6651+ ok = rabbit_ct_client_helpers :close_connection_and_channel (Conn , Ch ),
6652+
6653+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
6654+ ok = close (Init ).
6655+
65926656% % Attach a receiver to an unavailable quorum queue.
65936657attach_to_down_quorum_queue (Config ) ->
65946658 QName = <<" q-down" >>,
0 commit comments