-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathceyear.py
More file actions
executable file
·178 lines (153 loc) · 6.61 KB
/
ceyear.py
File metadata and controls
executable file
·178 lines (153 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
import binascii
import sys
import time
import usb1
import numpy as np
def validate_read(expected, actual, msg):
if expected != actual:
print('Failed %s' % msg)
print(' Expected; %s' % binascii.hexlify(expected,))
print(' Actual: %s' % binascii.hexlify(actual,))
#raise Exception('failed validate: %s' % msg)
def safefloat(f):
try:
return float(f)
except:
return None
def replay(dev):
sleep = args.delay
def bulkRead(endpoint, length, timeout=None):
time.sleep(sleep)
return dev.bulkRead(endpoint, length, timeout=(1000 if timeout is None else timeout))
def bulkWrite(endpoint, data, timeout=None):
time.sleep(sleep)
dev.bulkWrite(endpoint, data, timeout=(1000 if timeout is None else timeout))
def controlRead(bRequestType, bRequest, wValue, wIndex, wLength,
timeout=None):
time.sleep(sleep)
return dev.controlRead(bRequestType, bRequest, wValue, wIndex, wLength,
timeout=(1000 if timeout is None else timeout))
def controlWrite(bRequestType, bRequest, wValue, wIndex, data,
timeout=None):
time.sleep(sleep)
dev.controlWrite(bRequestType, bRequest, wValue, wIndex, data,
timeout=(1000 if timeout is None else timeout))
def interruptRead(endpoint, size, timeout=None):
time.sleep(sleep)
return dev.interruptRead(endpoint, size,
timeout=(1000 if timeout is None else timeout))
def interruptWrite(endpoint, data, timeout=None):
time.sleep(sleep)
dev.interruptWrite(endpoint, data, timeout=(1000 if timeout is None else timeout))
# Generated by usbrply
# Source: Linux pcap (usbmon)
# cmd: ./.local/bin/usbrply --device-hi --wrapper -p usb-sonda.pcapng
# PCapGen device hi: selected device 43
# Generated from packet 40/41
buff = controlRead(0xA1, 0x07, 0x0000, 0x0000, 24)
validate_read(b"\x01\x00\x00\x01\x04\x01\x00\x00\x00\x00\x00\x00\x00\x01\x04\x08"
b"\x00\x00\x00\x00\x00\x00\x00\x00", buff, "packet 40/41")
# Generated from packet 42/43
buff = controlRead(0xA1, 0x40, 0x0000, 0x0000, 1)
validate_read(b"\x01", buff, "packet 42/43")
# Generated from packet 74/75
buff = controlRead(0xA1, 0x07, 0x0000, 0x0000, 24)
validate_read(b"\x01\x00\x00\x01\x04\x01\x00\x00\x00\x00\x00\x00\x00\x01\x04\x08"
b"\x00\x00\x00\x00\x00\x00\x00\x00", buff, "packet 74/75")
# Generated from packet 76/77
buff = controlRead(0xA1, 0x40, 0x0000, 0x0000, 1)
validate_read(b"\x01", buff, "packet 76/77")
# Generated from packet 78/79
bulkWrite(0x02, b"\x01\x02\xFD\x00\x06\x00\x00\x00\x01\x00\x00\x00\x2A\x49\x44\x4E"
b"\x3F\x0A\x00\x00")
# Generated from packet 80/82
bulkWrite(0x02, b"\x02\x03\xFC\x00\x00\x10\x00\x00\x00\x00\x00\x00")
# Generated from packet 81/83
buff = bulkRead(0x86, 0x2000)
validate_read(b"\x02\x03\xFC\x00\x1E\x00\x00\x00\x01\x00\x00\x00\x43\x45\x54\x43"
b"\x34\x31\x2C\x41\x56\x38\x37\x32\x33\x31\x2C\x77\x67\x61\x30\x32"
b"\x30\x37\x35\x2C\x31\x2E\x30\x2E\x37\x0A", buff, "packet 81/83")
# Generated from packet 90/91
buff = controlRead(0xA1, 0x07, 0x0000, 0x0000, 24)
validate_read(b"\x01\x00\x00\x01\x04\x01\x00\x00\x00\x00\x00\x00\x00\x01\x04\x08"
b"\x00\x00\x00\x00\x00\x00\x00\x00", buff, "packet 90/91")
# Generated from packet 92/93
buff = controlRead(0xA1, 0x40, 0x0000, 0x0000, 1)
validate_read(b"\x01", buff, "packet 92/93")
# Generated from packet 94/95
bulkWrite(0x02, b"\x01\x02\xFD\x00\x06\x00\x00\x00\x01\x00\x00\x00\x2A\x49\x44\x4E"
b"\x3F\x0A\x00\x00")
# Generated from packet 96/97
bulkWrite(0x02, b"\x02\x03\xFC\x00\x00\x10\x00\x00\x02\x0A\x00\x00")
# Generated from packet 98/99
buff = bulkRead(0x86, 0x2000)
validate_read(b"\x02\x03\xFC\x00\x1E\x00\x00\x00\x01\x00\x00\x00\x43\x45\x54\x43"
b"\x34\x31\x2C\x41\x56\x38\x37\x32\x33\x31\x2C\x77\x67\x61\x30\x32"
b"\x30\x37\x35\x2C\x31\x2E\x30\x2E\x37\x0A", buff, "packet 98/99")
# Generated from packet 100/101
bulkWrite(0x02, b"\x01\x04\xFB\x00\x05\x00\x00\x00\x01\x00\x00\x00\x2A\x52\x53\x54"
b"\x0A\x00\x00\x00")
# Generated from packet 102/103
bulkWrite(0x02, b"\x01\x05\xFA\x00\x05\x00\x00\x00\x01\x00\x00\x00\x49\x4E\x49\x54"
b"\x0A\x00\x00\x00")
# Generated from packet 104/105
bulkWrite(0x02, b"\x01\x06\xF9\x00\x0D\x00\x00\x00\x01\x00\x00\x00\x49\x4E\x49\x54"
b"\x3A\x43\x4F\x4E\x54\x20\x4F\x4E\x0A\x00\x00\x00")
runavg = []
while True:
bulkWrite(0x02, b"\x01\x07\xF8\x00\x0D\x00\x00\x00\x01\x00\x00\x00\x49\x4E\x49\x54"
b"\x3A\x43\x4F\x4E\x54\x20\x4F\x4E\x0A\x00\x00\x00")
bulkWrite(0x02, b"\x01\x08\xF7\x00\x06\x00\x00\x00\x01\x00\x00\x00\x46\x45\x54\x43"
b"\x3F\x0A\x00\x00")
bulkWrite(0x02, b"\x02\x09\xF6\x00\x00\x10\x00\x00\x02\x0A\x00\x00")
buff = bulkRead(0x86, 0x2000)
processed = False
if 0 in buff:
pos = buff[::-1].index(0)
rest = buff[-pos:].decode("ascii", "ignore")
if rest.endswith("\n"):
val = safefloat(rest)
if val != None:
runavg.append(val)
print("%.1f %.1f"%(val, np.mean(runavg)))
processed = True
runavg = runavg[-10:]
if not processed:
print(buff)
def open_dev(usbcontext=None):
if usbcontext is None:
usbcontext = usb1.USBContext()
print('Scanning for devices...')
for udev in usbcontext.getDeviceList(skip_on_error=True):
vid = udev.getVendorID()
pid = udev.getProductID()
try:
vid_target = int(args.vid, 16)
pid_target = int(args.pid, 16)
except:
print("Cannot parse VID/PID (invalid format?)")
sys.exit(1)
if (vid, pid) == (vid_target, pid_target):
print("")
print("")
print('Found device')
print('Bus %03i Device %03i: ID %04x:%04x' % (
udev.getBusNumber(),
udev.getDeviceAddress(),
vid,
pid))
return udev.open()
raise Exception("Failed to find a device")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Set up the power sensor with hardcoded values and dump readings to stdout')
parser.add_argument("--delay", dest="delay", type=float, default=0.05, help="delay between replayed packets [seconds]")
parser.add_argument("--vid", dest="vid", type=str, default="04b4", help="USB Vendor ID [hex]")
parser.add_argument("--pid", dest="pid", type=str, default="1004", help="USB Product ID [hex]")
args = parser.parse_args()
usbcontext = usb1.USBContext()
dev = open_dev(usbcontext)
dev.claimInterface(0)
dev.resetDevice()
replay(dev)