-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservice
More file actions
150 lines (129 loc) · 5.01 KB
/
service
File metadata and controls
150 lines (129 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python
import sys; sys.path.insert(0, '.pylib')
import time, os, threading, traceback, socket
from select import select
from evdev import ecodes, InputDevice, list_devices
from evdev.events import KeyEvent
from hosted import node, config, device as ibdevice
from hosted.p2p import PeerGroup
# Ask the hosted config helper to restart this service when the configuration
# changes (inferred from the method name; the behaviour lives in `hosted`).
config.restart_on_update()
# RPC handle to the node; its `call` attribute is used below as the decorator
# that registers trigger().
lua = node.rpc()
def log(msg, name='touch service'):
    """Write one tagged diagnostic line to standard error.

    msg  -- text to report.
    name -- tag printed in square brackets before the message; defaults to
            'touch service'.

    The line is written with sys.stderr.write() instead of the Python 2 only
    ``print >>stream`` statement, so the same "[name] msg" line is produced
    under both Python 2 and Python 3.
    """
    sys.stderr.write("[{}] {}\n".format(name, msg))
# Module-wide UDP (SOCK_DGRAM) socket with SO_BROADCAST enabled; trigger()
# sends its event datagrams through it.
event_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
event_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
@lua.call
def trigger(event):
    """Send `event` as a UDP datagram to config.trigger_target on port 4444.

    Registered through the lua.call decorator. Nothing is logged or sent
    when either the configured target or the event itself is empty/falsy.
    """
    target = config.trigger_target
    if not (target and event):
        return
    log("sending %s:4444/%s" % (target, event))
    event_socket.sendto(event, (target, 4444))
class TouchPeers(PeerGroup):
    """PeerGroup subclass that relays node messages between peers.

    broadcast_node_message() hands a (target, args) pair to the leader,
    on_peer_message() re-broadcasts a received message to all peers, and
    on_leader_message() delivers a received message to the local node.
    When PeerGroup invokes the two callbacks is defined in hosted.p2p.
    """

    def on_peer_message(self, msg, peer_info):
        """Fan the fields of `msg` out to every peer; `peer_info` is unused."""
        self.broadcast_to_all(**msg)

    def on_leader_message(self, msg, peer_info):
        """Deliver msg['args'] to the node endpoint named by msg['target']."""
        endpoint = node[msg['target']]
        endpoint(msg['args'])

    def broadcast_node_message(self, target, args):
        """Send a node message (endpoint path plus payload) to the leader."""
        message = dict(target=target, args=args)
        self.send_to_leader(**message)
class TouchMonitor(object):
    """Poll evdev input devices and forward touch and keyboard events.

    Pointer state (down/x/y) is accumulated in self._touch_state and sent
    to '/event/touch' after every poll cycle; key events are sent to
    '/event/keyboard' as they arrive. All messages go out through the
    `touch_peers` object's broadcast_node_message().
    """

    def __init__(self, touch_peers):
        """Remember the peer group and read the screen resolution."""
        self._touch_peers = touch_peers
        # Last known pointer state; broadcast as-is by run().
        self._touch_state = dict(
            down = False,
            x = 0,
            y = 0,
        )
        self._screen_w, self._screen_h = ibdevice.screen_resolution
        # Opened InputDevice objects keyed by their device path.
        self._usb_devices = {}

    @staticmethod
    def _scale(value, abs_info, screen):
        """Map a raw axis value onto the screen extent.

        value    -- raw coordinate reported by the device.
        abs_info -- object with `min` and `max` attributes for the axis, or
                    None when no axis information is available.
        screen   -- screen width or height to scale to.

        Returns `value` unchanged when there is no usable axis range
        (abs_info is None, or min == max, which would otherwise raise
        ZeroDivisionError and take the whole service down).
        """
        if abs_info is None:
            return value
        vmin = abs_info.min
        value_range = abs_info.max - vmin
        if value_range == 0:
            return value
        return int(float(screen) / value_range * (value - vmin))

    @staticmethod
    def _key_name(names):
        """Turn an ecodes.KEY entry into a short lower-case key name.

        python-evdev's reverse mappings hold either a single name or a list
        of alias names for codes that have several; the first alias is used
        in that case. The "KEY_" prefix is removed and the rest lower-cased.
        """
        if isinstance(names, (list, tuple)):
            names = names[0]
        return names.replace("KEY_", "").lower()

    def update_devices(self):
        """Open newly listed input devices and forget the ones that vanished."""
        new = set(list_devices("/dev/input/"))
        old = set(self._usb_devices.keys())
        for device_name in new - old:
            usb_device = InputDevice(device_name)
            self._usb_devices[device_name] = usb_device
            # Dump the capabilities of every newly opened device to stdout.
            import pprint
            pprint.pprint(usb_device.capabilities(verbose=True))
            pprint.pprint(usb_device.capabilities(verbose=False))
            sys.stdout.flush()
        for device_name in old - new:
            del self._usb_devices[device_name]

    def device_event(self, usb_device, event):
        """Fold one input event into the touch state or forward a key event.

        usb_device -- the InputDevice the event was read from.
        event      -- the event object (its type, code and value are read).
        """
        is_key = event.type == ecodes.EV_KEY

        if is_key and event.code in (ecodes.BTN_TOUCH, ecodes.BTN_MOUSE, ecodes.BTN_LEFT):
            self._touch_state['down'] = event.value != 0

        # Only X/Y use the axis info, so skip the capabilities query for
        # every other absolute axis.
        if event.type == ecodes.EV_ABS and event.code in (ecodes.ABS_X, ecodes.ABS_Y):
            info = usb_device.capabilities(verbose=False)
            abs_infos = dict(info.get(ecodes.EV_ABS, ()))
            if event.code == ecodes.ABS_X:
                self._touch_state['x'] = self._scale(
                    event.value, abs_infos.get(ecodes.ABS_X), self._screen_w
                )
            if event.code == ecodes.ABS_Y:
                self._touch_state['y'] = self._scale(
                    event.value, abs_infos.get(ecodes.ABS_Y), self._screen_h
                )

        if is_key and event.code in ecodes.KEY:
            action = {
                KeyEvent.key_up: "up",
                KeyEvent.key_down: "down",
                KeyEvent.key_hold: "hold",
            }.get(event.value)
            # An unexpected key state is ignored instead of raising KeyError.
            if action is not None:
                self._touch_peers.broadcast_node_message('/event/keyboard', dict(
                    key = self._key_name(ecodes.KEY[event.code]),
                    action = action,
                ))

    def run(self):
        """Poll all devices forever, broadcasting the touch state each cycle.

        Each cycle refreshes the device list, waits up to 5 seconds for
        readable devices, processes their pending events and then sends the
        current touch state to '/event/touch'.
        """
        while 1:
            self.update_devices()
            r, w, e = select(self._usb_devices.values(), [], [], 5)
            for usb_device in r:
                try:
                    for ev in usb_device.read():
                        self.device_event(usb_device, ev)
                except IOError:
                    # device disconnected
                    pass
            self._touch_peers.broadcast_node_message('/event/touch', self._touch_state)
class GPIOMonitor(object):
    """Watch the GPIO pins referenced by the configuration and forward changes.

    Every link of type 'gpio' found in config.pages contributes its
    options['pin'] value; each such pin is registered with
    ibdevice.gpio.monitor() and its events are sent to '/event/gpio'.
    """

    def __init__(self, touch_peers):
        """Collect the configured pins and start monitoring each of them."""
        self._touch_peers = touch_peers
        self._pins = {
            link['options']['pin']
            for page in config.pages
            for link in page['links']
            if link['type'] == 'gpio'
        }
        # sys.stderr.write() emits the same line as the former Python 2 only
        # ``print >>sys.stderr`` statement, but also works on Python 3.
        sys.stderr.write("gpios active: %s\n" % (self._pins,))
        for pin in self._pins:
            ibdevice.gpio.monitor(pin)

    def run(self):
        """Forward every (pin, high) pair yielded by the GPIO poller."""
        for pin, high in ibdevice.gpio.poll_forever():
            self._touch_peers.broadcast_node_message('/event/gpio', dict(
                pin = pin,
                high = high,
            ))
def run_in_thread(fn):
    """Run `fn` over and over in a background daemon thread.

    `fn` is called again each time it returns. If it raises an Exception,
    the traceback is printed and the whole process is terminated with exit
    status 1 via os._exit().
    """
    def runner():
        while True:
            try:
                fn()
            except Exception:
                traceback.print_exc()
                os._exit(1)

    worker = threading.Thread(target=runner)
    worker.daemon = True
    worker.start()
if __name__ == "__main__":
    # Build the peer group, construct both monitors, then start each
    # monitor's run() in its own daemon thread.
    touch_peers = TouchPeers(local_only=config.local_only)
    monitors = (TouchMonitor(touch_peers), GPIOMonitor(touch_peers))
    for monitor in monitors:
        run_in_thread(monitor.run)
    # Keep the main thread alive; the worker threads are daemons.
    while True:
        time.sleep(120)