Skip to content

Commit a728a8b

Browse files
authored
feat: provide a context manager for read/write operations (#20)
This is a different way to implement the same functionality as `read_device_registers()` and `write_device_register()` when scheduling multiple read/write operations _on the same device_. - In addition it's now possible to perform read _and_ write operations while holding the same lock. This was not possible with `read_device_registers()` and `write_device_register()` and is expected to improve usability and robustness. - However it is not possible to use `feeph.i2c.BurstHandler()` to schedule multiple read/write operations on different devices. This regression was a conscious choice in favor of a more convenient interface.
1 parent 18d9482 commit a728a8b

File tree

3 files changed

+383
-10
lines changed

3 files changed

+383
-10
lines changed

feeph/i2c/__init__.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
# the following imports are provided for user convenience
1616
# flake8: noqa: F401
17+
from feeph.i2c.burst_handler import BurstHandler, BurstHandle
1718
from feeph.i2c.emulation import EmulatedI2C
1819

1920
LH = logging.getLogger("i2c")
@@ -28,18 +29,16 @@ def read_device_register(i2c_bus: busio.I2C, i2c_adr: int, register: int, byte_c
2829
- may raise a RuntimeError if there were too many errors
2930
3031
If you need to read multiple registers in a single transaction please
31-
use `read_device_registers()` instead. This will ensure all values are
32+
use `feeph.i2c.Burst()` instead. This will ensure all values are
3233
read while holding the same lock and prevent outside interference.
3334
3435
typical usage:
3536
```
3637
value1 = read_device_register(i2c_bus, 0x4C, 0x00)
3738
```
3839
"""
39-
reads = [
40-
(i2c_adr, register, byte_count)
41-
]
42-
return read_device_registers(i2c_bus, reads, max_tries, timeout_ms).pop()
40+
with BurstHandler(i2c_bus=i2c_bus, i2c_adr=i2c_adr, timeout_ms=timeout_ms) as bh:
41+
return bh.read_register(register=register, byte_count=byte_count, max_tries=max_tries)
4342

4443

4544
def read_device_registers(i2c_bus: busio.I2C, reads: list[tuple[int, int, int]], max_tries: int = 3, timeout_ms: int = 500) -> list[int]:
@@ -111,18 +110,16 @@ def write_device_register(i2c_bus: busio.I2C, i2c_adr: int, register: int, value
111110
- may raise a RuntimeError if there were too many errors
112111
113112
If you need to write multiple registers in a single transaction please
114-
use `write_device_registers()` instead. This will ensure all values are
113+
use `feeph.i2c.Burst()` instead. This will ensure all values are
115114
written while holding the same lock and prevent outside interference.
116115
117116
typical usage:
118117
```
119118
write_device_register(i2c_bus, 0x4C, 0x00, value)
120119
```
121120
"""
122-
writes = [
123-
(i2c_adr, register, byte_count, value)
124-
]
125-
write_device_registers(i2c_bus, writes, max_tries, timeout_ms)
121+
with BurstHandler(i2c_bus=i2c_bus, i2c_adr=i2c_adr, timeout_ms=timeout_ms) as bh:
122+
bh.write_register(register=register, value=value, byte_count=byte_count, max_tries=max_tries)
126123

127124

128125
def write_device_registers(i2c_bus: busio.I2C, writes: list[tuple[int, int, int, int]], max_tries: int = 3, timeout_ms: int = 500):

feeph/i2c/burst_handler.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
#!/usr/bin/env python3
2+
"""
3+
short-lived transmission handler for feeph.i2c
4+
5+
usage:
6+
```
7+
import busio
8+
import feeph.i2c
9+
10+
i2c_bus = busio.I2C(...)
11+
12+
with feeph.i2c.Burst(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
13+
value = bh.read_register(register)
14+
bh.write_register(register, value + 1)
15+
```
16+
"""
17+
18+
import logging
19+
import time
20+
21+
# module busio provides no type hints
22+
import busio # type: ignore
23+
24+
LH = logging.getLogger("i2c")
25+
26+
27+
class BurstHandle:
28+
"""
29+
internal abstraction - !! do not instantiate !!
30+
31+
Please use `feeph.i2c.BurstHandler() instead`.
32+
"""
33+
34+
def __init__(self, i2c_bus: busio.I2C, i2c_adr: int):
35+
self._i2c_bus = i2c_bus
36+
if 0 <= i2c_adr <= 255:
37+
self._i2c_adr = i2c_adr
38+
else:
39+
raise ValueError(f"Provided I²C address {i2c_adr} is out of range! (allowed range: 0 ≤ x ≤ 255)")
40+
41+
def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5) -> int:
42+
"""
43+
read a single register from I²C device identified by `i2c_adr` and
44+
return its contents as an integer value
45+
- may raise a RuntimeError if it was not possible to acquire
46+
the bus within allowed time
47+
- may raise a RuntimeError if there were too many errors
48+
"""
49+
_validate_inputs(register=register, value=0, byte_count=byte_count, max_tries=max_tries)
50+
if byte_count > 1:
51+
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
52+
byte_count = 1
53+
for cur_try in range(1, 1 + max_tries):
54+
try:
55+
buf_r = bytearray(1)
56+
buf_r[0] = register
57+
buf_w = bytearray(byte_count)
58+
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
59+
# TODO properly handle multi byte reads
60+
return buf_w[0]
61+
except OSError as e:
62+
# [Errno 121] Remote I/O error
63+
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
64+
time.sleep(0.001)
65+
except RuntimeError as e:
66+
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
67+
time.sleep(0.001)
68+
else:
69+
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
70+
71+
def write_register(self, register: int, value: int, byte_count: int = 1, max_tries: int = 3):
72+
"""
73+
write a single register to I²C device identified by `i2c_adr`
74+
- may raise a RuntimeError if it was not possible to acquire
75+
the bus within allowed time
76+
- may raise a RuntimeError if there were too many errors
77+
"""
78+
_validate_inputs(register=register, value=value, byte_count=byte_count, max_tries=max_tries)
79+
if byte_count > 1:
80+
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
81+
byte_count = 1
82+
for cur_try in range(1, 1 + max_tries):
83+
try:
84+
buf = bytearray(1 + byte_count)
85+
buf[0] = register
86+
buf[1] = value & 0xFF
87+
# TODO properly handle multi byte reads
88+
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
89+
return
90+
except OSError as e:
91+
# [Errno 121] Remote I/O error
92+
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
93+
time.sleep(0.1)
94+
except RuntimeError as e:
95+
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
96+
time.sleep(0.1)
97+
else:
98+
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
99+
100+
101+
class BurstHandler:
102+
"""
103+
a short-lived I/O operation on the I²C bus
104+
105+
Technically speaking this I/O operation could span multiple devices
106+
but we're making an design choice and assume a single device is being
107+
used. This simplifies the user interface.
108+
"""
109+
110+
def __init__(self, i2c_bus: busio.I2C, i2c_adr: int, timeout_ms: int | None = 500):
111+
self._i2c_bus = i2c_bus
112+
self._i2c_adr = i2c_adr
113+
if timeout_ms is None:
114+
self._timeout_ms = None
115+
elif isinstance(timeout_ms, int) and timeout_ms > 0:
116+
self._timeout_ms = timeout_ms
117+
else:
118+
raise ValueError("Provided timeout is not a positive integer or 'None'!")
119+
120+
def __enter__(self) -> BurstHandle:
121+
"""
122+
Try to acquire a lock for exclusive access on the I²C bus.
123+
124+
Raises a RuntimeError if it wasn't possible to acquire the lock
125+
within the given timeout.
126+
"""
127+
LH.debug("[%d] Initializing an I²C I/O burst.", id(self))
128+
# 0.001 = 1 millisecond
129+
# 0.000_001 = 1 microsecond
130+
# 0.000_000_001 = 1 nanosecond
131+
self._timestart_ns = time.perf_counter_ns()
132+
sleep_time = 0.001 # 1 millisecond
133+
if self._timeout_ms is not None:
134+
timeout_ns = self._timeout_ms * 1000 * 1000
135+
deadline = time.monotonic_ns() + timeout_ns
136+
while not self._i2c_bus.try_lock():
137+
if time.monotonic_ns() <= deadline:
138+
# I²C bus was busy, wait and retry
139+
time.sleep(sleep_time) # time is given in seconds
140+
else:
141+
# unable to acquire the lock
142+
raise RuntimeError("timed out before the I²C bus became available")
143+
else:
144+
while not self._i2c_bus.try_lock():
145+
# I²C bus was busy, wait and retry
146+
time.sleep(sleep_time) # time is given in seconds
147+
# successfully acquired a lock
148+
elapsed_ns = time.perf_counter_ns() - self._timestart_ns
149+
LH.debug("[%d] Acquired a lock on the I²C bus after %d ms.", id(self), elapsed_ns / (1000 * 1000))
150+
return BurstHandle(i2c_bus=self._i2c_bus, i2c_adr=self._i2c_adr)
151+
152+
def __exit__(self, exc_type, exc_value, exc_tb):
153+
elapsed_ns = time.perf_counter_ns() - self._timestart_ns
154+
LH.debug("[%d] I²C I/O burst completed after %d ms.", id(self), elapsed_ns / (1000 * 1000))
155+
LH.debug("[%d] Releasing the lock on the I²C bus.", id(self))
156+
self._i2c_bus.unlock()
157+
158+
def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5) -> int:
159+
"""
160+
read a single register from I²C device identified by `i2c_adr` and
161+
return its contents as an integer value
162+
- may raise a RuntimeError if it was not possible to acquire
163+
the bus within allowed time
164+
- may raise a RuntimeError if there were too many errors
165+
"""
166+
_validate_inputs(register=register, value=0, byte_count=byte_count, max_tries=max_tries)
167+
if byte_count > 1:
168+
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
169+
byte_count = 1
170+
for cur_try in range(1, 1 + max_tries):
171+
try:
172+
buf_r = bytearray(1)
173+
buf_r[0] = register
174+
buf_w = bytearray(byte_count)
175+
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
176+
# TODO properly handle multi byte reads
177+
return buf_w[0]
178+
except OSError as e:
179+
# [Errno 121] Remote I/O error
180+
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
181+
time.sleep(0.001)
182+
except RuntimeError as e:
183+
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
184+
time.sleep(0.001)
185+
else:
186+
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
187+
188+
def write_register(self, register: int, value: int, byte_count: int = 1, max_tries: int = 3):
189+
"""
190+
write a single register to I²C device identified by `i2c_adr`
191+
- may raise a RuntimeError if it was not possible to acquire
192+
the bus within allowed time
193+
- may raise a RuntimeError if there were too many errors
194+
"""
195+
_validate_inputs(register=register, value=value, byte_count=byte_count, max_tries=max_tries)
196+
if byte_count > 1:
197+
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
198+
byte_count = 1
199+
for cur_try in range(1, 1 + max_tries):
200+
try:
201+
buf = bytearray(1 + byte_count)
202+
buf[0] = register
203+
buf[1] = value & 0xFF
204+
# TODO properly handle multi byte reads
205+
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
206+
return
207+
except OSError as e:
208+
# [Errno 121] Remote I/O error
209+
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
210+
time.sleep(0.1)
211+
except RuntimeError as e:
212+
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
213+
time.sleep(0.1)
214+
else:
215+
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
216+
217+
218+
def _validate_inputs(register: int, value: int, byte_count: int = 1, max_tries: int = 3):
219+
if register < 0 or register > 255:
220+
raise ValueError(f"Provided I²C device register {register} is out of range! (allowed range: 0 ≤ x ≤ 255)")
221+
max_value = pow(256, byte_count) - 1
222+
if value < 0 or value > max_value:
223+
raise ValueError(f"Provided value {register} is out of range! (allowed range: 0 ≤ x ≤ {max_value})")
224+
if byte_count < 1:
225+
raise ValueError(f"byte count must be at least 1 (value: {byte_count})")
226+
if max_tries < 0:
227+
raise ValueError(f"Provided max tries value {max_tries} is out of range! (allowed range: 0 ≤ x)")

0 commit comments

Comments
 (0)