@@ -35,6 +35,7 @@ groups() ->
3535 [{cluster_size_1 , [shuffle ],
3636 [
3737 mqtt_amqpl_mqtt ,
38+ amqpl_mqtt_gh_12707 ,
3839 mqtt_amqp_mqtt ,
3940 amqp_mqtt_amqp ,
4041 mqtt_stomp_mqtt ,
@@ -169,6 +170,39 @@ mqtt_amqpl_mqtt(Config) ->
169170
170171 ok = emqtt :disconnect (C ).
171172
173+ amqpl_mqtt_gh_12707 (Config ) ->
174+ Q = ClientId = atom_to_binary (? FUNCTION_NAME ),
175+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
176+ # 'queue.declare_ok' {} = amqp_channel :call (Ch , # 'queue.declare' {queue = Q , durable = true }),
177+ # 'queue.bind_ok' {} = amqp_channel :call (Ch , # 'queue.bind' {queue = Q ,
178+ exchange = <<" amq.topic" >>,
179+ routing_key = <<" gh12707" >>}),
180+ C = connect (ClientId , Config ),
181+
182+ Topic = <<" gh12707" >>,
183+ Payload = <<" gh_12707" >>,
184+
185+ {ok , _ , [1 ]} = emqtt :subscribe (C , #{'Subscription-Identifier' => 676 }, [{Topic , [{qos , 1 }]}]),
186+ amqp_channel :call (Ch , # 'basic.publish' {exchange = <<" amq.topic" >>,
187+ routing_key = Topic },
188+ # amqp_msg {payload = Payload ,
189+ props = # 'P_basic' {content_type = <<" application/json" >>,
190+ expiration = <<" 12707" >>,
191+ headers = []}}),
192+
193+ receive {publish ,
194+ #{topic := MqttTopic ,
195+ payload := MqttPayload }} ->
196+ ? assertEqual (MqttTopic , Topic ),
197+ ? assertEqual (Payload , MqttPayload )
198+ after 1000 ->
199+ ct :fail (" did not receive a delivery" )
200+ end ,
201+
202+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }),
203+ rabbit_ct_client_helpers :close_channel (Ch ),
204+ ok = emqtt :disconnect (C ).
205+
172206mqtt_amqp_mqtt (Config ) ->
173207 Host = ? config (rmq_hostname , Config ),
174208 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
0 commit comments