@@ -894,34 +894,30 @@ session_expiry(Config) ->
894894 ok = rpc (Config , application , set_env , [App , Par , DefaultVal ]).
895895
896896non_clean_sess_reconnect_qos1 (Config ) ->
897- non_clean_sess_reconnect (Config , qos1 ).
897+ non_clean_sess_reconnect (Config , 1 ).
898898
899899non_clean_sess_reconnect_qos0 (Config ) ->
900- non_clean_sess_reconnect (Config , qos0 ).
900+ non_clean_sess_reconnect (Config , 0 ).
901901
902902non_clean_sess_reconnect (Config , SubscriptionQoS ) ->
903903 Pub = connect (<<" publisher" >>, Config ),
904904 Topic = ClientId = atom_to_binary (? FUNCTION_NAME ),
905905
906906 C1 = connect (ClientId , Config , non_clean_sess_opts ()),
907- {ok , _ , _ } = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
908- ? assertMatch (#{consumers := 1 },
909- get_global_counters (Config )),
907+ {ok , _ , [SubscriptionQoS ]} = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
908+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
910909
911910 ok = emqtt :disconnect (C1 ),
912- eventually (? _assertMatch (#{consumers := 0 },
913- get_global_counters (Config ))),
911+ ok = await_consumer_count (0 , ClientId , SubscriptionQoS , Config ),
914912
915- timer :sleep (20 ),
916913 ok = emqtt :publish (Pub , Topic , <<" msg-3-qos0" >>, qos0 ),
917914 {ok , _ } = emqtt :publish (Pub , Topic , <<" msg-4-qos1" >>, qos1 ),
918915
919916 C2 = connect (ClientId , Config , non_clean_sess_opts ()),
920917 % % Server should reply in CONNACK that it has session state.
921918 ? assertEqual ({session_present , 1 },
922919 proplists :lookup (session_present , emqtt :info (C2 ))),
923- ? assertMatch (#{consumers := 1 },
924- get_global_counters (Config )),
920+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
925921
926922 ok = emqtt :publish (Pub , Topic , <<" msg-5-qos0" >>, qos0 ),
927923 {ok , _ } = emqtt :publish (Pub , Topic , <<" msg-6-qos1" >>, qos1 ),
@@ -954,21 +950,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
954950 ClientId = ? FUNCTION_NAME ,
955951
956952 C1 = connect (ClientId , Config , non_clean_sess_opts ()),
957- {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 }, {Topic0 , qos0 }]),
958- ? assertMatch (#{consumers := 1 },
959- get_global_counters (Config )),
953+ {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 },
954+ {Topic0 , qos0 }]),
955+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
956+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
960957
961958 ok = emqtt :disconnect (C1 ),
962- eventually (? _assertMatch (#{consumers := 0 },
963- get_global_counters (Config ))),
964-
959+ ok = await_consumer_count (0 , ClientId , 0 , Config ),
960+ ok = await_consumer_count (0 , ClientId , 1 , Config ),
965961 {ok , _ } = emqtt :publish (Pub , Topic0 , <<" msg-0" >>, qos1 ),
966962 {ok , _ } = emqtt :publish (Pub , Topic1 , <<" msg-1" >>, qos1 ),
967963
968964 C2 = connect (ClientId , Config , non_clean_sess_opts ()),
969- ? assertMatch (#{consumers := 1 },
970- get_global_counters (Config )),
971-
965+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
966+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
972967 ok = expect_publishes (C2 , Topic0 , [<<" msg-0" >>]),
973968 ok = expect_publishes (C2 , Topic1 , [<<" msg-1" >>]),
974969
@@ -1884,6 +1879,17 @@ await_confirms_unordered(From, Left) ->
18841879 ct :fail (" ~b confirms are missing" , [Left ])
18851880 end .
18861881
1882+ await_consumer_count (ConsumerCount , ClientId , QoS , Config ) ->
1883+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
1884+ QueueName = rabbit_mqtt_util :queue_name_bin (
1885+ rabbit_data_coercion :to_binary (ClientId ), QoS ),
1886+ eventually (
1887+ ? _assertMatch (
1888+ # 'queue.declare_ok' {consumer_count = ConsumerCount },
1889+ amqp_channel :call (Ch , # 'queue.declare' {queue = QueueName ,
1890+ passive = true })), 500 , 10 ),
1891+ ok = rabbit_ct_client_helpers :close_channel (Ch ).
1892+
18871893declare_queue (Ch , QueueName , Args )
18881894 when is_pid (Ch ), is_binary (QueueName ), is_list (Args ) ->
18891895 # 'queue.declare_ok' {} = amqp_channel :call (
0 commit comments