Skip to content

Commit 9a95b4c

Browse files
committed
Initial code
1 parent a9e1c0d commit 9a95b4c

File tree

2 files changed

+183
-0
lines changed

2 files changed

+183
-0
lines changed

main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
"""An example usage of the library in server.py"""
2+
3+
from server import Server, Temperature
4+
5+
connection = Server("The Yodlers", "89c84")
6+
7+
connection.post(Temperature(100.0))
8+

server.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""The API wrapper for sending data to the CubeServer server
2+
See https://github.com/snorklerjoe/CubeServer-api-micropython for more info!
3+
"""
4+
5+
import urequests as requests
6+
from ubinascii import b2a_base64
7+
import network
8+
import machine
9+
10+
__all__ = [
11+
"DataClass",
12+
"DataPoint",
13+
"Temperature",
14+
"Humidity",
15+
"Pressure",
16+
"Intensity",
17+
"Text",
18+
"Server",
19+
"sta_if"
20+
]
21+
22+
23+
# Helpers:
24+
def enum(**enums):
25+
return type('Enum', (), enums)
26+
27+
def basic_auth_str(user: str, pwd: str) -> str:
28+
"""Encodes the username and password as per RFC7617 on Basic Auth"""
29+
return "Basic " + str(b2a_base64(f"{user}:{pwd}#").strip())
30+
31+
DEGREE_SIGN = u"\xb0"
32+
33+
# Global:
34+
sta_if = None
35+
36+
# Data stuff:
37+
DataClass = enum(
38+
TEMPERATURE = "temperature",
39+
HUMIDITY = "humidity",
40+
PRESSURE = "pressure",
41+
LIGHT_INTENSITY = "light intensity",
42+
COMMENT = "comment"
43+
)
44+
45+
class DataPoint():
46+
"""A class for storing and handling datapoints"""
47+
48+
@staticmethod
49+
@property
50+
#@abstractmethod
51+
def UNIT() -> str:
52+
"""A standard string representation of the unit for this datapoint"""
53+
54+
def __init__(self, data_class: DataClass, value: Union[int, float, str]):
55+
"""Initializes a piece of data to send to the server"""
56+
self.data_class = data_class
57+
self.value = value
58+
59+
def dumps(self) -> str:
60+
"""Dumps a JSON string out that the server will (hopefully) accept"""
61+
return json.dumps(
62+
{
63+
"type": self.data_class.value,
64+
"value": self.value
65+
}
66+
)
67+
68+
def __str__(self) -> str:
69+
return f"{self.value} {self.UNIT}"
70+
71+
class Temperature(DataPoint):
72+
"""A class for DataPoints that store temperature values"""
73+
UNIT = f"{DEGREE_SIGN}F"
74+
def __init__(self, value):
75+
super().__init__(DataClass.TEMPERATURE, value)
76+
77+
class Humidity(DataPoint):
78+
"""A class for DataPoints that store humidity values"""
79+
UNIT = "%"
80+
def __init__(self, value):
81+
super().__init__(DataClass.HUMIDITY, value)
82+
83+
class Pressure(DataPoint):
84+
"""A class for DataPoints that store barometric pressure values"""
85+
UNIT="inHg"
86+
def __init__(self, value):
87+
super().__init__(DataClass.PRESSURE, value)
88+
89+
class Intensity(DataPoint):
90+
"""A class for DataPoints that store light intensity values"""
91+
UNIT="lux"
92+
def __init__(self, value):
93+
super().__init__(DataClass.LIGHT_INTENSITY, value)
94+
95+
class Text(DataPoint):
96+
"""A class reserved for DataPoints that are intended as a text comment"""
97+
UNIT="" # No unit for regular strings of text
98+
def __init__(self, value: str):
99+
super().__init__(DataClass.COMMENT, value)
100+
101+
102+
103+
104+
# Main class:
105+
class Server:
106+
"""A class for the server"""
107+
def __init__(
108+
self,
109+
team_name: str,
110+
team_secret: str,
111+
server_addr: str = "https://192.168.252.1:8081",
112+
server_verify = "./trusted.pem", # :typing.Union[bool,str]
113+
auto_network: bool = True, # Whether or not to automatically connect to the AP
114+
wifi_ssid: str = "CubeServer-API"
115+
):
116+
"""Initializes the connection for the API server
117+
Please provide the case-sensitive team name and the secret key
118+
If needed, the server's address can be defined with the server_addr
119+
kwarg
120+
If no other server_verify value is given as a kwarg, this looks for
121+
a file called trusted.pem. If the file exists or if a boolean value is
122+
given, the requests.Session().verify value will be set to this
123+
value."""
124+
125+
# TODO: Localize this a bit more to improve RAM usage
126+
global sta_if
127+
if auto_network and not (sta_if and sta_if.isconnected()):
128+
sta_if = network.WLAN(network.STA_IF)
129+
sta_if.active(True)
130+
print(f"Connecting to {wifi_ssid}", end ="")
131+
sta_if.connect(wifi_ssid)
132+
while not sta_if.isconnected():
133+
print(".", end ="")
134+
machine.lightsleep(100)
135+
136+
print()
137+
print("Connected!")
138+
139+
140+
self.authstr = basic_auth_str(team_name, team_secret)
141+
self.addr = server_addr
142+
self.verify = server_verify
143+
144+
# Check status:
145+
if self.get_status() is None:
146+
raise IOError("Unable to communicate with the server.")
147+
148+
def post(
149+
self,
150+
data #: Union[list[DataPoint], DataPoint]
151+
) -> bool:
152+
"""Posts a datapoint to the server, returns True on success"""
153+
if isinstance(data, list): # Send multiple datapoints:
154+
return all(
155+
self.post(point) for point in data
156+
)
157+
# Just send one datapoint:
158+
response = requests.post(
159+
f"{self.addr}/data", data={"data":data.dumps()},
160+
headers={'Authorization': self.authstr}
161+
)
162+
if response.status_code != 201:
163+
return False
164+
return True
165+
166+
def get_status(self):
167+
response = requests.get(
168+
f"{self.addr}/status",
169+
headers={'Authorization': self.authstr}
170+
)
171+
if response.status_code in [401, 403]:
172+
print("Invalid authorization config. Check the team name and secret.")
173+
if response.status_code == 200:
174+
return response.json()
175+
return None

0 commit comments

Comments
 (0)