@@ -878,34 +878,30 @@ session_expiry(Config) ->
878878 ok = rpc (Config , application , set_env , [App , Par , DefaultVal ]).
879879
880880non_clean_sess_reconnect_qos1 (Config ) ->
881- non_clean_sess_reconnect (Config , qos1 ).
881+ non_clean_sess_reconnect (Config , 1 ).
882882
883883non_clean_sess_reconnect_qos0 (Config ) ->
884- non_clean_sess_reconnect (Config , qos0 ).
884+ non_clean_sess_reconnect (Config , 0 ).
885885
886886non_clean_sess_reconnect (Config , SubscriptionQoS ) ->
887887 Pub = connect (<<" publisher" >>, Config ),
888888 Topic = ClientId = atom_to_binary (? FUNCTION_NAME ),
889889
890890 C1 = connect (ClientId , Config , non_clean_sess_opts ()),
891- {ok , _ , _ } = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
892- ? assertMatch (#{consumers := 1 },
893- get_global_counters (Config )),
891+ {ok , _ , [SubscriptionQoS ]} = emqtt :subscribe (C1 , Topic , SubscriptionQoS ),
892+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
894893
895894 ok = emqtt :disconnect (C1 ),
896- eventually (? _assertMatch (#{consumers := 0 },
897- get_global_counters (Config ))),
895+ ok = await_consumer_count (0 , ClientId , SubscriptionQoS , Config ),
898896
899- timer :sleep (20 ),
900897 ok = emqtt :publish (Pub , Topic , <<" msg-3-qos0" >>, qos0 ),
901898 {ok , _ } = emqtt :publish (Pub , Topic , <<" msg-4-qos1" >>, qos1 ),
902899
903900 C2 = connect (ClientId , Config , non_clean_sess_opts ()),
904901 % % Server should reply in CONNACK that it has session state.
905902 ? assertEqual ({session_present , 1 },
906903 proplists :lookup (session_present , emqtt :info (C2 ))),
907- ? assertMatch (#{consumers := 1 },
908- get_global_counters (Config )),
904+ ok = await_consumer_count (1 , ClientId , SubscriptionQoS , Config ),
909905
910906 ok = emqtt :publish (Pub , Topic , <<" msg-5-qos0" >>, qos0 ),
911907 {ok , _ } = emqtt :publish (Pub , Topic , <<" msg-6-qos1" >>, qos1 ),
@@ -938,21 +934,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
938934 ClientId = ? FUNCTION_NAME ,
939935
940936 C1 = connect (ClientId , Config , non_clean_sess_opts ()),
941- {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 }, {Topic0 , qos0 }]),
942- ? assertMatch (#{consumers := 1 },
943- get_global_counters (Config )),
937+ {ok , _ , [1 , 0 ]} = emqtt :subscribe (C1 , [{Topic1 , qos1 },
938+ {Topic0 , qos0 }]),
939+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
940+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
944941
945942 ok = emqtt :disconnect (C1 ),
946- eventually (? _assertMatch (#{consumers := 0 },
947- get_global_counters (Config ))),
948-
943+ ok = await_consumer_count (0 , ClientId , 0 , Config ),
944+ ok = await_consumer_count (0 , ClientId , 1 , Config ),
949945 {ok , _ } = emqtt :publish (Pub , Topic0 , <<" msg-0" >>, qos1 ),
950946 {ok , _ } = emqtt :publish (Pub , Topic1 , <<" msg-1" >>, qos1 ),
951947
952948 C2 = connect (ClientId , Config , non_clean_sess_opts ()),
953- ? assertMatch (#{consumers := 1 },
954- get_global_counters (Config )),
955-
949+ ok = await_consumer_count (1 , ClientId , 0 , Config ),
950+ ok = await_consumer_count (1 , ClientId , 1 , Config ),
956951 ok = expect_publishes (C2 , Topic0 , [<<" msg-0" >>]),
957952 ok = expect_publishes (C2 , Topic1 , [<<" msg-1" >>]),
958953
@@ -1868,6 +1863,17 @@ await_confirms_unordered(From, Left) ->
18681863 ct :fail (" ~b confirms are missing" , [Left ])
18691864 end .
18701865
1866+ await_consumer_count (ConsumerCount , ClientId , QoS , Config ) ->
1867+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
1868+ QueueName = rabbit_mqtt_util :queue_name_bin (
1869+ rabbit_data_coercion :to_binary (ClientId ), QoS ),
1870+ eventually (
1871+ ? _assertMatch (
1872+ # 'queue.declare_ok' {consumer_count = ConsumerCount },
1873+ amqp_channel :call (Ch , # 'queue.declare' {queue = QueueName ,
1874+ passive = true })), 500 , 10 ),
1875+ ok = rabbit_ct_client_helpers :close_channel (Ch ).
1876+
18711877declare_queue (Ch , QueueName , Args )
18721878 when is_pid (Ch ), is_binary (QueueName ), is_list (Args ) ->
18731879 # 'queue.declare_ok' {} = amqp_channel :call (
0 commit comments