Skip to content

Commit 35ee775

Browse files
committed
Fixes for EMQ X
Fix for EMQ X which does not allow subscribing to `#` and occasionally publishes to $SYS. Other fixes include waiting for the proper client connection to finish subscribing before validating.
1 parent 4654466 commit 35ee775

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

interoperability/client_test5.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ def connectionLost(self, cause):
4646
logging.info("connectionLost %s" % str(cause))
4747

4848
def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None):
49+
if topicName.startswith("$SYS/"):
50+
return True
4951
logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
5052
self.messages.append((topicName, payload, qos, retained, msgid, properties))
5153
self.messagedicts.append({"topicname" : topicName, "payload" : payload,
@@ -69,10 +71,12 @@ def cleanRetained():
6971
curclient = mqtt_client.Client("clean retained".encode("utf-8"))
7072
curclient.registerCallback(callback)
7173
curclient.connect(host=host, port=port, cleanstart=True)
72-
curclient.subscribe(["#"], [MQTTV5.SubscribeOptions(0)])
74+
# Not all brokers (EMQ X) allow us to subscribe to #, so subscribe to + and +/# to accomplish the same
75+
curclient.subscribe(["+"], [MQTTV5.SubscribeOptions(0)])
76+
curclient.subscribe(["+/#"], [MQTTV5.SubscribeOptions(0)])
7377
time.sleep(2) # wait for all retained messages to arrive
7478
for message in callback.messages:
75-
logging.info("deleting retained message for topic", message[0])
79+
logging.info("deleting retained message for topic %s", message[0])
7680
curclient.publish(message[0], b"", 0, retained=True)
7781
curclient.disconnect()
7882
time.sleep(.1)
@@ -339,7 +343,8 @@ def test_subscribe_failure(self):
339343
time.sleep(1)
340344
# subscribeds is a list of (msgid, [qos])
341345
logging.info(callback.subscribeds)
342-
assert callback.subscribeds[0][1][0].value == 0x80, "return code should be 0x80 %s" % callback.subscribeds
346+
self.assertEqual(callback.subscribeds[0][1][0].value, 0x80,
347+
"return code should be 0x80 %s" % callback.subscribeds)
343348
except:
344349
traceback.print_exc()
345350
succeeded = False
@@ -551,7 +556,7 @@ def test_subscribe_options(self):
551556
aclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
552557
self.waitfor(callback.subscribeds, 1, 3)
553558
bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
554-
self.waitfor(callback.subscribeds, 1, 3)
559+
self.waitfor(callback2.subscribeds, 1, 3)
555560
aclient.publish(topics[0], b"noLocal test", 1, retained=False)
556561

557562
self.waitfor(callback2.messages, 1, 3)
@@ -651,6 +656,7 @@ def test_subscribe_identifiers(self):
651656
sub_properties.clear()
652657
sub_properties.SubscriptionIdentifier = 3
653658
bclient.subscribe([topics[0]+"/#"], [MQTTV5.SubscribeOptions(2)], properties=sub_properties)
659+
self.waitfor(callback2.subscribeds, 2, 3)
654660

655661
bclient.publish(topics[0], b"sub identifier test", 1, retained=False)
656662

@@ -661,7 +667,7 @@ def test_subscribe_identifiers(self):
661667

662668
self.waitfor(callback2.messages, 1, 3)
663669
self.assertEqual(len(callback2.messages), 1, callback2.messages)
664-
expected_subsids = set([2, 3])
670+
expected_subsids = {2, 3}
665671
received_subsids = set(callback2.messages[0][5].SubscriptionIdentifier)
666672
self.assertEqual(received_subsids, expected_subsids, received_subsids)
667673
bclient.disconnect()
@@ -679,7 +685,7 @@ def test_request_response(self):
679685
self.waitfor(callback.subscribeds, 1, 3)
680686

681687
bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
682-
self.waitfor(callback.subscribeds, 1, 3)
688+
self.waitfor(callback2.subscribeds, 1, 3)
683689

684690
publish_properties = MQTTV5.Properties(MQTTV5.PacketTypes.PUBLISH)
685691
publish_properties.ResponseTopic = topics[0]

0 commit comments

Comments
 (0)