Skip to content

Commit 10f1cb0

Browse files
committed
Forgot to add the actual asyncgpio.test module. Again. Sigh.
1 parent 0af8139 commit 10f1cb0

File tree

1 file changed

+197
-0
lines changed

1 file changed

+197
-0
lines changed

asyncgpio/test.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
"""
2+
This module contains helpers for testing async gpio, via the Linux kernel's
3+
``gpio_mockup`` module (writing) and ``/sys/kernel/debug/cpio`` (monitoring).
4+
"""
5+
6+
import os
7+
import re
8+
import anyio
9+
import logging
10+
import errno
11+
12+
from contextlib import asynccontextmanager
13+
from collections import namedtuple
14+
15+
logger = logging.getLogger(__name__)
16+
17+
_r_chip = re.compile(
18+
"^(?P<chip>[a-z0-9]+): GPIOs (?P<base>[0-9]+)-(?:.*, (?P<name>[-_a-zA-Z0-9]+): *$)?"
19+
)
20+
_r_pin = re.compile("^gpio-(?P<pin>[0-9]+) \\(.*\\) (?P<dir>in|out) +(?P<val>hi|lo)")
21+
22+
Pin = namedtuple("Pin",["out","level"])
23+
24+
class _GpioPin:
25+
"""
26+
Code representing one GPIO pin.
27+
"""
28+
29+
fd = None
30+
31+
def __init__(self, watcher: "GpioWatcher", chip: str, pin: int):
32+
self.watcher = watcher
33+
self.chip = chip
34+
self.pin = pin
35+
self.mon = set()
36+
self.state = (None, None)
37+
try:
38+
self.fd = os.open(
39+
os.path.join(watcher.debugfs_path, "gpio-mockup-event", chip, str(pin)),
40+
os.O_WRONLY,
41+
)
42+
except EnvironmentError as exc:
43+
if exc.errno != errno.ENOENT:
44+
raise
45+
46+
def __del__(self):
47+
if self.fd is not None:
48+
os.close(self.fd)
49+
del self.fd
50+
51+
@asynccontextmanager
52+
async def watch(self):
53+
"""
54+
An async context manager that returns an iterator for changes of
55+
this pin.
56+
57+
Values are (out,level) tuples of bool, with "out" and "high"
58+
represented as True.
59+
"""
60+
q = anyio.create_queue(10)
61+
self.mon.add(q)
62+
try:
63+
yield q
64+
finally:
65+
self.mon.remove(q)
66+
67+
async def see(self, write: bool, level: bool):
68+
s = (write, level)
69+
if self.state == s:
70+
return
71+
self.state = s
72+
logger.debug("SEE %s %d %s", self.chip, self.pin, self.state)
73+
for cb in list(self.mon):
74+
await cb.put(s)
75+
76+
def set(self, value: bool):
77+
logger.debug("SET %s %d %s", self.chip, self.pin, value)
78+
if self.fd is None:
79+
raise RuntimeError("Pin %s/%d is not controlled via the 'gpio_mockup' module" % (self.chip,self.pin))
80+
os.write(self.fd, b"1" if value else b"0")
81+
# os.lseek(self.fd, 0, os.SEEK_SET)
82+
83+
84+
class GpioWatcher:
85+
"""
86+
Code which triggers callbacks whenever a GPIO pin changes.
87+
88+
This class polls `/sys/kernel/debug/gpio` (can be overridden).
89+
"""
90+
91+
def __init__(
92+
self,
93+
interval: float = 0.2,
94+
debugfs_path: str = "/sys/kernel/debug",
95+
sysfs_path: str = "/sys",
96+
):
97+
self.interval = interval
98+
self.gpio = open(os.path.join(debugfs_path, "gpio"), "r")
99+
self.targets = dict() # chip > line > _GpioPin
100+
# self.names = {}
101+
self.sysfs_path = sysfs_path
102+
self.debugfs_path = debugfs_path
103+
gpio_dir = os.path.join(sysfs_path, "class", "gpio")
104+
105+
# for d in os.listdir(gpio_dir):
106+
# try:
107+
# with open(os.path.join(gpio_dir,d,"label"),"r") as f:
108+
# n = f.read().strip()
109+
# except EnvironmentError as e:
110+
# if e.errno == errno.ENOTDIR:
111+
# continue
112+
# raise
113+
# else:
114+
# self.names[d] = n
115+
116+
def monitor(self, chip: str, pin: int):
117+
"""
118+
Shortcut for 'self.pin(chip, pin).watch()'.
119+
"""
120+
return self.pin(chip, pin).watch()
121+
122+
def pin(self, chip: str, pin: int, create: bool = True):
123+
"""
124+
Returns a pins corresponding GpioPin
125+
"""
126+
# chip = self.names[chip]
127+
try:
128+
c = self.targets[chip]
129+
except KeyError:
130+
if not create:
131+
raise
132+
self.targets[chip] = c = dict()
133+
try:
134+
p = c[pin]
135+
except KeyError:
136+
if not create:
137+
raise
138+
c[pin] = p = _GpioPin(self, chip, pin)
139+
return p
140+
141+
async def _watch(self):
142+
# The actual monitor.
143+
while True:
144+
await self.check_pins()
145+
await anyio.sleep(self.interval)
146+
147+
async def check_pins(self):
148+
"""
149+
Read the GPIO debug file and update pin states
150+
"""
151+
chip = None
152+
base = None
153+
154+
for l in self.gpio:
155+
l = l.strip()
156+
if not l:
157+
chip = None
158+
continue
159+
if chip is None:
160+
r = _r_chip.match(l)
161+
if not r:
162+
raise ValueError(l)
163+
chip = r.group("name")
164+
if not chip:
165+
chip = r.group("chip")
166+
base = int(r.group("base"))
167+
else:
168+
r = _r_pin.match(l)
169+
if not r:
170+
breakpoint()
171+
raise ValueError(l)
172+
pin = int(r.group("pin")) - base
173+
out = r.group("dir") == "out"
174+
val = r.group("val") == "hi"
175+
176+
try:
177+
pin = self.pin(chip, pin, create=False)
178+
except KeyError:
179+
pass
180+
else:
181+
await pin.see(out, val)
182+
self.gpio.seek(0)
183+
184+
@asynccontextmanager
185+
async def run(self):
186+
"""
187+
This async context manager controls the monitoring loop.
188+
"""
189+
async with anyio.create_task_group() as tg:
190+
self.tg = tg
191+
await tg.spawn(self._watch)
192+
try:
193+
yield self
194+
finally:
195+
self.tg = None
196+
await tg.cancel_scope.cancel()
197+

0 commit comments

Comments
 (0)