-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlockserver
More file actions
executable file
·237 lines (208 loc) · 5.66 KB
/
lockserver
File metadata and controls
executable file
·237 lines (208 loc) · 5.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/python3 -u
import sqlite3 as sqlite
import serial
import threading
from datetime import datetime
import os
import sys, traceback
import time
import urllib.request
import urllib.parse
import json
import select
import configparser
import socket
def init():
global SEND_HASH_URL
global GET_DATA_URL
global WEB_UPDATE_INTERVAL
global WEBSERVER_PASSWORD
global DB_PATH
global GPIO_PATH
global SERIAL
configpath = None
# try $XDG_CONFIG_HOME/lockserver.ini first
if 'XDG_CONFIG_HOME' in os.environ:
configpath = os.path.join(os.environ['XDG_CONFIG_HOME'], 'lockserver.ini')
elif 'HOME' in os.environ:
configpath = os.path.join(os.environ['HOME'], '.config', 'lockserver.ini')
# fall back to ./lockserver.ini
if not os.path.isfile(configpath):
configpath = os.path.join(sys.path[0], 'lockserver.ini')
config = configparser.ConfigParser(strict=True)
# set defaults
config.read_dict({
'WebServer': {
'SendHashURL': 'https://labitat.dk/member/money/doorputer_new_hash',
'GetDataURL': 'https://labitat.dk/member/money/doorputer_get_dates',
'UpdateIntervalSec': 60,
},
'Database': {
'Path': os.path.join(sys.path[0], 'users.db'),
},
'Serial': {
'Device': '/dev/ttyUSB0',
'Bitrate': 9600,
'Parity': 'E',
'StopBits': 2,
},
})
# read config file
with open(configpath, 'r') as f:
config.read_file(f, configpath)
SEND_HASH_URL = config.get('WebServer', 'SendHashUrl') + '?{}'
GET_DATA_URL = config.get('WebServer', 'GetDataUrl') + '?{}'
WEB_UPDATE_INTERVAL = config.getint('WebServer', 'UpdateIntervalSec')
WEBSERVER_PASSWORD = config.get('WebServer', 'Password')
DB_PATH = config.get('Database', 'Path')
SERIAL = serial.Serial(
port = config.get('Serial', 'Device'),
baudrate = config.getint('Serial', 'Bitrate'),
bytesize = 8,
parity = config.get('Serial', 'Parity'),
stopbits = config.getint('Serial', 'StopBits'),
)
if config.has_option('DayMode', 'GPIOPath'):
GPIO_PATH = config['Daymode']['GPIOPath']
elif config.has_option('Daymode', 'GPIONumber'):
GPIO_PATH = '/sys/class/gpio/gpio{}'.format(config.getint('Daymode', 'GPIONumber'))
else:
GPIO_PATH = None
init()
def ui():
poll = select.poll()
button = open(os.path.join(GPIO_PATH, 'value'), 'rb')
poll.register(button, select.POLLPRI | select.POLLERR)
print("Switching to night mode")
daymode = False
SERIAL.write(b'N')
while True:
# clear events by reading the state
button.seek(0, 0)
button.read()
# wait for rising edge
poll.poll()
# wait 10ms and check if the button is still pressed
time.sleep(0.01)
button.seek(0, 0)
if button.read() != b"1\n":
print('GPIO noise')
continue
if daymode:
print("Switching to night mode")
daymode = False
SERIAL.write(b'N')
else:
print("Switching to day mode")
daymode = True
SERIAL.write(b'D')
# wait for falling edge
poll.poll()
# wait a little more to debounce
time.sleep(0.1)
if GPIO_PATH:
threading.Thread(target=ui, daemon=True).start()
def update_from_webserver():
con = sqlite.connect(DB_PATH)
cur = con.cursor()
params = urllib.parse.urlencode({'key': WEBSERVER_PASSWORD})
try:
members = None
with urllib.request.urlopen(GET_DATA_URL.format(params)) as d:
members = json.loads(d.read().decode('utf-8'))
for member in members:
cur.execute(
"SELECT hash, expires FROM hashes "
"WHERE member = ?", [
member['login']
]
)
row = cur.fetchone()
if row:
if row[0] != member['hash'] or row[1] != member['expiry_date']:
cur.execute(
"UPDATE hashes "
"SET hash = ?, expires = ? "
"WHERE member = ?", [
member['hash'],
member['expiry_date'],
member['login']
]
)
print("Updated existing row")
else:
cur.execute(
"INSERT INTO hashes (member, hash, expires) "
"VALUES (?, ?, ?)", [
member['login'],
member['hash'],
member['expiry_date']
]
)
if cur.rowcount == 0:
print("Error inserting new row")
return False
else:
print("Inserted new row")
con.commit()
return True
except:
traceback.print_exc(file=sys.stderr)
return False
def periodic_updater():
while True:
update_from_webserver()
time.sleep(WEB_UPDATE_INTERVAL)
threading.Thread(target=periodic_updater, daemon=True).start()
def send_to_webserver(hash):
params = urllib.parse.urlencode({
'key': WEBSERVER_PASSWORD,
'hash': hash
})
try:
with urllib.request.urlopen(SEND_HASH_URL.format(params)) as d:
# 202 means that the same unknown hash was
# received twice before a timeout which allows
# a website-user to claim the hash as their own
return d.getcode() == 202
except:
traceback.print_exc(file=sys.stderr)
return False
con = sqlite.connect('file:{}?mode=ro'.format(DB_PATH), uri=True)
cur = con.cursor()
# notify systemd we're running if we're started with Type=notify
if b'NOTIFY_SOCKET' in os.environb:
npath = os.environb[b'NOTIFY_SOCKET']
if npath[0] == ord('@'):
npath = bytearray(npath)
npath[0] = 0
with socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM) as s:
s.connect(npath)
s.send(b"READY=1\nSTATUS=Running\n")
while True:
data = SERIAL.readline()
data = data[:-1]
if data[:5] != b"ALIVE":
print("DEBUG: {}".format(data))
if data[:5] == b"HASH+":
h = data[5:45].decode('utf-8')
cur.execute(
"SELECT 1 "
"FROM hashes "
"WHERE hash = ? AND expires >= ?", [
h,
datetime.now().strftime('%Y-%m-%d')
]
)
if cur.fetchone() != None:
SERIAL.write(b"O")
print("Opening")
else:
print("Sending hash to webserver")
if send_to_webserver(h):
SERIAL.write(b"V")
print("Validated")
else:
SERIAL.write(b"R")
print("Rejected")
# vim: set ts=4 sw=4 noet: