Skip to content

Commit f1d11bf

Browse files
committed
Merge pull request #2 from adafruit/mqtt_client
Add MQTT client, refactor tests to use unittest module.
2 parents 4a0b254 + f9bc621 commit f1d11bf

File tree

11 files changed

+378
-46
lines changed

11 files changed

+378
-46
lines changed

Adafruit_IO/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
from .client import Client, AdafruitIOError, RequestError, ThrottlingError
1+
from .client import Client, AdafruitIOError, RequestError, ThrottlingError
2+
from .mqtt_client import MQTTClient

Adafruit_IO/client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,33 +78,33 @@ def _post(self, path, params):
7878
#stream functionality
7979
def send(self, feed_name, data):
8080
feed_name = quote(feed_name)
81-
path = "api/feeds/{}/streams/send".format(feed_name)
81+
path = "api/feeds/{}/data/send".format(feed_name)
8282
return self._post(path, {'value': data})
8383

8484
def receive(self, feed_name):
8585
feed_name = quote(feed_name)
86-
path = "api/feeds/{}/streams/last".format(feed_name)
86+
path = "api/feeds/{}/data/last".format(feed_name)
8787
return self._get(path)
8888

8989
def receive_next(self, feed_name):
9090
feed_name = quote(feed_name)
91-
path = "api/feeds/{}/streams/next".format(feed_name)
91+
path = "api/feeds/{}/data/next".format(feed_name)
9292
return self._get(path)
9393

9494
def receive_previous(self, feed_name):
9595
feed_name = quote(feed_name)
96-
path = "api/feeds/{}/streams/last".format(feed_name)
96+
path = "api/feeds/{}/data/last".format(feed_name)
9797
return self._get(path)
9898

9999
def streams(self, feed_id_or_key, stream_id=None):
100100
if stream_id is None:
101-
path = "api/feeds/{}/streams".format(feed_id_or_key)
101+
path = "api/feeds/{}/data".format(feed_id_or_key)
102102
else:
103-
path = "api/feeds/{}/streams/{}".format(feed_id_or_key, stream_id)
103+
path = "api/feeds/{}/data/{}".format(feed_id_or_key, stream_id)
104104
return self._get(path)
105105

106106
def create_stream(self, feed_id_or_key, data):
107-
path = "api/feeds/{}/streams".format(feed_id_or_key)
107+
path = "api/feeds/{}/data".format(feed_id_or_key)
108108
return self._post(path, data)
109109

110110
#group functionality

Adafruit_IO/mqtt_client.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# MQTT-based client for Adafruit.IO
2+
# Author: Tony DiCola ([email protected])
3+
#
4+
# Supports publishing and subscribing to feed changes from Adafruit IO using
5+
# the MQTT protcol.
6+
#
7+
# Depends on the following Python libraries:
8+
# - paho-mqtt: Paho MQTT client for python.
9+
import logging
10+
11+
import paho.mqtt.client as mqtt
12+
13+
14+
SERVICE_HOST = 'io.adafruit.com'
15+
SERVICE_PORT = 1883
16+
KEEP_ALIVE_SEC = 3600 # One minute
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class MQTTClient(object):
22+
"""Interface for publishing and subscribing to feed changes on Adafruit IO
23+
using the MQTT protocol.
24+
"""
25+
26+
def __init__(self, key):
27+
"""Create instance of MQTT client.
28+
29+
Required parameters:
30+
- key: The Adafruit.IO access key for your account.
31+
"""
32+
# Initialize event callbacks to be None so they don't fire.
33+
self.on_connect = None
34+
self.on_disconnect = None
35+
self.on_message = None
36+
# Initialize MQTT client.
37+
self._client = mqtt.Client()
38+
self._client.username_pw_set(key)
39+
self._client.on_connect = self._mqtt_connect
40+
self._client.on_disconnect = self._mqtt_disconnect
41+
self._client.on_message = self._mqtt_message
42+
self._connected = False
43+
44+
def _mqtt_connect(self, client, userdata, flags, rc):
45+
logger.debug('Client on_connect called.')
46+
# Check if the result code is success (0) or some error (non-zero) and
47+
# raise an exception if failed.
48+
if rc == 0:
49+
self._connected = True
50+
else:
51+
# TODO: Make explicit exception classes for these failures:
52+
# 0: Connection successful 1: Connection refused - incorrect protocol version 2: Connection refused - invalid client identifier 3: Connection refused - server unavailable 4: Connection refused - bad username or password 5: Connection refused - not authorised 6-255: Currently unused.
53+
raise RuntimeError('Error connecting to Adafruit IO with rc: {0}'.format(rc))
54+
# Call the on_connect callback if available.
55+
if self.on_connect is not None:
56+
self.on_connect(self)
57+
58+
def _mqtt_disconnect(self, client, userdata, rc):
59+
logger.debug('Client on_disconnect called.')
60+
self._connected = False
61+
# If this was an unexpected disconnect (non-zero result code) then raise
62+
# an exception.
63+
if rc != 0:
64+
raise RuntimeError('Unexpected disconnect with rc: {0}'.format(rc))
65+
# Call the on_disconnect callback if available.
66+
if self.on_disconnect is not None:
67+
self.on_disconnect(self)
68+
69+
def _mqtt_message(self, client, userdata, msg):
70+
logger.debug('Client on_message called.')
71+
# Parse out the feed id and call on_message callback.
72+
# Assumes topic looks like "api/feeds/{feed_id}/data/receive.json"
73+
if self.on_message is not None and msg.topic.startswith('api/feeds/') \
74+
and len(msg.topic) >= 28:
75+
feed_id = msg.topic[10:-18]
76+
self.on_message(self, feed_id, msg.payload)
77+
78+
def connect(self, **kwargs):
79+
"""Connect to the Adafruit.IO service. Must be called before any loop
80+
or publish operations are called. Will raise an exception if a
81+
connection cannot be made. Optional keyword arguments will be passed
82+
to paho-mqtt client connect function.
83+
"""
84+
# Skip calling connect if already connected.
85+
if self._connected:
86+
return
87+
# Connect to the Adafruit IO MQTT service.
88+
self._client.connect(SERVICE_HOST, port=SERVICE_PORT,
89+
keepalive=KEEP_ALIVE_SEC, **kwargs)
90+
91+
def is_connected(self):
92+
"""Returns True if connected to Adafruit.IO and False if not connected.
93+
"""
94+
return self._connected
95+
96+
def disconnect(self):
97+
# Disconnect MQTT client if connected.
98+
if self._connected:
99+
self._client.disconnect()
100+
101+
def loop_background(self):
102+
"""Starts a background thread to listen for messages from Adafruit.IO
103+
and call the appropriate callbacks when feed events occur. Will return
104+
immediately and will not block execution. Should only be called once.
105+
"""
106+
self._client.loop_start()
107+
108+
def loop_blocking(self):
109+
"""Listen for messages from Adafruit.IO and call the appropriate
110+
callbacks when feed events occur. This call will block execution of
111+
your program and will not return until disconnect is explicitly called.
112+
113+
This is useful if your program doesn't need to do anything else except
114+
listen and respond to Adafruit.IO feed events. If you need to do other
115+
processing, consider using the loop_background function to run a loop
116+
in the background.
117+
"""
118+
self._client.loop_forever()
119+
120+
def loop(self, timeout_sec=1.0):
121+
"""Manually process messages from Adafruit.IO. This is meant to be used
122+
inside your own main loop, where you periodically call this function to
123+
make sure messages are being processed to and from Adafruit_IO.
124+
125+
The optional timeout_sec parameter specifies at most how long to block
126+
execution waiting for messages when this function is called. The default
127+
is one second.
128+
"""
129+
self._client.loop(timeout=timeout_sec)
130+
131+
def subscribe(self, feed_id):
132+
"""Subscribe to changes on the specified feed. When the feed is updated
133+
the on_message function will be called with the feed_id and new value.
134+
"""
135+
self._client.subscribe('api/feeds/{0}/data/receive.json'.format(feed_id))
136+
137+
def publish(self, feed_id, value):
138+
"""Publish a value to a specified feed.
139+
140+
Required parameters:
141+
- feed_id: The id of the feed to update.
142+
- value: The new value to publish to the feed.
143+
"""
144+
self._client.publish('api/feeds/{0}/data/send.json'.format(feed_id),
145+
payload=value)

examples/mqtt_client.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Example of using the MQTT client class to subscribe to and publish feed values.
2+
# Author: Tony DiCola ([email protected])
3+
4+
# Import standard python modules.
5+
import random
6+
import sys
7+
import time
8+
9+
# Import Adafruit IO client.
10+
import Adafruit_IO
11+
12+
13+
# Set to your Adafruit IO key.
14+
ADAFRUIT_IO_KEY = 'YOUR ADAFRUIT IO KEY'
15+
16+
17+
# Define callback functions which will be called when certain events happen.
18+
def connected(client):
19+
# Connected function will be called when the client is connected to Adafruit IO.
20+
# This is a good place to subscribe to feed changes. The client parameter
21+
# passed to this function is the Adafruit IO MQTT client so you can make
22+
# calls against it easily.
23+
print 'Connected to Adafruit IO! Listening for DemoFeed changes...'
24+
# Subscribe to changes on a feed named DemoFeed.
25+
client.subscribe('DemoFeed')
26+
27+
def disconnected(client):
28+
# Disconnected function will be called when the client disconnects.
29+
print 'Disconnected from Adafruit IO!'
30+
sys.exit(1)
31+
32+
def message(client, feed_id, payload):
33+
# Message function will be called when a subscribed feed has a new value.
34+
# The feed_id parameter identifies the feed, and the payload parameter has
35+
# the new value.
36+
print 'Feed {0} received new value: {1}'.format(feed_id, payload)
37+
38+
39+
# Create an MQTT client instance.
40+
client = Adafruit_IO.MQTTClient(ADAFRUIT_IO_KEY)
41+
42+
# Setup the callback functions defined above.
43+
client.on_connect = connected
44+
client.on_disconnect = disconnected
45+
client.on_message = message
46+
47+
# Connect to the Adafruit IO server.
48+
client.connect()
49+
50+
# Now the program needs to use a client loop function to ensure messages are
51+
# sent and received. There are a few options for driving the message loop,
52+
# depending on what your program needs to do.
53+
54+
# The first option is to run a thread in the background so you can continue
55+
# doing things in your program.
56+
# client.loop_background()
57+
# Now send new values every 10 seconds.
58+
print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...'
59+
while True:
60+
value = random.randint(0, 100)
61+
print 'Publishing {0} to DemoFeed.'.format(value)
62+
client.publish('DemoFeed', value)
63+
time.sleep(10)
64+
65+
# Another option is to pump the message loop yourself by periodically calling
66+
# the client loop function. Notice how the loop below changes to call loop
67+
# continuously while still sending a new message every 10 seconds. This is a
68+
# good option if you don't want to or can't have a thread pumping the message
69+
# loop in the background.
70+
#last = 0
71+
#print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...'
72+
#while True:
73+
# # Explicitly pump the message loop.
74+
# client.loop()
75+
# # Send a new message every 10 seconds.
76+
# if (time.time() - last) >= 10.0:
77+
# value = random.randint(0, 100)
78+
# print 'Publishing {0} to DemoFeed.'.format(value)
79+
# client.publish('DemoFeed', value)
80+
# last = time.time()
81+
82+
# The last option is to just call loop_blocking. This will run a message loop
83+
# forever, so your program will not get past the loop_blocking call. This is
84+
# good for simple programs which only listen to events. For more complex programs
85+
# you probably need to have a background thread loop or explicit message loop like
86+
# the two previous examples above.
87+
#client.loop_blocking()
File renamed without changes.

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from distutils.core import setup
1+
from setuptools import setup
22

33
setup(
44
name='Adafruit_IO',

tests/README.txt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
Adafruit IO Python Client Test README
22

3-
To run these tests you must have the pytest module installed. You can install
4-
this (assuming you have pip installed) by executing:
5-
sudo pip install pytest
3+
To run the tests you can use python's built in unittest module's auto discovery.
4+
Do this by running inside this tests directory:
5+
python -m unittest discover
66

77
Some tests require a valid Adafruit IO account to run, and they key for this
88
account is provided in the ADAFRUIT_IO_KEY environment variable. Make sure to
99
set this envirionment variable before running the tests, for example to run all
1010
the tests with a key execute in this directory:
11-
ADAFRUIT_IO_KEY=my_io_key_value py.test
11+
ADAFRUIT_IO_KEY=my_io_key_value python -m unittest discover
12+
13+
To add your own tests you are strongly encouraged to build off the test base
14+
class provided in base.py. This class provides a place for common functions
15+
that don't need to be duplicated across all the tests. See the existing test
16+
code for an example of how tests are written and use the base test case.

tests/base.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Base testcase class with functions and state available to all tests.
2+
# Author: Tony DiCola ([email protected])
3+
import os
4+
import time
5+
import unittest
6+
7+
import Adafruit_IO
8+
9+
10+
class IOTestCase(unittest.TestCase):
11+
12+
def get_test_key(self):
13+
"""Return the AIO key specified in the ADAFRUIT_IO_KEY environment
14+
variable, or raise an exception if it doesn't exist.
15+
"""
16+
key = os.environ.get('ADAFRUIT_IO_KEY', None)
17+
if key is None:
18+
raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be " \
19+
"set with valid Adafruit IO key to run this test!")
20+
return key

tests/test_errors.py

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,23 @@
1-
import os
1+
# Test error responses with REST client.
2+
# Author: Tony DiCola ([email protected])
23
import time
3-
4-
import pytest
4+
import unittest
55

66
import Adafruit_IO
7+
import base
78

89

9-
def _get_client():
10-
"""Return an Adafruit IO client instance configured with the key specified in
11-
the ADAFRUIT_IO_KEY environment variable.
12-
"""
13-
key = os.environ.get('ADAFRUIT_IO_KEY', None)
14-
if key is None:
15-
raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be set with " \
16-
"valid Adafruit IO key to run this test!")
17-
return Adafruit_IO.Client(key)
18-
10+
class TestErrors(base.IOTestCase):
1911

20-
class TestErrors:
21-
def test_request_error_from_bad_key(self):
22-
io = Adafruit_IO.Client("this is a bad key from a test")
23-
with pytest.raises(Adafruit_IO.RequestError):
24-
io.send("TestStream", 42)
12+
def test_request_error_from_bad_key(self):
13+
io = Adafruit_IO.Client("this is a bad key from a test")
14+
with self.assertRaises(Adafruit_IO.RequestError):
15+
io.send("TestStream", 42)
2516

26-
def test_throttling_error_after_6_requests_in_short_period(self):
27-
io = _get_client()
28-
with pytest.raises(Adafruit_IO.ThrottlingError):
29-
for i in range(6):
30-
io.send("TestStream", 42)
31-
time.sleep(0.1) # Small delay to keep from hammering network.
17+
@unittest.skip("Throttling test must be run in isolation to prevent other failures.")
18+
def test_throttling_error_after_6_requests_in_short_period(self):
19+
io = Adafruit_IO.Client(self.get_test_key())
20+
with self.assertRaises(Adafruit_IO.ThrottlingError):
21+
for i in range(6):
22+
io.send("TestStream", 42)
23+
time.sleep(0.1) # Small delay to keep from hammering network.

0 commit comments

Comments
 (0)