diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 9006727ab61f..bfeb692c0b02 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -258,7 +258,7 @@ define ct_master.erl endef PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking -PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit diff --git a/deps/rabbit/ct.test.spec b/deps/rabbit/ct.test.spec index bd8d628a4b19..62f63daff854 100644 --- a/deps/rabbit/ct.test.spec +++ b/deps/rabbit/ct.test.spec @@ -18,7 +18,8 @@ , amqp_credit_api_v2_SUITE , amqp_filtex_SUITE , amqp_proxy_protocol_SUITE -, amqp_system_SUITE +, amqp_dotnet_SUITE +, amqp_jms_SUITE , amqpl_consumer_ack_SUITE , amqpl_direct_reply_to_SUITE , amqqueue_backward_compatibility_SUITE diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 958ffb8e360c..17d997a78a55 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -29,6 +29,7 @@ assert_event_prop/2]). -import(amqp_utils, [init/1, init/2, + close/1, connection_config/1, connection_config/2, web_amqp/1, flush/1, @@ -446,7 +447,7 @@ reliable_send_receive(QType, Outcome, Config) -> %% Fields delivery-failed and message-annotations are not implemented. modified_classic_queue(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, QName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"classic">>}}}), @@ -483,9 +484,7 @@ modified_classic_queue(Config) -> ok = amqp10_client:detach_link(Receiver), ?assertMatch({ok, #{message_count := 1}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% We test the modified outcome with quorum queues. %% We expect that quorum queues implement field @@ -494,7 +493,7 @@ modified_classic_queue(Config) -> %% * message-annotations correctly modified_quorum_queue(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, QName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), @@ -598,15 +597,13 @@ modified_quorum_queue(Config) -> ok = amqp10_client:detach_link(Receiver1), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% Test that a message can be routed based on the message-annotations %% provided in the modified outcome as described in %% https://rabbitmq.com/blog/2024/10/11/modified-outcome modified_dead_letter_headers_exchange(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), HeadersXName = <<"my headers exchange">>, AlternateXName = <<"my alternate exchange">>, SourceQName = <<"source quorum queue">>, @@ -730,14 +727,12 @@ modified_dead_letter_headers_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName), ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName), ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% Test that custom dead lettering event tracking works as described in %% https://rabbitmq.com/blog/2024/10/11/modified-outcome modified_dead_letter_history(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), Q1 = <<"qq 1">>, Q2 = <<"qq 2">>, @@ -809,15 +804,13 @@ modified_dead_letter_history(Config) -> ok = detach_link_sync(Receiver2), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. sender_settle_mode_unsettled(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}, {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), Address = rabbitmq_amqp_address:queue(QName), @@ -842,12 +835,10 @@ sender_settle_mode_unsettled(Config) -> ok = amqp10_client:detach_link(Sender), ?assertMatch({ok, #{message_count := NumMsgs}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). sender_settle_mode_unsettled_fanout(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QNames = [<<"q1">>, <<"q2">>, <<"q3">>], [begin {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -876,15 +867,13 @@ sender_settle_mode_unsettled_fanout(Config) -> [?assertMatch({ok, #{message_count := NumMsgs}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)) || QName <- QNames], - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue where %% every 3rd message is settled by the sender. sender_settle_mode_mixed(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName = atom_to_binary(?FUNCTION_NAME), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}, {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -918,9 +907,7 @@ sender_settle_mode_mixed(Config) -> ok = amqp10_client:detach_link(Sender), ?assertMatch({ok, #{message_count := NumMsgs}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). invalid_transfer_settled_flag(Config) -> OpnConf = connection_config(Config), @@ -968,7 +955,7 @@ invalid_transfer_settled_flag(Config) -> ok = close_connection_sync(Connection). quorum_queue_rejects(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName = atom_to_binary(?FUNCTION_NAME), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, <<"x-max-length">> => {ulong, 1}, @@ -1009,9 +996,7 @@ quorum_queue_rejects(Config) -> ok = amqp10_client:detach_link(Sender), ?assertMatch({ok, #{message_count := 2}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:end_session(Session), - ok = close_connection_sync(Connection). + ok = close(Init). receiver_settle_mode_first(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -2374,7 +2359,7 @@ consumer_priority_quorum_queue(Config) -> consumer_priority(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -2451,14 +2436,12 @@ consumer_priority(QType, Config) -> ok = amqp10_client:detach_link(ReceiverHighPrio), ok = amqp10_client:detach_link(ReceiverLowPrio), {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). single_active_consumer_priority_quorum_queue(Config) -> QType = <<"quorum">>, QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session1, LinkPair} = init(Config), + {Connection, Session1, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}, <<"x-single-active-consumer">> => true}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -2587,9 +2570,7 @@ single_active_consumer_priority_quorum_queue(Config) -> ok = amqp10_client:detach_link(Recv1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session1), - ok = close_connection_sync(Connection). + ok = close(Init). single_active_consumer_classic_queue(Config) -> single_active_consumer(<<"classic">>, Config). @@ -2599,7 +2580,7 @@ single_active_consumer_quorum_queue(Config) -> single_active_consumer(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}, <<"x-single-active-consumer">> => true}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -2698,9 +2679,7 @@ single_active_consumer(QType, Config) -> ok = amqp10_client:detach_link(Receiver2), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). single_active_consumer_drain_classic_queue(Config) -> single_active_consumer_drain(<<"classic">>, Config). @@ -2710,7 +2689,7 @@ single_active_consumer_drain_quorum_queue(Config) -> single_active_consumer_drain(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}, <<"x-single-active-consumer">> => true}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -2815,9 +2794,7 @@ single_active_consumer_drain(QType, Config) -> end, ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% "A session endpoint can choose to unmap its output handle for a link. In this case, the endpoint MUST %% send a detach frame to inform the remote peer that the handle is no longer attached to the link endpoint. @@ -2935,7 +2912,7 @@ detach_requeues_drop_head_classic_queue(Config) -> QName2 = <<"q2">>, Addr1 = rabbitmq_amqp_address:queue(QName1), Addr2 = rabbitmq_amqp_address:queue(QName2), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), {ok, #{}} = rabbitmq_amqp_client:declare_queue( LinkPair, QName1, @@ -2988,9 +2965,7 @@ detach_requeues_drop_head_classic_queue(Config) -> ok = amqp10_client:detach_link(Receiver2), {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). detach_requeues_two_connections_classic_queue(Config) -> detach_requeues_two_connections(<<"classic">>, Config). @@ -3880,15 +3855,13 @@ leader_transfer_stream_credit_batches(Config) -> leader_transfer_credit(QName, QType, Credit, Config) -> %% Create queue with leader on node 1. - {Connection1, Session1, LinkPair1} = init(1, Config), + {_, _, LinkPair1} = Init = init(1, Config), {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( LinkPair1, QName, #{arguments => #{<<"x-queue-type">> => {utf8, QType}, <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), - ok = end_session_sync(Session1), - ok = close_connection_sync(Connection1), + ok = close(Init), OpnConf = connection_config(0, Config), {ok, Connection0} = amqp10_client:open_connection(OpnConf), @@ -3939,15 +3912,13 @@ leader_transfer_stream_send(Config) -> %% Test a leader transfer while we send to the queue. leader_transfer_send(QName, QType, Config) -> %% Create queue with leader on node 1. - {Connection1, Session1, LinkPair1} = init(1, Config), + {_, _, LinkPair1} = Init = init(1, Config), {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( LinkPair1, QName, #{arguments => #{<<"x-queue-type">> => {utf8, QType}, <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), - ok = end_session_sync(Session1), - ok = close_connection_sync(Connection1), + ok = close(Init), %% Send from a follower. OpnConf = connection_config(0, Config), @@ -4399,7 +4370,7 @@ incoming_message_interceptors(Config) -> {set_header_timestamp, false}]]), Stream = <<"my stream">>, QQName = <<"my quorum queue">>, - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), {ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, Stream, @@ -4442,9 +4413,7 @@ incoming_message_interceptors(Config) -> ok = amqp10_client:detach_link(QQReceiver), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QQName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection), + ok = close(Init), true = rpc(Config, persistent_term, erase, [Key]). trace_classic_queue(Config) -> @@ -4803,7 +4772,7 @@ priority_quorum_queue(Config) -> priority(QArgs, Config). priority(QArgs, Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{arguments => QArgs}), @@ -4846,12 +4815,10 @@ priority(QArgs, Config) -> ok = amqp10_client:detach_link(Receiver2), ok = amqp10_client:detach_link(Sender), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). dead_letter_headers_exchange(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName1 = <<"q1">>, QName2 = <<"q2">>, {ok, _} = rabbitmq_amqp_client:declare_queue( @@ -4943,12 +4910,10 @@ dead_letter_headers_exchange(Config) -> ok = amqp10_client:detach_link(Sender), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). dead_letter_reject(Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName1 = <<"q1">>, QName2 = <<"q2">>, QName3 = <<"q3">>, @@ -5051,9 +5016,7 @@ dead_letter_reject(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName3), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). dead_letter_reject_message_order_classic_queue(Config) -> dead_letter_reject_message_order(<<"classic">>, Config). @@ -5062,7 +5025,7 @@ dead_letter_reject_message_order_quorum_queue(Config) -> dead_letter_reject_message_order(<<"quorum">>, Config). dead_letter_reject_message_order(QType, Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName1 = <<"q1">>, QName2 = <<"q2">>, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( @@ -5142,9 +5105,7 @@ dead_letter_reject_message_order(QType, Config) -> ok = amqp10_client:detach_link(Sender), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). dead_letter_reject_many_message_order_classic_queue(Config) -> dead_letter_reject_many_message_order(<<"classic">>, Config). @@ -5153,7 +5114,7 @@ dead_letter_reject_many_message_order_quorum_queue(Config) -> dead_letter_reject_many_message_order(<<"quorum">>, Config). dead_letter_reject_many_message_order(QType, Config) -> - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName1 = <<"q1">>, QName2 = <<"q2">>, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( @@ -5231,9 +5192,7 @@ dead_letter_reject_many_message_order(QType, Config) -> ok = amqp10_client:detach_link(Sender), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% Dead letter from a quorum queue into a stream. dead_letter_into_stream(Config) -> @@ -5318,7 +5277,7 @@ accept_multiple_message_order(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -5357,9 +5316,7 @@ accept_multiple_message_order(QType, Config) -> ok = amqp10_client:detach_link(Receiver), ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). release_multiple_message_order_classic_queue(Config) -> release_multiple_message_order(<<"classic">>, Config). @@ -5371,7 +5328,7 @@ release_multiple_message_order(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -5421,9 +5378,7 @@ release_multiple_message_order(QType, Config) -> ok = amqp10_client:detach_link(Receiver), ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). %% This test asserts the following §3.2 requirement: @@ -5442,7 +5397,7 @@ footer_checksum(FooterOpt, Config) -> adler32 -> <<"x-opt-adler-32">> end, - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QName = atom_to_binary(FooterOpt), Addr = rabbitmq_amqp_address:queue(QName), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -5585,9 +5540,7 @@ footer_checksum(FooterOpt, Config) -> ok = amqp10_client:detach_link(Receiver), ok = amqp10_client:detach_link(Sender), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). receive_many_made_available_over_time_classic_queue(Config) -> receive_many_made_available_over_time(<<"classic">>, Config). @@ -5788,16 +5741,14 @@ incoming_window_closed_stop_link(Config) -> end, {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close({Connection, Session, LinkPair}). %% Test that we can close a link while our session incoming-window is closed. incoming_window_closed_close_link(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), @@ -5831,9 +5782,7 @@ incoming_window_closed_close_link(Config) -> end, {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). incoming_window_closed_rabbitmq_internal_flow_classic_queue(Config) -> incoming_window_closed_rabbitmq_internal_flow(<<"classic">>, Config). @@ -5845,7 +5794,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - {Connection, Session, LinkPair} = init(Config), + {_, Session, LinkPair} = Init = init(Config), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -5884,9 +5833,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> ok = detach_link_sync(Receiver), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close(Init). tcp_back_pressure_rabbitmq_internal_flow_classic_queue(Config) -> tcp_back_pressure_rabbitmq_internal_flow(<<"classic">>, Config). @@ -5978,9 +5925,7 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> ok = detach_link_sync(Receiver), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = close({Connection, Session, LinkPair}). session_max_per_connection(Config) -> App = rabbit, diff --git a/deps/rabbit/test/amqp_system_SUITE.erl b/deps/rabbit/test/amqp_dotnet_SUITE.erl similarity index 60% rename from deps/rabbit/test/amqp_system_SUITE.erl rename to deps/rabbit/test/amqp_dotnet_SUITE.erl index 10cea6440844..af55bb773e68 100644 --- a/deps/rabbit/test/amqp_system_SUITE.erl +++ b/deps/rabbit/test/amqp_dotnet_SUITE.erl @@ -5,9 +5,10 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% --module(amqp_system_SUITE). +-module(amqp_dotnet_SUITE). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). -compile(nowarn_export_all). @@ -15,37 +16,32 @@ all() -> [ - {group, dotnet}, - {group, java} + {group, cluster_size_1} ]. groups() -> - [ - {dotnet, [], [ - roundtrip, - roundtrip_to_amqp_091, - default_outcome, - no_routes_is_released, - outcomes, - fragmentation, - message_annotations, - footer, - data_types, - %% TODO at_most_once, - reject, - redelivery, - released, - routing, - invalid_routes, - auth_failure, - access_failure_not_allowed, - access_failure_send, - streams - ]}, - {java, [], [ - roundtrip - ]} - ]. + [{cluster_size_1, [], + [ + roundtrip, + roundtrip_to_amqp_091, + default_outcome, + no_routes_is_released, + outcomes, + fragmentation, + message_annotations, + footer, + data_types, + reject, + redelivery, + released, + routing, + invalid_routes, + auth_failure, + access_failure_not_allowed, + access_failure_send, + streams + ] + }]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -63,26 +59,20 @@ init_per_suite(Config) -> end_per_suite(Config) -> Config. -init_per_group(Group, Config) -> +init_per_group(cluster_size_1, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Suffix}, - {amqp_client_library, Group} - ]), - GroupSetupStep = case Group of - dotnet -> fun build_dotnet_test_project/1; - java -> fun build_maven_test_project/1 - end, + Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}), Config2 = rabbit_ct_helpers:run_setup_steps( Config1, - [GroupSetupStep] ++ + [fun build_dotnet_test_project/1] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, 'rabbitmq_4.0.0'), Config2. end_per_group(_, Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_helpers:run_teardown_steps( + Config, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). @@ -93,28 +83,14 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). build_dotnet_test_project(Config) -> - TestProjectDir = filename:join( - [?config(data_dir, Config), "fsharp-tests"]), - Ret = rabbit_ct_helpers:exec(["dotnet", "restore"], - [{cd, TestProjectDir}]), - case Ret of + TestProjectDir = filename:join([?config(data_dir, Config), "fsharp-tests"]), + case rabbit_ct_helpers:exec(["dotnet", "restore"], + [{cd, TestProjectDir}]) of {ok, _} -> rabbit_ct_helpers:set_config( Config, {dotnet_test_project_dir, TestProjectDir}); - _ -> - ct:fail({"'dotnet restore' failed", Ret}) - end. - -build_maven_test_project(Config) -> - TestProjectDir = filename:join([?config(data_dir, Config), "java-tests"]), - Ret = rabbit_ct_helpers:exec([TestProjectDir ++ "/mvnw", "test-compile"], - [{cd, TestProjectDir}]), - case Ret of - {ok, _} -> - rabbit_ct_helpers:set_config(Config, - {maven_test_project_dir, TestProjectDir}); - _ -> - ct:fail({"'mvnw test-compile' failed", Ret}) + Other -> + ct:fail({"'dotnet restore' failed", Other}) end. %% ------------------------------------------------------------------- @@ -123,58 +99,53 @@ build_maven_test_project(Config) -> roundtrip(Config) -> declare_queue(Config, ?FUNCTION_NAME, "quorum"), - run(Config, [{dotnet, "roundtrip"}, - {java, "RoundTripTest"}]). - -streams(Config) -> - declare_queue(Config, ?FUNCTION_NAME, "stream"), - run(Config, [{dotnet, "streams"}]). + run(?FUNCTION_NAME, Config). roundtrip_to_amqp_091(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "roundtrip_to_amqp_091"}]). + run(?FUNCTION_NAME, Config). default_outcome(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "default_outcome"}]). + run(?FUNCTION_NAME, Config). no_routes_is_released(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config), amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"no_routes_is_released">>, durable = true}), - run(Config, [{dotnet, "no_routes_is_released"}]). + run(?FUNCTION_NAME, Config). outcomes(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "outcomes"}]). + run(?FUNCTION_NAME, Config). fragmentation(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "fragmentation"}]). + run(?FUNCTION_NAME, Config). message_annotations(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "message_annotations"}]). + run(?FUNCTION_NAME, Config). footer(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "footer"}]). + run(?FUNCTION_NAME, Config). data_types(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "data_types"}]). + run(?FUNCTION_NAME, Config). reject(Config) -> declare_queue(Config, ?FUNCTION_NAME, "classic"), - run(Config, [{dotnet, "reject"}]). + run(?FUNCTION_NAME, Config). redelivery(Config) -> declare_queue(Config, ?FUNCTION_NAME, "quorum"), - run(Config, [{dotnet, "redelivery"}]). + run(?FUNCTION_NAME, Config). released(Config) -> declare_queue(Config, ?FUNCTION_NAME, "quorum"), - run(Config, [{dotnet, "released"}]). + run(?FUNCTION_NAME, Config). routing(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config), @@ -203,23 +174,18 @@ routing(Config) -> exchange = <<"amq.direct">>, routing_key = <<"direct_q">> }), - - run(Config, [ - {dotnet, "routing"} - ]). + run(?FUNCTION_NAME, Config). invalid_routes(Config) -> - run(Config, [ - {dotnet, "invalid_routes"} - ]). + run(?FUNCTION_NAME, Config). auth_failure(Config) -> - run(Config, [ {dotnet, "auth_failure"} ]). + run(?FUNCTION_NAME, Config). access_failure_not_allowed(Config) -> User = atom_to_binary(?FUNCTION_NAME), ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), - run(Config, [ {dotnet, "access_failure_not_allowed"} ]), + run(?FUNCTION_NAME, Config), ok = rabbit_ct_broker_helpers:delete_user(Config, User). access_failure_send(Config) -> @@ -230,45 +196,35 @@ access_failure_send(Config) -> <<"^banana.*">>, %% write <<"^banana.*">> %% read ), - run(Config, [ {dotnet, "access_failure_send"} ]), + run(?FUNCTION_NAME, Config), ok = rabbit_ct_broker_helpers:delete_user(Config, User). -run(Config, Flavors) -> - ClientLibrary = ?config(amqp_client_library, Config), - Fun = case ClientLibrary of - dotnet -> fun run_dotnet_test/2; - java -> fun run_java_test/2 - end, - {ClientLibrary, TestName} = proplists:lookup(ClientLibrary, Flavors), - Fun(Config, TestName). +streams(Config) -> + declare_queue(Config, ?FUNCTION_NAME, "stream"), + run(?FUNCTION_NAME, Config). -run_dotnet_test(Config, Method) -> - TestProjectDir = ?config(dotnet_test_project_dir, Config), - Uri = rabbit_ct_broker_helpers:node_uri(Config, 0, [{use_ipaddr, true}]), - Ret = rabbit_ct_helpers:exec(["dotnet", "run", "--", Method, Uri ], - [ - {cd, TestProjectDir} - ]), - ct:pal("~s: result ~p", [?FUNCTION_NAME, Ret]), - {ok, _} = Ret. - -run_java_test(Config, Class) -> - TestProjectDir = ?config(maven_test_project_dir, Config), - Ret = rabbit_ct_helpers:exec([ - TestProjectDir ++ "/mvnw", - "test", - {"-Dtest=~ts", [Class]}, - {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]} - ], - [{cd, TestProjectDir}]), - {ok, _} = Ret. +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- declare_queue(Config, Name, Type) -> Ch = rabbit_ct_client_helpers:open_channel(Config), #'queue.declare_ok'{} = - amqp_channel:call(Ch, #'queue.declare'{queue = atom_to_binary(Name, utf8), - durable = true, - arguments = [{<<"x-queue-type">>, - longstr, Type}]}), + amqp_channel:call(Ch, #'queue.declare'{queue = atom_to_binary(Name, utf8), + durable = true, + arguments = [{<<"x-queue-type">>, + longstr, Type}]}), rabbit_ct_client_helpers:close_channel(Ch), ok. + +run(TestNameAtom, Config) -> + TestName = atom_to_list(TestNameAtom), + TestProjectDir = ?config(dotnet_test_project_dir, Config), + Uri = rabbit_ct_broker_helpers:node_uri(Config, 0, [{use_ipaddr, true}]), + case rabbit_ct_helpers:exec(["dotnet", "run", "--", TestName, Uri], + [{cd, TestProjectDir}]) of + {ok, _Stdout_} -> + ok; + {error, _ExitCode, _Stdout} -> + ct:fail(TestName) + end. diff --git a/deps/rabbit/test/amqp_system_SUITE_data/console/Program.cs b/deps/rabbit/test/amqp_dotnet_SUITE_data/console/Program.cs similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/console/Program.cs rename to deps/rabbit/test/amqp_dotnet_SUITE_data/console/Program.cs diff --git a/deps/rabbit/test/amqp_system_SUITE_data/console/README.md b/deps/rabbit/test/amqp_dotnet_SUITE_data/console/README.md similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/console/README.md rename to deps/rabbit/test/amqp_dotnet_SUITE_data/console/README.md diff --git a/deps/rabbit/test/amqp_system_SUITE_data/console/standalone.csproj b/deps/rabbit/test/amqp_dotnet_SUITE_data/console/standalone.csproj similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/console/standalone.csproj rename to deps/rabbit/test/amqp_dotnet_SUITE_data/console/standalone.csproj diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs rename to deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/fsharp-tests.fsproj b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/fsharp-tests.fsproj similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/fsharp-tests.fsproj rename to deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/fsharp-tests.fsproj diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/global.json b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/global.json similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/global.json rename to deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/global.json diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl new file mode 100644 index 000000000000..a97bd5d68b0e --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -0,0 +1,180 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp_jms_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(amqp_utils, + [init/1, + close/1, + connection_config/1, + detach_link_sync/1, + end_session_sync/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [{cluster_size_1, [shuffle], + [ + message_types_jms_to_jms, + message_types_jms_to_amqp + ] + }]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +suite() -> + [ + {timetrap, {minutes, 2}} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(cluster_size_1, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + [{permit_deprecated_features, + %% We want to test JMS solely with AMQP address v2 + %% since that will be the only option in the future. + #{amqp_address_v1 => false} + }] + }), + Config3 = rabbit_ct_helpers:run_setup_steps( + Config2, + [fun build_maven_test_project/1] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config3, 'rabbitmq_4.0.0'), + Config3. + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +build_maven_test_project(Config) -> + TestProjectDir = ?config(data_dir, Config), + case rabbit_ct_helpers:exec([filename:join([TestProjectDir, "mvnw"]), "test-compile"], + [{cd, TestProjectDir}]) of + {ok, _} -> + Config; + Other -> + ct:fail({"'mvnw test-compile' failed", Other}) + end. + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +%% Send different message types from JMS client to JMS client. +message_types_jms_to_jms(Config) -> + TestName = QName = atom_to_binary(?FUNCTION_NAME), + ok = declare_queue(QName, <<"quorum">>, Config), + ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), + ok = delete_queue(QName, Config). + +%% Send different message types from JMS client to Erlang AMQP 1.0 client. +message_types_jms_to_amqp(Config) -> + TestName = QName = atom_to_binary(?FUNCTION_NAME), + ok = declare_queue(QName, <<"quorum">>, Config), + Address = rabbitmq_amqp_address:queue(QName), + + %% The JMS client sends messaegs. + ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config), + + %% The Erlang AMQP 1.0 client receives messages. + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver), + ?assertEqual( + #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}}, + amqp10_msg:body(Msg1)), + {ok, Msg2} = amqp10_client:get_msg(Receiver), + ?assertEqual( + #'v1_0.amqp_value'{ + content = {map, [ + {{utf8, <<"key1">>}, {utf8, <<"value">>}}, + {{utf8, <<"key2">>}, true}, + {{utf8, <<"key3">>}, {double, -1.1}}, + {{utf8, <<"key4">>}, {long, -1}} + ]}}, + amqp10_msg:body(Msg2)), + {ok, Msg3} = amqp10_client:get_msg(Receiver), + ?assertEqual( + [ + #'v1_0.amqp_sequence'{ + content = [{utf8, <<"value">>}, + true, + {double, -1.1}, + {long, -1}]} + ], + amqp10_msg:body(Msg3)), + + ok = detach_link_sync(Receiver), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection), + ok = delete_queue(QName, Config). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +run(TestName, JavaProps, Config) -> + TestProjectDir = ?config(data_dir, Config), + Cmd = [filename:join([TestProjectDir, "mvnw"]), + "test", + {"-Dtest=JmsTest#~ts", [TestName]}, + {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]} + ] ++ JavaProps, + case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of + {ok, _Stdout_} -> + ok; + {error, _ExitCode, _Stdout} -> + ct:fail(TestName) + end. + +declare_queue(Name, Type, Config) -> + {_, _, LinkPair} = Init = init(Config), + {ok, #{type := Type}} = rabbitmq_amqp_client:declare_queue( + LinkPair, Name, + #{arguments => #{<<"x-queue-type">> => {utf8, Type}}}), + ok = close(Init). + +delete_queue(Name, Config) -> + {_, _, LinkPair} = Init = init(Config), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Name), + ok = close(Init). diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/.gitignore b/deps/rabbit/test/amqp_jms_SUITE_data/.gitignore similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/.gitignore rename to deps/rabbit/test/amqp_jms_SUITE_data/.gitignore diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/.mvn/wrapper/maven-wrapper.properties b/deps/rabbit/test/amqp_jms_SUITE_data/.mvn/wrapper/maven-wrapper.properties similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/.mvn/wrapper/maven-wrapper.properties rename to deps/rabbit/test/amqp_jms_SUITE_data/.mvn/wrapper/maven-wrapper.properties diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/mvnw b/deps/rabbit/test/amqp_jms_SUITE_data/mvnw similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/mvnw rename to deps/rabbit/test/amqp_jms_SUITE_data/mvnw diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/mvnw.cmd b/deps/rabbit/test/amqp_jms_SUITE_data/mvnw.cmd similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/mvnw.cmd rename to deps/rabbit/test/amqp_jms_SUITE_data/mvnw.cmd diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml similarity index 95% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml rename to deps/rabbit/test/amqp_jms_SUITE_data/pom.xml index e40b72b44099..cce3ecb58f45 100644 --- a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml +++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml @@ -1,11 +1,11 @@ 4.0.0 - com.rabbitmq.amqp1_0.tests.proton - rabbitmq-amqp1.0-java-tests + com.rabbitmq.amqp.tests.jms + rabbitmq-amqp-jms-tests jar 1.0-SNAPSHOT - rabbitmq-amqp1.0-java-tests + rabbitmq-amqp-jms-tests https://www.rabbitmq.com 5.10.2 diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java new file mode 100644 index 000000000000..f5c5bffba2b2 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java @@ -0,0 +1,132 @@ +package com.rabbitmq.amqp.tests.jms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import jakarta.jms.*; +import java.util.*; +import javax.naming.Context; +import org.junit.jupiter.api.Test; + +public class JmsTest { + + private javax.naming.Context getContext() throws Exception{ + // Configure a JNDI initial context, see + // https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#configuring-a-jndi-initialcontext + Hashtable env = new Hashtable<>(); + env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); + + String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); + // For a list of options, see + // https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#jms-configuration-options + uri = uri + "?jms.clientID=my-client-id"; + env.put("connectionfactory.myConnection", uri); + + String queueName = System.getProperty("queue"); + if (queueName != null) { + env.put("queue.myQueue", queueName); + } + + javax.naming.Context context = new javax.naming.InitialContext(env); + return context; + } + + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#jakarta-messaging-message-types + @Test + public void message_types_jms_to_jms() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Destination queue = (Destination) context.lookup("myQueue"); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + // TextMessage + String msg1 = "msg1"; + TextMessage textMessage = session.createTextMessage(msg1); + producer.send(textMessage); + TextMessage receivedTextMessage = (TextMessage) consumer.receive(5000); + assertEquals(msg1, receivedTextMessage.getText()); + + // BytesMessage + String msg2 = "msg2"; + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeUTF(msg2); + producer.send(bytesMessage); + BytesMessage receivedBytesMessage = (BytesMessage) consumer.receive(5000); + assertEquals(msg2, receivedBytesMessage.readUTF()); + + // MapMessage + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setString("key1", "value"); + mapMessage.setBoolean("key2", true); + mapMessage.setDouble("key3", 1.0); + mapMessage.setLong("key4", 1L); + producer.send(mapMessage); + MapMessage receivedMapMessage = (MapMessage) consumer.receive(5000); + assertEquals("value", receivedMapMessage.getString("key1")); + assertEquals(true, receivedMapMessage.getBoolean("key2")); + assertEquals(1.0, receivedMapMessage.getDouble("key3")); + assertEquals(1L, receivedMapMessage.getLong("key4")); + + // StreamMessage + StreamMessage streamMessage = session.createStreamMessage(); + streamMessage.writeString("value"); + streamMessage.writeBoolean(true); + streamMessage.writeDouble(1.0); + streamMessage.writeLong(1L); + producer.send(streamMessage); + StreamMessage receivedStreamMessage = (StreamMessage) consumer.receive(5000); + assertEquals("value", receivedStreamMessage.readString()); + assertEquals(true, receivedStreamMessage.readBoolean()); + assertEquals(1.0, receivedStreamMessage.readDouble()); + assertEquals(1L, receivedStreamMessage.readLong()); + + // ObjectMessage + ObjectMessage objectMessage = session.createObjectMessage(); + ArrayList list = new ArrayList<>(Arrays.asList(1, 2, 3)); + objectMessage.setObject(list); + producer.send(objectMessage); + ObjectMessage receivedObjectMessage = (ObjectMessage) consumer.receive(5000); + assertEquals(list, receivedObjectMessage.getObject()); + } + } + + @Test + public void message_types_jms_to_amqp() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Destination queue = (Destination) context.lookup("myQueue"); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + // TextMessage + String msg1 = "msg1🥕"; + TextMessage textMessage = session.createTextMessage(msg1); + producer.send(textMessage); + + // MapMessage + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setString("key1", "value"); + mapMessage.setBoolean("key2", true); + mapMessage.setDouble("key3", -1.1); + mapMessage.setLong("key4", -1L); + producer.send(mapMessage); + + // StreamMessage + StreamMessage streamMessage = session.createStreamMessage(); + streamMessage.writeString("value"); + streamMessage.writeBoolean(true); + streamMessage.writeDouble(-1.1); + streamMessage.writeLong(-1L); + producer.send(streamMessage); + } + } +} diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/src/test/resources/logback-test.xml b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/resources/logback-test.xml similarity index 100% rename from deps/rabbit/test/amqp_system_SUITE_data/java-tests/src/test/resources/logback-test.xml rename to deps/rabbit/test/amqp_jms_SUITE_data/src/test/resources/logback-test.xml diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/src/test/java/com/rabbitmq/amqp1_0/tests/jms/RoundTripTest.java b/deps/rabbit/test/amqp_system_SUITE_data/java-tests/src/test/java/com/rabbitmq/amqp1_0/tests/jms/RoundTripTest.java deleted file mode 100644 index 365ee95b8841..000000000000 --- a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/src/test/java/com/rabbitmq/amqp1_0/tests/jms/RoundTripTest.java +++ /dev/null @@ -1,51 +0,0 @@ -// vim:sw=4:et: - -package com.rabbitmq.amqp1_0.tests.jms; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import jakarta.jms.*; -import java.util.*; -import javax.naming.Context; -import org.junit.jupiter.api.Test; - -/** Unit test for simple App. */ -public class RoundTripTest { - - @Test - public void test_roundtrip() throws Exception { - String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); - Hashtable env = new Hashtable<>(); - env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); - env.put("connectionfactory.myFactoryLookup", uri); - env.put("queue.myQueueLookup", "my-queue"); - env.put("jms.sendTimeout", 5); - env.put("jms.requestTimeout", 5); - javax.naming.Context context = new javax.naming.InitialContext(env); - - assertNotNull(uri); - - ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); - Destination queue = (Destination) context.lookup("myQueueLookup"); - - try (Connection connection = factory.createConnection("guest", "guest")) { - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer messageProducer = session.createProducer(queue); - MessageConsumer messageConsumer = session.createConsumer(queue); - - TextMessage message = session.createTextMessage("Hello world!"); - messageProducer.send( - message, - DeliveryMode.NON_PERSISTENT, - Message.DEFAULT_PRIORITY, - Message.DEFAULT_TIME_TO_LIVE); - TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L); - - assertEquals(message.getText(), receivedMessage.getText()); - } - } -} diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 9de9a1bbfa06..58312f70becf 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -10,6 +10,7 @@ -include_lib("amqp10_common/include/amqp10_framing.hrl"). -export([init/1, init/2, + close/1, connection_config/1, connection_config/2, web_amqp/1, flush/1, @@ -31,6 +32,11 @@ init(Node, Config) -> {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), {Connection, Session, LinkPair}. +close({Connection, Session, LinkPair}) -> + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + connection_config(Config) -> connection_config(0, Config).