-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
66 lines (51 loc) · 2.07 KB
/
server.py
File metadata and controls
66 lines (51 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import argparse
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from serial import Serial
class LelitServer(HTTPServer):
    """HTTP server that also owns the serial connection to the sensor.

    Request handlers reach this object through ``self.server`` and call
    :meth:`get_sensor_data` to obtain one parsed reading.
    """

    def __init__(self, serial_port, serial_baudrate, *args, **kwargs):
        """Bind the HTTP socket, then open the serial port.

        Args:
            serial_port: Port name handed to ``Serial``.
            serial_baudrate: Baud rate handed to ``Serial``.
            *args: Forwarded unchanged to ``HTTPServer``.
            **kwargs: Forwarded unchanged to ``HTTPServer``.

        Raises:
            Exception: Whatever ``HTTPServer`` or ``Serial`` raises; the
                listening socket is released first if ``Serial`` fails.
        """
        super().__init__(*args, **kwargs)
        try:
            # timeout=1 is pySerial's read timeout, so readline() returns
            # (possibly empty) instead of blocking a request forever.
            self.sensor = Serial(serial_port, serial_baudrate, timeout=1)
        except Exception:
            # The listening socket is already bound at this point; do not
            # leak it when the serial port cannot be opened.
            self.server_close()
            raise

    def server_close(self):
        """Close the listening socket and the serial port."""
        try:
            super().server_close()
        finally:
            # HTTPServer.__init__ calls server_close() itself when binding
            # fails, i.e. before ``self.sensor`` exists - hence getattr.
            sensor = getattr(self, 'sensor', None)
            if sensor is not None:
                sensor.close()

    def get_sensor_data(self):
        """Read one line from the sensor and return it parsed.

        Returns:
            dict: The parsed reading, or ``{}`` when nothing usable was read
            (see :meth:`__parse_sensor_data__`).

        Raises:
            ValueError: If the line read is malformed.
        """
        self.sensor.flushInput()
        # The first line read after discarding buffered input is thrown
        # away (presumably because it may start mid-line); only the second
        # one is parsed.
        self.sensor.readline()
        return self.__parse_sensor_data__(self.sensor.readline())

    @staticmethod
    def __parse_sensor_data__(data):
        """Turn one comma-separated sensor line into a dict.

        Args:
            data: The line as ``bytes``/``bytearray``, as decoded text, or
                as the legacy ``str(bytes)`` form (``"b'...'"``) that older
                callers passed in.

        Returns:
            dict: ``{}`` when ``data`` is empty or shorter than 10
            characters; otherwise a dict with the keys ``version`` (str),
            ``steam``, ``steam_target``, ``heat_exchanger``,
            ``fast_heating_countdown`` (int) and ``heating`` (bool, true
            when the sixth field starts with ``'1'``). Fields beyond the
            sixth are ignored.

        Raises:
            ValueError: If the line has fewer than six fields or a numeric
                field is not an integer.
        """
        if not data or len(data) < 10:
            return {}

        if isinstance(data, (bytes, bytearray)):
            text = bytes(data).decode('ascii', errors='replace')
        else:
            text = str(data)
        text = text.strip()

        # Legacy callers pass str(bytes), which is the repr "b'...'": drop
        # the two-character prefix. The trailing quote and escaped line
        # ending stay in the last field, where only the first character
        # is inspected.
        if text.startswith(("b'", 'b"')):
            text = text[2:]

        fields = text.split(',')
        if len(fields) < 6:
            raise ValueError(
                f'Malformed sensor line: expected at least 6 fields, '
                f'got {len(fields)}'
            )
        version, steam, steam_target, heat_exchanger, countdown, heating = (
            fields[:6]
        )

        return {
            'version': version,
            'steam': int(steam),
            'steam_target': int(steam_target),
            'heat_exchanger': int(heat_exchanger),
            'fast_heating_countdown': int(countdown),
            'heating': heating[:1] == '1',
        }
class RequestHandler(BaseHTTPRequestHandler):
    """Serve the current sensor reading over HTTP.

    ``self.server`` must provide ``get_sensor_data()`` returning a
    JSON-serialisable object.
    """

    def do_GET(self):
        """Reply 200 with the reading as JSON, or 500 with the error text."""
        try:
            data = self.server.get_sensor_data()
        except Exception as e:
            # Request boundary: report any read/parse failure to the client
            # instead of dropping the connection. Note that the exception
            # text is sent to the client as-is.
            self._send_body(
                500, f'Error: {e}'.encode(), 'text/plain; charset=utf-8'
            )
            return
        self._send_body(200, json.dumps(data).encode(), 'application/json')

    def do_POST(self):
        """Reject POST with 405 so the client gets a valid HTTP response."""
        self.send_response(405)
        self.send_header('Allow', 'GET')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _send_body(self, status, body, content_type):
        """Write a complete response: status line, headers and body.

        Args:
            status: HTTP status code.
            body: Response payload as bytes.
            content_type: Value for the ``Content-Type`` header.
        """
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
if __name__ == '__main__':
    # Command-line entry point: parse options, start the server, and make
    # sure the listening socket is released when the process is stopped.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--host', default='localhost', help='Hostname to serve from')
    arg_parser.add_argument('--port', type=int, default=8000, help='Port number on which to listen')
    arg_parser.add_argument('--serial-port', default='/dev/ttyUSB0', help='Serial port to read from')
    arg_parser.add_argument('--serial-baudrate', type=int, default=9600, help='Serial baudrate')
    args = arg_parser.parse_args()

    server = LelitServer(args.serial_port, args.serial_baudrate, (args.host, args.port), RequestHandler)
    print(f'Listening on {args.host}:{args.port}')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit without a
        # traceback.
        pass
    finally:
        server.server_close()