-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmyoManager.py
More file actions
99 lines (80 loc) · 3.44 KB
/
myoManager.py
File metadata and controls
99 lines (80 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import myo
from myo import *
from PyQt5.QtCore import QThread, pyqtSignal, QTimer
class Listener(DeviceListener):
    """Myo device listener that relays hub events to a manager's Qt signal.

    Every handler emits one dict on ``manager.signals`` shaped as
    ``{"type": <event.type>, "data": {...}}``; the keys inside ``data``
    depend on the kind of event.
    """

    def __init__(self, m):
        """Keep a reference to the manager *m* whose ``signals`` is emitted."""
        super().__init__()
        self.data = {}
        self.manager = m

    def _publish(self, event_type, **payload):
        """Emit the ``{"type", "data"}`` envelope on the manager's signal."""
        self.manager.signals.emit({"type": event_type, "data": payload})

    def on_connected(self, event):
        """Flag the manager as connected, set up the device, then report it."""
        manager = self.manager
        manager.connecting = False
        manager.connected = True

        device = event.device
        device.request_battery_level()
        device.request_rssi()
        device.stream_emg(True)
        device.vibrate(myo.VibrationType.short)

        self._publish(
            event.type,
            name=event.device_name,
            mac_address=event.mac_address,
        )

    def on_disconnected(self, event):
        """Report the disconnect (with empty reason fields), then clear the flag."""
        self._publish(event.type, timeout="", unOpenMyo="")
        self.manager.connected = False

    def on_emg(self, event):
        """Forward the EMG reading carried by *event*."""
        self._publish(event.type, emg=event.emg)

    def on_orientation(self, event):
        """Forward gyroscope, acceleration and orientation from *event*."""
        self._publish(
            event.type,
            gyroscope=event.gyroscope,
            acceleration=event.acceleration,
            orientation=event.orientation,
        )

    def on_battery_level(self, event):
        """Forward the battery level carried by *event*."""
        self._publish(event.type, battery=event.battery_level)

    def on_rssi(self, event):
        """Forward the RSSI value carried by *event*."""
        self._publish(event.type, rssi=event.rssi)
class MyoManager(QThread):
    """Thread that runs the Myo hub loop and reports events through a Qt signal.

    Every notification is a dict ``{"type": ..., "data": {...}}`` emitted on
    ``signals``. Failures are reported with ``EventType.disconnected`` and a
    ``data`` dict holding ``"timeout"`` and ``"unOpenMyo"`` entries; a
    non-empty value marks the reason.

    NOTE(review): ``disconnect`` appears to shadow the ``QObject.disconnect``
    method inherited through ``QThread`` -- confirm nothing relies on the
    inherited one. The names are kept for backward compatibility.
    """

    signals = pyqtSignal(dict)
    send = None
    connecting = False
    connected = False
    # Class-level default so the flag exists before connect()/disconnect()
    # assign it on the instance.
    stop = False

    # How long connect() waits before timed_out() gives up, in milliseconds.
    CONNECT_TIMEOUT_MS = 5000
    # Duration passed to each hub.run() call, in milliseconds; this bounds
    # how quickly a stop request is noticed.
    POLL_INTERVAL_MS = 500

    def __init__(self, sender, sdk_path=r'C:\myo-sdk-win-0.9.0'):
        """Wire the signal to ``sender.callback`` and initialise the Myo SDK.

        Args:
            sender: Object whose ``callback`` slot receives every emitted dict.
            sdk_path: Location of the Myo SDK handed to ``myo.init``. Defaults
                to the path that was previously hard-coded.
        """
        super().__init__()
        self.send = sender
        self.signals.connect(sender.callback)
        myo.init(sdk_path=sdk_path)

    def timed_out(self):
        """Report a timeout if a connection attempt is still pending."""
        if self.connecting and not self.connected:
            self.signals.emit({"type": EventType.disconnected,
                               "data": {"timeout": "timeout",
                                        "unOpenMyo": ""}})
            self.disconnect()

    def connect(self):
        """Start a connection attempt unless one is active or established."""
        if self.connected or self.connecting:
            return
        self.connecting = True
        self.stop = False
        QTimer.singleShot(self.CONNECT_TIMEOUT_MS, self.timed_out)
        self.start()

    def run(self):
        """Thread body: pump hub events until a stop is requested.

        Any error raised while creating the listener/hub or while running
        the hub is reported as a disconnect with ``unOpenMyo`` set.
        """
        try:
            self.listener = Listener(self)
            hub = myo.Hub("com.twins.dataset")
            while hub.run(self.listener.on_event, self.POLL_INTERVAL_MS):
                if self.stop:
                    self.stop = False
                    break
        except Exception:
            # Narrower than a bare ``except:`` so SystemExit and
            # KeyboardInterrupt are not swallowed.
            self.signals.emit({"type": EventType.disconnected,
                               "data": {"timeout": "",
                                        "unOpenMyo": "unOpenMyo"}})
        finally:
            # However the loop ended, no attempt is in progress any more.
            self.connecting = False

    def disconnect(self):
        """Report a disconnect if connected, reset flags and stop the loop."""
        if self.connected:
            self.signals.emit({"type": EventType.disconnected,
                               "data": {"timeout": "",
                                        "unOpenMyo": ""}})
        self.connecting = False
        self.connected = False
        # Picked up by run() after its current hub.run() call returns.
        self.stop = True