11
11
- Turn digital output on and off
12
12
"""
13
13
14
+ import errno
15
+ from contextlib import contextmanager
16
+ from time import monotonic , sleep
17
+
14
18
import usb .core
15
19
import usb .util
16
20
@@ -26,18 +30,35 @@ def __init__(self, **args):
26
30
raise ValueError ("Device not found" )
27
31
28
32
if self ._dev .idVendor == 0x16C0 :
29
- self .set_output = self .set_output_dcttech
30
- self .get_output = self .get_output_dcttech
33
+ self ._set_output = self ._set_output_dcttech
34
+ self ._get_output = self ._get_output_dcttech
31
35
elif self ._dev .idVendor == 0x5131 :
32
- self .set_output = self .set_output_lcus
33
- self .get_output = self .get_output_lcus
36
+ self ._set_output = self ._set_output_lcus
37
+ self ._get_output = self ._get_output_lcus
34
38
else :
35
39
raise ValueError (f"Unknown vendor/protocol for VID { self ._dev .idVendor :x} " )
36
40
37
41
if self ._dev .is_kernel_driver_active (0 ):
38
42
self ._dev .detach_kernel_driver (0 )
39
43
40
- def set_output_dcttech (self , number , status ):
44
+ @contextmanager
45
+ def _claimed (self ):
46
+ timeout = monotonic () + 1.0
47
+ while True :
48
+ try :
49
+ usb .util .claim_interface (self ._dev , 0 )
50
+ break
51
+ except usb .core .USBError as e :
52
+ if monotonic () > timeout :
53
+ raise e
54
+ if e .errno == errno .EBUSY :
55
+ sleep (0.01 )
56
+ else :
57
+ raise e
58
+ yield
59
+ usb .util .release_interface (self ._dev , 0 )
60
+
61
+ def _set_output_dcttech (self , number , status ):
41
62
assert 1 <= number <= 8
42
63
req = [0xFF if status else 0xFD , number ]
43
64
self ._dev .ctrl_transfer (
@@ -48,7 +69,7 @@ def set_output_dcttech(self, number, status):
48
69
req , # payload
49
70
)
50
71
51
- def get_output_dcttech (self , number ):
72
+ def _get_output_dcttech (self , number ):
52
73
assert 1 <= number <= 8
53
74
resp = self ._dev .ctrl_transfer (
54
75
usb .util .CTRL_TYPE_CLASS | usb .util .CTRL_RECIPIENT_DEVICE | usb .util .ENDPOINT_IN ,
@@ -59,7 +80,7 @@ def get_output_dcttech(self, number):
59
80
)
60
81
return bool (resp [7 ] & (1 << (number - 1 )))
61
82
62
- def set_output_lcus (self , number , status ):
83
+ def _set_output_lcus (self , number , status ):
63
84
assert 1 <= number <= 8
64
85
ep_in = self ._dev [0 ][(0 , 0 )][0 ]
65
86
ep_out = self ._dev [0 ][(0 , 0 )][1 ]
@@ -68,13 +89,18 @@ def set_output_lcus(self, number, status):
68
89
ep_out .write (req )
69
90
ep_in .read (64 )
70
91
71
- def get_output_lcus (self , number ):
92
+ def _get_output_lcus (self , number ):
72
93
assert 1 <= number <= 8
73
94
# we have no information on how to read the current value
74
95
return False
75
96
76
- def __del__ (self ):
77
- usb .util .release_interface (self ._dev , 0 )
97
+ def set_output (self , number , status ):
98
+ with self ._claimed ():
99
+ self ._set_output (number , status )
100
+
101
+ def get_output (self , number ):
102
+ with self ._claimed ():
103
+ self ._get_output (number )
78
104
79
105
80
106
_relays = {}
0 commit comments