@@ -35,6 +35,7 @@ groups() ->
3535 [{cluster_size_1 , [shuffle ],
3636 [
3737 mqtt_amqpl_mqtt ,
38+ amqpl_mqtt_gh_12707 ,
3839 mqtt_amqp_mqtt ,
3940 amqp_mqtt_amqp ,
4041 mqtt_stomp_mqtt ,
@@ -104,7 +105,6 @@ mqtt_amqpl_mqtt(Config) ->
104105 # 'queue.bind_ok' {} = amqp_channel :call (Ch , # 'queue.bind' {queue = Q ,
105106 exchange = <<" amq.topic" >>,
106107 routing_key = <<" my.topic" >>}),
107- % % MQTT 5.0 to AMQP 0.9.1
108108 C = connect (ClientId , Config ),
109109 MqttResponseTopic = <<" response/topic" >>,
110110 {ok , _ , [1 ]} = emqtt :subscribe (C , #{'Subscription-Identifier' => 999 }, [{MqttResponseTopic , [{qos , 1 }]}]),
@@ -169,6 +169,39 @@ mqtt_amqpl_mqtt(Config) ->
169169
170170 ok = emqtt :disconnect (C ).
171171
172+ amqpl_mqtt_gh_12707 (Config ) ->
173+ Q = ClientId = atom_to_binary (? FUNCTION_NAME ),
174+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
175+ # 'queue.declare_ok' {} = amqp_channel :call (Ch , # 'queue.declare' {queue = Q , durable = true }),
176+ # 'queue.bind_ok' {} = amqp_channel :call (Ch , # 'queue.bind' {queue = Q ,
177+ exchange = <<" amq.topic" >>,
178+ routing_key = <<" gh12707" >>}),
179+ C = connect (ClientId , Config ),
180+
181+ Topic = <<" gh12707" >>,
182+ Payload = <<" gh_12707" >>,
183+
184+ {ok , _ , [1 ]} = emqtt :subscribe (C , #{'Subscription-Identifier' => 676 }, [{Topic , [{qos , 1 }]}]),
185+ amqp_channel :call (Ch , # 'basic.publish' {exchange = <<" amq.topic" >>,
186+ routing_key = Topic },
187+ # amqp_msg {payload = Payload ,
188+ props = # 'P_basic' {content_type = <<" application/json" >>,
189+ expiration = <<" 12707" >>,
190+ headers = []}}),
191+
192+ receive {publish ,
193+ #{topic := MqttTopic ,
194+ payload := MqttPayload }} ->
195+ ? assertEqual (MqttTopic , Topic ),
196+ ? assertEqual (Payload , MqttPayload )
197+ after 1000 ->
198+ ct :fail (" did not receive a delivery" )
199+ end ,
200+
201+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }),
202+ rabbit_ct_client_helpers :close_channel (Ch ),
203+ ok = emqtt :disconnect (C ).
204+
172205mqtt_amqp_mqtt (Config ) ->
173206 Host = ? config (rmq_hostname , Config ),
174207 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
0 commit comments