55
66try :
77 from urllib .parse import urlparse
8+ from urllib .parse import parse_qsl
89except ImportError :
9- from urlparse import urlparse
10+ from urlparse import urlparse
11+ from urlparse import parse_qsl
1012
1113from optparse import OptionParser
1214
@@ -248,6 +250,14 @@ def __init__(self, broker):
248250 self .participants = []
249251 self .connected = False
250252
253+ params = dict (parse_qsl (self .broker_info .query ))
254+ default_port = 1883
255+ if self .broker_info .scheme == 'mqtts' :
256+ default_port = 8883
257+ ca_certs = params .get ('ca_certs' )
258+ certfile = params .get ('certfile' )
259+ keyfile = params .get ('keyfile' )
260+ self ._client .tls_set (ca_certs = ca_certs , certfile = certfile , keyfile = keyfile )
251261 if self .broker_info .username :
252262 self ._client .username_pw_set (self .broker_info .username , self .broker_info .password )
253263
@@ -257,9 +267,7 @@ def __init__(self, broker):
257267 self ._client .on_subscribe = lambda c , u , m , q : self ._on_subscribe (c , u , m , q )
258268
259269 host = self .broker_info .hostname
260- port = self .broker_info .port
261- if port is None :
262- port = 1883
270+ port = self .broker_info .port or default_port
263271 self ._client .connect (host , port , 60 )
264272
265273 def add_participant (self , participant , iips = {}):
@@ -387,9 +395,9 @@ def run(participants, broker=None, done_cb=None, iips={}):
387395
388396 engine = None
389397 broker_info = urlparse (broker )
390- if broker_info .scheme == 'amqp' :
398+ if broker_info .scheme in ( 'amqp' , 'amqps' ) :
391399 engine = AmqpEngine (broker )
392- elif broker_info .scheme == 'mqtt' :
400+ elif broker_info .scheme in ( 'mqtt' , 'mqtts' ) :
393401 engine = MqttEngine (broker )
394402 else :
395403 raise ValueError ("msgflo: No engine implementation found for broker URL scheme %s" % (broker_info .scheme ,))
0 commit comments