Skip to content

Commit 21e285e

Browse files
authored
Merge pull request #8 from msgflo/fixes
Support binary data, MQTT username/password, some msgflo-python binary fixes
2 parents db8db64 + 02ca72b commit 21e285e

File tree

3 files changed

+59
-36
lines changed

3 files changed

+59
-36
lines changed

bin/msgflo-python

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,7 @@ def main():
3333
module = load_module_file(modulepath)
3434
classes = find_participant_classes(module)
3535
Participant = classes[0]
36-
37-
participant = Participant(role)
38-
d = participant.definition
39-
waiter = gevent.event.AsyncResult()
40-
engine = msgflo.run(participant, done_cb=waiter.set)
41-
print "%s(%s) running on %s" % (d['role'], d['component'], engine.broker_url)
42-
sys.stdout.flush()
43-
waiter.wait()
36+
msgflo.main(Participant, role)
4437

4538
if __name__ == '__main__':
4639
logging.basicConfig()

examples/repeat.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,19 @@ def __init__(self, role):
1111
d = {
1212
'component': 'PythonRepeat',
1313
'label': 'Repeat input data without change',
14+
'inports': [
15+
{ 'id': 'in', 'type': 'any' },
16+
],
17+
'outports': [
18+
{ 'id': 'out', 'type': 'any' },
19+
],
1420
}
1521
msgflo.Participant.__init__(self, d, role)
1622

1723
def process(self, inport, msg):
1824
self.send('out', msg.data)
1925
self.ack(msg)
2026

21-
22-
def main():
23-
waiter = gevent.event.AsyncResult()
24-
role = sys.argv[1] if len(sys.argv) > 1 else 'repeat'
25-
repeater = Repeat(role)
26-
engine = msgflo.run(repeater, done_cb=waiter.set)
27-
28-
print "Repeat running on %s" % (engine.broker_url)
29-
sys.stdout.flush()
30-
waiter.wait()
31-
print "Shutdown"
32-
sys.stdout.flush()
33-
3427
if __name__ == '__main__':
35-
logging.basicConfig()
36-
main()
28+
msgflo.main(Repeat)
3729

msgflo/msgflo.py

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
sys.path.append(os.path.abspath("."))
55

66
import logging
7-
logger = logging.getLogger('msgflo')
7+
logging.basicConfig()
88
log_level = os.environ.get('MSGFLO_PYTHON_LOGLEVEL')
9+
logger = logging.getLogger('msgflo')
910
if log_level:
1011
level = getattr(logging, log_level.upper())
11-
logging.basicConfig(level=level)
12+
logger.setLevel(level)
1213

1314
import gevent
1415
import gevent.event
@@ -83,8 +84,10 @@ def run(self):
8384
raise NotImplementedError
8485

8586
class Message(object):
86-
def __init__(self, data):
87-
self.data = data
87+
def __init__(self, raw):
88+
self.buffer = raw
89+
self.data = raw
90+
self.json = None
8891

8992
class AmqpEngine(Engine):
9093

@@ -165,7 +168,7 @@ def _send_discovery(self, channel, definition):
165168
}
166169
msg = haigha_Message(json.dumps(m))
167170
channel.basic.publish(msg, '', 'fbp')
168-
logger.debug('sent discovery message', msg)
171+
logger.debug('sent discovery message')
169172
return
170173

171174
def _setup_queue(self, part, channel, direction, port):
@@ -174,23 +177,34 @@ def _setup_queue(self, part, channel, direction, port):
174177
def handle_input(msg):
175178
logger.debug("Received message: %s" % (msg,))
176179

177-
msg.data = json.loads(msg.body.decode("utf-8"))
180+
msg.buffer = msg.body
181+
try:
182+
msg.json = json.loads(msg.body.decode("utf-8"))
183+
msg.data = msg.json # compat
184+
except ValueError as e:
185+
# Not JSON, assume binary
186+
msg.data = msg.buffer
187+
178188
part.process(port, msg)
179189
return
180190

181191
if 'in' in direction:
182192
channel.queue.declare(queue)
183193
channel.basic.consume(queue=queue, consumer=handle_input, no_ack=False)
184-
logger.debug('subscribed to', queue)
194+
logger.debug('subscribed to %s' % queue)
185195
else:
186196
channel.exchange.declare(queue, 'fanout')
187-
logger.debug('created outqueue')
197+
logger.debug('created outqueue %s' % queue)
188198

189199
class MqttEngine(Engine):
190200
def __init__(self, broker):
191201
Engine.__init__(self, broker)
192202

193203
self._client = mqtt.Client()
204+
205+
if self.broker_info.username:
206+
self._client.username_pw_set(self.broker_info.username, self.broker_info.password)
207+
194208
self._client.on_connect = self._on_connect
195209
self._client.on_message = self._on_message
196210
self._client.on_subscribe = self._on_subscribe
@@ -252,9 +266,16 @@ def _on_subscribe(self, client, userdata, mid, granted_qos):
252266
def _on_message(self, client, userdata, mqtt_msg):
253267
logging.debug('got message on %s' % mqtt_msg.topic)
254268
port = "" # FIXME: map from topic back to port
269+
255270
def notify():
256-
data = json.loads(str(mqtt_msg.payload))
257-
msg = Message(data)
271+
msg = Message(mqtt_msg.payload)
272+
try:
273+
msg.json = json.loads(str(mqtt_msg.payload))
274+
msg.data = msg.json # compat
275+
except ValueError as e:
276+
# Not JSON, assume binary
277+
msg.data = msg.buffer
278+
258279
self.participant.process(port, msg)
259280

260281
gevent.spawn(notify)
@@ -275,16 +296,33 @@ def run(participant, broker=None, done_cb=None):
275296
broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost')
276297

277298
engine = None
278-
if broker.startswith('amqp://'):
299+
broker_info = urlparse.urlparse(broker)
300+
if broker_info.scheme == 'amqp':
279301
engine = AmqpEngine(broker)
280-
elif broker.startswith('mqtt://'):
302+
elif broker_info.scheme == 'mqtt':
281303
engine = MqttEngine(broker)
282304
else:
283-
raise ValueError("msgflo: No engine implementation found for broker URL %s" % (broker,))
305+
raise ValueError("msgflo: No engine implementation found for broker URL scheme %s" % (broker_info.scheme,))
284306

285307
if done_cb:
286308
engine.done_callback(done_cb)
287309
engine.add_participant(participant)
288310
engine.run()
289311

290312
return engine
313+
314+
def main(Participant, role=None):
315+
if not role:
316+
try:
317+
role = sys.argv[0]
318+
except IndexError, e:
319+
role = participant.definition.component.tolower()
320+
321+
participant = Participant(role)
322+
d = participant.definition
323+
waiter = gevent.event.AsyncResult()
324+
engine = run(participant, done_cb=waiter.set)
325+
anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname)
326+
print "%s(%s) running on %s" % (d['role'], d['component'], anon_url)
327+
sys.stdout.flush()
328+
waiter.wait()

0 commit comments

Comments
 (0)