diff --git a/.github/workflows/test-make-tests.yaml b/.github/workflows/test-make-tests.yaml index 5fa4c6e43d48..a4ffd93c453c 100644 --- a/.github/workflows/test-make-tests.yaml +++ b/.github/workflows/test-make-tests.yaml @@ -25,6 +25,7 @@ jobs: - parallel-ct-set-2 - parallel-ct-set-3 - parallel-ct-set-4 + - ct-amqp_client - ct-clustering_management - eunit ct-dead_lettering - ct-feature_flags diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 70069fe8b307..d97b5540df00 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -284,7 +284,25 @@ open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; +<<<<<<< HEAD open_sent(info, {'DOWN', MRef, _, _, _}, +======= +open_sent(_EvtType, {close, Reason}, State) -> + case send_close(State, Reason) of + ok -> + %% "After writing this frame the peer SHOULD continue to read from the connection + %% until it receives the partner's close frame (in order to guard against + %% erroneously or maliciously implemented partners, a peer SHOULD implement a + %% timeout to give its partner a reasonable time to receive and process the close + %% before giving up and simply closing the underlying transport mechanism)." [ยง2.4.3] + {next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}}; + {error, closed} -> + {stop, normal, State}; + Error -> + {stop, Error, State} + end; +open_sent(info, {'DOWN', MRef, process, _, _}, +>>>>>>> 77e363627 (amqp10_client: Handle `close` message in the `open_sent` state) #state{reader_m_ref = MRef}) -> {stop, {shutdown, reader_down}}. @@ -324,6 +342,7 @@ close_sent(_EvtType, heartbeat, State) -> {next_state, close_sent, State}; close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) -> %% monitored processes may exit during closure +<<<<<<< HEAD {next_state, close_sent, State}; close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, #state{reader = ReaderPid} = State) -> @@ -333,6 +352,21 @@ close_sent(_EvtType, #'v1_0.close'{}, State) -> %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received {stop, normal, State}. +======= + keep_state_and_data; +close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason}, + #state{reader = ReaderPid}) -> + %% if the reader exits we probably won't receive a close frame + {stop, normal}; +close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) -> + ok = notify_closed(Config, Close), + {stop, normal}; +close_sent(state_timeout, received_no_close_frame, _Data) -> + {stop, normal}; +close_sent(_EvtType, #'v1_0.open'{}, _Data) -> + %% Transition from CLOSE_PIPE to CLOSE_SENT in figure 2.23. + keep_state_and_data. +>>>>>>> 65576863f (amqp10_client: Fix crash in close_sent) set_other_procs0(OtherProcs, State) -> #{sessions_sup := SessionsSup, diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 5d6ac4f6183a..e658ec58d490 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -175,7 +175,8 @@ bats: $(BATS) tests:: bats -SLOW_CT_SUITES := backing_queue \ +SLOW_CT_SUITES := amqp_client \ + backing_queue \ channel_interceptor \ cluster \ cluster_rename \ @@ -257,8 +258,13 @@ define ct_master.erl halt(0) endef +<<<<<<< HEAD PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +======= +PARALLEL_CT_SET_1_A = unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +>>>>>>> 2c6619104 (amqp_client_SUITE: Use a dedicated CI job for this testsuite) PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit @@ -282,7 +288,7 @@ PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARAL PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D)) PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D)) -SEQUENTIAL_CT_SUITES = clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue +SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 88826c5ef00b..819d50a20a4b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). @@ -341,10 +342,9 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> - %% Assert that every testcase cleaned up. - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), - %% Wait for sessions to terminate before starting the next test case. + %% Clean up any queues, connections, and sessions. + rpc(Config, ?MODULE, delete_queues, []), + ok = rpc(Config, rabbit_networking, close_all_connections, [<<"test finished">>]), eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), %% Assert that global counters count correctly. eventually(?_assertMatch(#{publishers := 0, @@ -574,7 +574,7 @@ modified_quorum_queue(Config) -> ok = amqp10_client:settle_msg(Receiver1, M2e, modified), %% Test that we can consume via AMQP 0.9.1 - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), {#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>, props = #'P_basic'{headers = Headers}} @@ -585,7 +585,7 @@ modified_quorum_queue(Config) -> lists:keysearch(<<"x-other">>, 1, Headers)), ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, lists:keysearch(<<"x-delivery-count">>, 1, Headers)), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Receiver1), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), @@ -1345,7 +1345,7 @@ amqp_amqpl(QType, Config) -> ok = amqp10_client:detach_link(Sender), flush(detached), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 100}), CTag = <<"my-tag">>, @@ -1428,7 +1428,7 @@ amqp_amqpl(QType, Config) -> after 30000 -> ct:fail({missing_deliver, ?LINE}) end, - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:close_connection(Connection). @@ -1437,7 +1437,7 @@ message_headers_conversion(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), %% declare a quorum queue - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'queue.declare'{ queue = QName, durable = true, @@ -1449,7 +1449,7 @@ message_headers_conversion(Config) -> amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = delete_queue(Session, QName), ok = amqp10_client:close_connection(Connection). @@ -1551,11 +1551,11 @@ multiple_sessions(Config) -> ok = amqp10_client:flow_link_credit(Receiver2, NMsgsPerReceiver, never), flush("receiver attached"), - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), [#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, exchange = <<"amq.fanout">>}) || QName <- Qs], - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), %% Send on each session. TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), @@ -1611,13 +1611,13 @@ server_closes_link_stream(Config) -> server_closes_link(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1692,7 +1692,7 @@ server_closes_link_exchange(Settled, Config) -> XName = atom_to_binary(?FUNCTION_NAME), QName = <<"my queue">>, RoutingKey = <<"my routing key">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, @@ -1734,7 +1734,7 @@ server_closes_link_exchange(Settled, Config) -> ?assertMatch(#{publishers := 0}, get_global_counters(Config)), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). @@ -1746,13 +1746,13 @@ link_target_quorum_queue_deleted(Config) -> link_target_queue_deleted(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1807,7 +1807,7 @@ target_queues_deleted_accepted(Config) -> Q2 = <<"q2">>, Q3 = <<"q3">>, QNames = [Q1, Q2, Q3], - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), [begin #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, @@ -1856,7 +1856,7 @@ target_queues_deleted_accepted(Config) -> ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = Q1})), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ?assert(rpc(Config, meck, validate, [Mod])), ok = rpc(Config, meck, unload, [Mod]), ok = end_session_sync(Session), @@ -1937,7 +1937,7 @@ sync_get_unsettled_stream(Config) -> sync_get_unsettled(QType, Config) -> SenderSettleMode = unsettled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2026,7 +2026,7 @@ sync_get_unsettled(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). sync_get_unsettled_2_classic_queue(Config) -> sync_get_unsettled_2(<<"classic">>, Config). @@ -2041,7 +2041,7 @@ sync_get_unsettled_2_stream(Config) -> sync_get_unsettled_2(QType, Config) -> SenderSettleMode = unsettled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2116,7 +2116,7 @@ sync_get_unsettled_2(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). sync_get_settled_classic_queue(Config) -> sync_get_settled(<<"classic">>, Config). @@ -2131,7 +2131,7 @@ sync_get_settled_stream(Config) -> sync_get_settled(QType, Config) -> SenderSettleMode = settled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2196,7 +2196,7 @@ sync_get_settled(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). timed_get_classic_queue(Config) -> timed_get(<<"classic">>, Config). @@ -2210,7 +2210,7 @@ timed_get_stream(Config) -> %% Synchronous get with a timeout, figure 2.44. timed_get(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2268,7 +2268,7 @@ timed_get(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). stop_classic_queue(Config) -> stop(<<"classic">>, Config). @@ -2281,7 +2281,7 @@ stop_stream(Config) -> %% Test stopping a link, figure 2.46. stop(QType, Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ @@ -2347,7 +2347,7 @@ stop(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). consumer_priority_classic_queue(Config) -> consumer_priority(<<"classic">>, Config). @@ -2833,7 +2833,7 @@ detach_requeues_one_session_quorum_queue(Config) -> detach_requeue_one_session(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2911,7 +2911,7 @@ detach_requeue_one_session(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). detach_requeues_drop_head_classic_queue(Config) -> QName1 = <<"q1">>, @@ -3083,7 +3083,7 @@ detach_requeues_two_connections(QType, Config) -> resource_alarm_before_session_begin(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -3134,11 +3134,11 @@ resource_alarm_before_session_begin(Config) -> ok = end_session_sync(Session1), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). resource_alarm_after_session_begin(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3201,13 +3201,13 @@ resource_alarm_after_session_begin(Config) -> ok = amqp10_client:close_connection(Connection1), ok = amqp10_client:close_connection(Connection2), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% Test case for %% https://github.com/rabbitmq/rabbitmq-server/issues/12816 resource_alarm_send_many(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3237,7 +3237,7 @@ resource_alarm_send_many(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). auth_attempt_metrics(Config) -> open_and_close_connection(Config), @@ -3270,7 +3270,7 @@ max_message_size_client_to_server(Config) -> ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]), QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3294,12 +3294,12 @@ max_message_size_client_to_server(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). max_message_size_server_to_client(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3348,13 +3348,13 @@ max_message_size_server_to_client(Config) -> ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). last_queue_confirms(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, Qs = [ClassicQ, QuorumQ], - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = ClassicQ}), #'queue.declare_ok'{} = amqp_channel:call( @@ -3420,13 +3420,13 @@ last_queue_confirms(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})), ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). target_queue_deleted(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, Qs = [ClassicQ, QuorumQ], - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = ClassicQ}), #'queue.declare_ok'{} = amqp_channel:call( @@ -3492,11 +3492,12 @@ target_queue_deleted(Config) -> ok = amqp10_client:close_connection(Connection), ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). target_classic_queue_down(Config) -> ClassicQueueNode = 2, - Ch = rabbit_ct_client_helpers:open_channel(Config, ClassicQueueNode), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, ClassicQueueNode), QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), #'queue.declare_ok'{} = amqp_channel:call( @@ -3504,7 +3505,7 @@ target_classic_queue_down(Config) -> queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}), - ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, ClassicQueueNode), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -3585,7 +3586,8 @@ async_notify_unsettled_stream(Config) -> %% Test asynchronous notification, figure 2.45. async_notify(SenderSettleMode, QType, Config) -> %% Place queue leader on the old node. - Ch = rabbit_ct_client_helpers:open_channel(Config, 1), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, 1), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ @@ -3642,7 +3644,7 @@ async_notify(SenderSettleMode, QType, Config) -> end, #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = end_session_sync(Session), ok = close_connection_sync(Connection). @@ -3650,7 +3652,7 @@ async_notify(SenderSettleMode, QType, Config) -> %% (slow queue) does not impact other link receivers (fast queues) on the **same** session. %% (This is unlike AMQP legacy where a single slow queue will block the entire connection.) link_flow_control(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"cq">>, QQ = <<"qq">>, #'queue.declare_ok'{} = amqp_channel:call( @@ -3663,6 +3665,7 @@ link_flow_control(Config) -> queue = QQ, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), @@ -3750,7 +3753,8 @@ quorum_queue_on_new_node(Config) -> %% In mixed version tests, run the queue leader with old code %% and queue client with new code, or vice versa. queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, QueueLeaderNode), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, QueueLeaderNode), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = QName, @@ -3819,8 +3823,13 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ExpectedReadyMsgs = 0, ?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs}, amqp_channel:call(Ch, #'queue.delete'{queue = QName})), +<<<<<<< HEAD ok = rabbit_ct_client_helpers:close_channel(Ch), ok = amqp10_client:close_connection(Connection). +======= + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), + ok = close_connection_sync(Connection). +>>>>>>> ce5ba6da0 (amqp_client_SUITE: Use a dedicated AMQP-0-9-1 connection per testcase) maintenance(Config) -> {ok, C0} = amqp10_client:open_connection(connection_config(0, Config)), @@ -3866,6 +3875,7 @@ leader_transfer_stream_credit_batches(Config) -> leader_transfer_credit(QName, QType, Credit, Config) -> %% Create queue with leader on node 1. +<<<<<<< HEAD {Connection1, Session1, LinkPair1} = init(1, Config), {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( LinkPair1, @@ -3875,6 +3885,18 @@ leader_transfer_credit(QName, QType, Credit, Config) -> ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), ok = end_session_sync(Session1), ok = close_connection_sync(Connection1), +======= + {_, _, LinkPair1} = Init = init(1, Config), + ?awaitMatch( + {ok, #{type := QType}}, + rabbitmq_amqp_client:declare_queue( + LinkPair1, + QName, + #{arguments => #{<<"x-queue-type">> => {utf8, QType}, + <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), + 60000), + ok = close(Init), +>>>>>>> 603ad0d7e (amqp_client_SUITE: Retry connection in two testcases) %% Consume from a follower. OpnConf = connection_config(0, Config), @@ -3999,7 +4021,7 @@ global_counters(Config) -> messages_redelivered_total := QQRedelivered0, messages_acknowledged_total := QQAcknowledged0} = get_global_counters(Config, rabbit_quorum_queue), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"my classic queue">>, QQ = <<"my quorum queue">>, CQAddress = rabbitmq_amqp_address:queue(CQ), @@ -4124,7 +4146,7 @@ global_counters(Config) -> %% m4 was returned ?assertEqual(UnroutableReturned1 + 1, UnroutableReturned2), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). @@ -4132,12 +4154,12 @@ global_counters(Config) -> stream_filtering(Config) -> Stream = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(Stream), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'queue.declare'{ queue = Stream, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -4264,7 +4286,7 @@ available_messages_stream(Config) -> available_messages(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -4356,7 +4378,7 @@ available_messages(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, @@ -4420,8 +4442,18 @@ trace(Config) -> RoutingKey = <<"my routing key">>, Payload = <<"my payload">>, CorrelationId = <<"my correlation ๐Ÿ‘€"/utf8>>, +<<<<<<< HEAD Ch = rabbit_ct_client_helpers:open_channel(Config), [#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs], +======= + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = Q, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, QType}]}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = TraceQ}), +>>>>>>> ce5ba6da0 (amqp_client_SUITE: Use a dedicated AMQP-0-9-1 connection per testcase) #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, exchange = <<"amq.rabbitmq.trace">>, @@ -4489,6 +4521,7 @@ trace(Config) -> timer:sleep(20), ?assertMatch(#'basic.get_empty'{}, amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), @@ -4533,9 +4566,9 @@ user_id(Config) -> message_ttl(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), @@ -4583,19 +4616,8 @@ plugin(Config) -> idle_time_out_on_server(Config) -> App = rabbit, Par = heartbeat, - {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), - %% Configure RabbitMQ to use an idle-time-out of 1 second. - ok = rpc(Config, application, set_env, [App, Par, 1]), - - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - receive {amqp10_event, {connection, Connection, opened}} -> ok - after 30000 -> ct:fail({missing_event, ?LINE}) - end, - - %% Mock the server socket to not have received any bytes. - rabbit_ct_broker_helpers:setup_meck(Config), Mod = rabbit_net, +<<<<<<< HEAD ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]}]), %% The server "SHOULD try to gracefully close the connection using a close @@ -4611,10 +4633,46 @@ idle_time_out_on_server(Config) -> after 30000 -> ct:fail({missing_event, ?LINE}) end, - - ?assert(rpc(Config, meck, validate, [Mod])), - ok = rpc(Config, meck, unload, [Mod]), - ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). +======= + {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), + try + %% Configure RabbitMQ to use an idle-time-out of 1 second. + ok = rpc(Config, application, set_env, [App, Par, 1]), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end, +>>>>>>> 608405518 (amqp_client_SUITE: Ensure `idle_time_out_on_server` restores heartbeat value) + + %% Mock the server socket to not have received any bytes. + rabbit_ct_broker_helpers:setup_meck(Config), + ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), + ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) -> + {ok, [{recv_oct, 999}]}; + (Sock, Opts) -> + meck:passthrough([Sock, Opts]) + end]), + + %% The server "SHOULD try to gracefully close the connection using a close + %% frame with an error explaining why" [2.4.5]. + %% Since we chose a heartbeat value of 1 second, the server should easily + %% close the connection within 5 seconds. + receive + {amqp10_event, + {connection, Connection, + {closed, + {resource_limit_exceeded, + <<"no frame received from client within idle timeout threshold">>}}}} -> ok + after 30000 -> + ct:fail({missing_event, ?LINE}) + end + after + ?assert(rpc(Config, meck, validate, [Mod])), + ok = rpc(Config, meck, unload, [Mod]), + ok = rpc(Config, application, set_env, [App, Par, DefaultVal]) + end. %% Test that the idle timeout threshold is exceeded on the client %% when no frames are sent from server to client. @@ -4703,7 +4761,7 @@ credential_expires(Config) -> %% Attaching to an exclusive source queue should fail. attach_to_exclusive_queue(Config) -> QName = <<"my queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = QName, durable = true, @@ -4726,7 +4784,7 @@ attach_to_exclusive_queue(Config) -> ok = amqp10_client:close_connection(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). priority_classic_queue(Config) -> QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>}, @@ -5184,12 +5242,15 @@ dead_letter_into_stream(Config) -> <<"x-dead-letter-exchange">> => {utf8, <<>>}, <<"x-dead-letter-routing-key">> => {utf8, QName1} }}), - {ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue( - LinkPair1, - QName1, - #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}, - <<"x-initial-cluster-size">> => {ulong, 1} - }}), + ?awaitMatch( + {ok, #{type := <<"stream">>}}, + rabbitmq_amqp_client:declare_queue( + LinkPair1, + QName1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}, + <<"x-initial-cluster-size">> => {ulong, 1} + }}), + 60000), {ok, Receiver} = amqp10_client:attach_receiver_link( Session1, <<"receiver">>, <<"/amq/queue/", QName1/binary>>, settled, configuration, @@ -5628,9 +5689,9 @@ receive_many_auto_flow(QType, Config) -> %% incoming-window being closed. incoming_window_closed_transfer_flow_order(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf),