@@ -144,6 +144,7 @@ all_tests_3() ->
144144 consume_credit_multiple_ack ,
145145 basic_cancel ,
146146 consumer_metrics_cleaned_on_connection_close ,
147+ consume_cancel_should_create_events ,
147148 receive_basic_cancel_on_queue_deletion ,
148149 keep_consuming_on_leader_restart ,
149150 max_length_bytes ,
@@ -1195,7 +1196,7 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
11951196 Conn = rabbit_ct_client_helpers :open_connection (Config , Server ),
11961197 {ok , Ch } = amqp_connection :open_channel (Conn ),
11971198 qos (Ch , 10 , false ),
1198- CTag = << " consumer_metrics_cleaned_on_connection_close " >> ,
1199+ CTag = rabbit_data_coercion : to_binary ( ? FUNCTION_NAME ) ,
11991200 subscribe (Ch , Q , false , 0 , CTag ),
12001201 rabbit_ct_helpers :await_condition (
12011202 fun () ->
@@ -1211,6 +1212,49 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
12111212
12121213 rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
12131214
1215+ consume_cancel_should_create_events (Config ) ->
1216+ HandlerMod = rabbit_list_test_event_handler ,
1217+ rabbit_ct_broker_helpers :add_code_path_to_all_nodes (Config , HandlerMod ),
1218+ rabbit_ct_broker_helpers :rpc (Config , 0 ,
1219+ gen_event ,
1220+ add_handler ,
1221+ [rabbit_event , HandlerMod , []]),
1222+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1223+
1224+ Q = ? config (queue_name , Config ),
1225+ ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
1226+ declare (Config , Server , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
1227+
1228+ Conn = rabbit_ct_client_helpers :open_connection (Config , Server ),
1229+ {ok , Ch } = amqp_connection :open_channel (Conn ),
1230+ qos (Ch , 10 , false ),
1231+
1232+ ok = rabbit_ct_broker_helpers :rpc (Config , 0 ,
1233+ gen_event ,
1234+ call ,
1235+ [rabbit_event , HandlerMod , clear_events ]),
1236+
1237+ CTag = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
1238+
1239+ ? assertEqual ([], filtered_events (Config , consumer_created , CTag )),
1240+ ? assertEqual ([], filtered_events (Config , consumer_deleted , CTag )),
1241+
1242+ subscribe (Ch , Q , false , 0 , CTag ),
1243+
1244+ ? awaitMatch ([{event , consumer_created , _ , _ , _ }], filtered_events (Config , consumer_created , CTag ), ? WAIT ),
1245+ ? assertEqual ([], filtered_events (Config , consumer_deleted , CTag )),
1246+
1247+ amqp_channel :call (Ch , # 'basic.cancel' {consumer_tag = CTag }),
1248+
1249+ ? awaitMatch ([{event , consumer_deleted , _ , _ , _ }], filtered_events (Config , consumer_deleted , CTag ), ? WAIT ),
1250+
1251+ rabbit_ct_broker_helpers :rpc (Config , 0 ,
1252+ gen_event ,
1253+ delete_handler ,
1254+ [rabbit_event , HandlerMod , []]),
1255+
1256+ ok = rabbit_ct_client_helpers :close_connection (Conn ),
1257+ rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
12141258
12151259receive_basic_cancel_on_queue_deletion (Config ) ->
12161260 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -1395,6 +1439,18 @@ filter_consumers(Config, Server, CTag) ->
13951439 end
13961440 end , [], CInfo ).
13971441
1442+
1443+ filtered_events (Config , EventType , CTag ) ->
1444+ Events = rabbit_ct_broker_helpers :rpc (Config , 0 ,
1445+ gen_event ,
1446+ call ,
1447+ [rabbit_event , rabbit_list_test_event_handler , get_events ]),
1448+ lists :filter (fun ({event , Type , Fields , _ , _ }) when Type =:= EventType ->
1449+ proplists :get_value (consumer_tag , Fields ) =:= CTag ;
1450+ (_ ) ->
1451+ false
1452+ end , Events ).
1453+
13981454consume_and_reject (Config ) ->
13991455 consume_and_ (Config , fun (DT ) -> # 'basic.reject' {delivery_tag = DT } end ).
14001456consume_and_nack (Config ) ->
0 commit comments