@@ -291,12 +291,13 @@ def run(participant, broker=None, done_cb=None):
291291 broker = os .environ .get ('MSGFLO_BROKER' , 'amqp://localhost' )
292292
293293 engine = None
294- if broker .startswith ('amqp://' ):
294+ broker_info = urlparse .urlparse (broker )
295+ if broker_info .scheme == 'amqp' :
295296 engine = AmqpEngine (broker )
296- elif broker . startswith ( 'mqtt://' ) :
297+ elif broker_info . scheme == 'mqtt' :
297298 engine = MqttEngine (broker )
298299 else :
299- raise ValueError ("msgflo: No engine implementation found for broker URL %s" % (broker ,))
300+ raise ValueError ("msgflo: No engine implementation found for broker URL scheme %s" % (broker_info . scheme ,))
300301
301302 if done_cb :
302303 engine .done_callback (done_cb )
@@ -310,6 +311,7 @@ def main(Participant, role):
310311 d = participant .definition
311312 waiter = gevent .event .AsyncResult ()
312313 engine = run (participant , done_cb = waiter .set )
313- print "%s(%s) running on %s" % (d ['role' ], d ['component' ], engine .broker_url )
314+ anon_url = "%s://%s" % (engine .broker_info .scheme , engine .broker_info .hostname )
315+ print "%s(%s) running on %s" % (d ['role' ], d ['component' ], anon_url )
314316 sys .stdout .flush ()
315317 waiter .wait ()
0 commit comments