diff --git a/doc/configuration.rst b/doc/configuration.rst index 4a3ffea66..a30d2e367 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -556,9 +556,11 @@ A :any:`SysfsGPIO` resource describes a GPIO line. SysfsGPIO: index: 12 + invert: False Arguments: - index (int): index of the GPIO line + - invert (bool, default=False): optional, whether the logic level is inverted(active-low) Used by: - `GpioDigitalOutputDriver`_ @@ -577,6 +579,7 @@ USB based gpiochips. '@SUBSYSTEM': 'usb' '@ID_SERIAL_SHORT': 'D38EJ8LF' pin: 0 + invert: False The example would search for a USB gpiochip with the key `ID_SERIAL_SHORT` and the value `D38EJ8LF` and use the pin 0 of this device. @@ -585,6 +588,7 @@ The `ID_SERIAL_SHORT` property is set by the usb_id builtin helper program. Arguments: - match (dict): key and value pairs for a udev match, see `udev Matching`_ - pin (int): gpio pin number within the matched gpiochip. + - invert (bool, default=False): optional, whether the logic level is inverted (active-low) Used by: - `GpioDigitalOutputDriver`_ @@ -2213,6 +2217,72 @@ Implements: Arguments: - delay (float, default=2.0): delay in seconds between off and on +ManualButtonDriver +~~~~~~~~~~~~~~~~~~ +A :any:`ManualButtonDriver` requires the user to control the taget button. +This is required if a strategy is used with the target, but no automatic +button control is available. + +The driver's name will be displayed during interaction. + +Binds to: + - None + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + ManualButtonDriver: + name: 'example-board' + +Arguments: + - None + +ExternalButtonDriver +~~~~~~~~~~~~~~~~~~~~ +An :any:`ExternalButtonDriver` is used to control a target button via an +external command. + +Binds to: + - None + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + ExternalButtonDriver: + cmd_press: 'example_command press and hold' + cmd_release: 'example_command release' + cmd_press_for: 'example_command press_for' + delay: 2.0 + +Arguments: + - cmd_press (str): command to press and hold the button on the board + - cmd_release (str): command to release the button on the board + - cmd_press_for (str): command to press, pause, and release the button on the board + - delay (float, default=1.0): delay in seconds when calling press_for + +DigitalOutputButtonDriver +~~~~~~~~~~~~~~~~~~~~~~~~~ +A :any:`DigitalOutputButtonDriver` is used to control a target button via a +DigitalOutputDriver + +Binds to: + - :any:`DigitalOutputProtocol` + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + DigitalOutputButtonDriver: + delay: 2.0 + +Arguments: + - delay (float, default=1.0): delay in seconds when calling press_for + GpioDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~ The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line. @@ -2228,13 +2298,17 @@ Binds to: Implements: - :any:`DigitalOutputProtocol` + - :any:`ResetProtocol` + - :any:`PowerProtocol` + - :any:`ButtonProtocol` .. code-block:: yaml - GpioDigitalOutputDriver: {} + GpioDigitalOutputDriver: + delay: 2.0 Arguments: - - None + - delay (float, default=1.0): delay in seconds between off and on for a power cycle or between states for button press_for SerialPortDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/sysfsgpio/export-gpio.yaml b/examples/sysfsgpio/export-gpio.yaml index 489befd16..808a82c2a 100644 --- a/examples/sysfsgpio/export-gpio.yaml +++ b/examples/sysfsgpio/export-gpio.yaml @@ -1,3 +1,4 @@ desk: - GpioDigitalOutputDriver: + SysfsGPIO: index: 60 + invert: False diff --git a/examples/sysfsgpio/import-gpio.yaml b/examples/sysfsgpio/import-gpio.yaml index 4ba7b223f..40954a448 100644 --- a/examples/sysfsgpio/import-gpio.yaml +++ b/examples/sysfsgpio/import-gpio.yaml @@ -5,5 +5,3 @@ targets: name: gpio drivers: GpioDigitalOutputDriver: {} -options: - coordinator_address: 'labgrid:20408' diff --git a/examples/sysfsgpio/sysfsgpio.py b/examples/sysfsgpio/sysfsgpio.py index 98acf0ac1..c16bca198 100644 --- a/examples/sysfsgpio/sysfsgpio.py +++ b/examples/sysfsgpio/sysfsgpio.py @@ -13,11 +13,12 @@ StepLogger.start() t = Target("main") -r = SysfsGPIO(t, name=None, index=60) +r = SysfsGPIO(t, name=None, index=60, invert=True) d = GpioDigitalOutputDriver(t, name=None) p = t.get_driver("DigitalOutputProtocol") print(t.resources) +print("Testing IO") p.set(True) print(p.get()) time.sleep(2) @@ -26,3 +27,34 @@ time.sleep(2) p.set(True) print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) diff --git a/examples/sysfsgpio/sysfsgpio_remote.py b/examples/sysfsgpio/sysfsgpio_remote.py index 4b16b8466..a1d016742 100644 --- a/examples/sysfsgpio/sysfsgpio_remote.py +++ b/examples/sysfsgpio/sysfsgpio_remote.py @@ -15,6 +15,7 @@ p = t.get_driver("DigitalOutputProtocol") print(t.resources) +print("Testing IO") p.set(True) print(p.get()) time.sleep(2) @@ -23,3 +24,34 @@ time.sleep(2) p.set(True) print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 721256bbf..6eee7d23a 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -16,6 +16,8 @@ DigitalOutputPowerDriver, YKUSHPowerDriver, \ USBPowerDriver, SiSPMPowerDriver, NetworkPowerDriver, \ PDUDaemonDriver +from .buttondriver import ManualButtonDriver, ExternalButtonDriver, \ + DigitalOutputButtonDriver from .usbloader import MXSUSBDriver, IMXUSBDriver, BDIMXUSBDriver, RKUSBDriver, UUUDriver from .usbsdmuxdriver import USBSDMuxDriver from .usbsdwiredriver import USBSDWireDriver diff --git a/labgrid/driver/buttondriver.py b/labgrid/driver/buttondriver.py new file mode 100644 index 000000000..066c688f6 --- /dev/null +++ b/labgrid/driver/buttondriver.py @@ -0,0 +1,107 @@ +import shlex +import time +import math +from importlib import import_module + +import attr + +from ..factory import target_factory +from ..protocol import ButtonProtocol, DigitalOutputProtocol +from ..step import step +from ..util.proxy import proxymanager +from ..util.helper import processwrapper +from .common import Driver +from .exception import ExecutionError + + +@target_factory.reg_driver +@attr.s(eq=False) +class ManualButtonDriver(Driver, ButtonProtocol): + """ManualButtonDriver - Driver to tell the user to control a target's button""" + + @Driver.check_active + @step() + def press(self): + self.target.interact( + f"Press and hold the button on target {self.target.name} and press enter" + ) + + @Driver.check_active + @step() + def release(self): + self.target.interact( + f"Release the button on the target {self.target.name} press enter" + ) + + @Driver.check_active + @step() + def press_for(self): + self.target.interact( + f"Press and then Release the button on target {self.target.name} for {self.delay} seconds and press enter" + ) + +@target_factory.reg_driver +@attr.s(eq=False) +class ExternalButtonDriver(Driver, ButtonProtocol): + """ExternalButtonDriver - Driver using an external command to control a target's button""" + cmd_press = attr.ib(validator=attr.validators.instance_of(str)) + cmd_release = attr.ib(validator=attr.validators.instance_of(str)) + cmd_press_for = attr.ib(validator=attr.validators.instance_of(str)) + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) + + @Driver.check_active + @step() + def press(self): + cmd = shlex.split(self.cmd_press) + processwrapper.check_output(cmd) + + @Driver.check_active + @step() + def release(self): + cmd = shlex.split(self.cmd_release) + processwrapper.check_output(cmd) + + @Driver.check_active + @step() + def press_for(self): + if self.cmd_press_for is not None: + cmd = shlex.split(self.cmd_press_for) + processwrapper.check_output(cmd) + else: + self.press() + time.sleep(self.delay) + self.release() + +@target_factory.reg_driver +@attr.s(eq=False) +class DigitalOutputButtonDriver(Driver, ButtonProtocol): + """ + DigitalOutputButtonDriver uses a DigitalOutput to control a button + """ + bindings = {"output": DigitalOutputProtocol, } + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + @Driver.check_active + @step() + def press(self): + self.output.set(True) + + @Driver.check_active + @step() + def release(self): + self.output.set(False) + + @Driver.check_active + @step() + def press_for(self): + self.press() + time.sleep(self.delay) + self.release() + + @Driver.check_active + @step() + def get(self): + return self.output.get() diff --git a/labgrid/driver/gpiodriver.py b/labgrid/driver/gpiodriver.py index 2de90618e..b640cce49 100644 --- a/labgrid/driver/gpiodriver.py +++ b/labgrid/driver/gpiodriver.py @@ -1,8 +1,9 @@ """All GPIO-related drivers""" import attr +import time from ..factory import target_factory -from ..protocol import DigitalOutputProtocol +from ..protocol import DigitalOutputProtocol, ResetProtocol, PowerProtocol, ButtonProtocol from ..resource.remote import NetworkSysfsGPIO from ..step import step from .common import Driver @@ -11,11 +12,12 @@ @target_factory.reg_driver @attr.s(eq=False) -class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol): +class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol, ResetProtocol, PowerProtocol, ButtonProtocol): bindings = { "gpio": {"SysfsGPIO", "NetworkSysfsGPIO"}, } + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) def __attrs_post_init__(self): super().__attrs_post_init__() @@ -37,9 +39,53 @@ def on_deactivate(self): @Driver.check_active @step(args=['status']) def set(self, status): - self.proxy.set(self.gpio.index, status) + self.proxy.set(self.gpio.index, self.gpio.invert, status) @Driver.check_active @step(result=True) def get(self): - return self.proxy.get(self.gpio.index) + return self.proxy.get(self.gpio.index, self.gpio.invert) + + @Driver.check_active + @step(result=True) + def invert(self): + self.set(not self.get()) + + @Driver.check_active + @step(result=True) + def reset(self): + self.cycle() + + @Driver.check_active + @step(result=True) + def on(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def off(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def cycle(self): + self.off() + time.sleep(self.delay) + self.on() + + @Driver.check_active + @step(result=True) + def press(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def release(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def press_for(self): + self.press() + time.sleep(self.delay) + self.release() diff --git a/labgrid/protocol/__init__.py b/labgrid/protocol/__init__.py index 0ac225622..749539c81 100644 --- a/labgrid/protocol/__init__.py +++ b/labgrid/protocol/__init__.py @@ -3,6 +3,7 @@ from .consoleprotocol import ConsoleProtocol from .linuxbootprotocol import LinuxBootProtocol from .powerprotocol import PowerProtocol +from .buttonprotocol import ButtonProtocol from .filetransferprotocol import FileTransferProtocol from .infoprotocol import InfoProtocol from .digitaloutputprotocol import DigitalOutputProtocol diff --git a/labgrid/protocol/buttonprotocol.py b/labgrid/protocol/buttonprotocol.py new file mode 100644 index 000000000..abc4a292e --- /dev/null +++ b/labgrid/protocol/buttonprotocol.py @@ -0,0 +1,25 @@ +import abc + + +class ButtonProtocol(abc.ABC): + """Abstract class providing the ButtonProtocol interface""" + + @abc.abstractmethod + def press(self): + """Implementations should "press and hold" the button.""" + raise NotImplementedError + + @abc.abstractmethod + def release(self): + """Implementations should "release" the button""" + raise NotImplementedError + + @abc.abstractmethod + def press_for(self, time: float): + """Implementations should "press" the button for time seconds and then "release" the button again""" + raise NotImplementedError + + @abc.abstractmethod + def get(self): + """Implementations should return the status of the button""" + raise NotImplementedError diff --git a/labgrid/protocol/digitaloutputprotocol.py b/labgrid/protocol/digitaloutputprotocol.py index 6b1ce7e23..eec3f6b45 100644 --- a/labgrid/protocol/digitaloutputprotocol.py +++ b/labgrid/protocol/digitaloutputprotocol.py @@ -13,3 +13,7 @@ def get(self): def set(self, status): """Implementations should set the status of the digital output""" raise NotImplementedError + + @abc.abstractmethod + def invert(self): + """Implementations should invert the the status of the digital output""" diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index a55113f67..87247009d 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -866,7 +866,7 @@ def power(self): name = self.args.name target = self._get_target(place) from ..resource.power import NetworkPowerPort, PDUDaemonPort - from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort + from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkSysfsGPIO from ..resource import TasmotaPowerPort, NetworkYKUSHPowerPort drv = None @@ -888,6 +888,8 @@ def power(self): drv = self._get_driver_or_new(target, "TasmotaPowerDriver", name=name) elif isinstance(resource, NetworkYKUSHPowerPort): drv = self._get_driver_or_new(target, "YKUSHPowerDriver", name=name) + elif isinstance(resource, NetworkSysfsGPIO): + drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) if drv: break @@ -899,6 +901,32 @@ def power(self): if action == "get": print(f"power{' ' + name if name else ''} for place {place.name} is {'on' if res else 'off'}") + def button(self): + place = self.get_acquired_place() + action = self.args.action + delay = self.args.delay + name = self.args.name + target = self._get_target(place) + from ..resource.remote import NetworkSysfsGPIO + + drv = None + try: + drv = target.get_driver("ButtonProtocol", name=name) + except NoDriverFoundError: + for resource in target.resources: + if isinstance(resource, NetworkSysfsGPIO): + drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) + if drv: + break + + if not drv: + raise UserError("target has no compatible resource available") + if delay is not None: + drv.delay = delay + res = getattr(drv, action)() + if action == "get": + print(f"button{' ' + name if name else ''} for place {place.name} is {'pressed' if res else 'released'}") + def digital_io(self): place = self.get_acquired_place() action = self.args.action @@ -937,6 +965,8 @@ def digital_io(self): drv.set(True) elif action == "low": drv.set(False) + elif action == "invert": + drv.invert() async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False): name = self.args.name @@ -1821,8 +1851,16 @@ def main(): subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.power) + subparser = subparsers.add_parser("button", help="change (or get) a place's button status") + subparser.add_argument("action", choices=["press", "release", "press_for", "get"]) + subparser.add_argument( + "-t", "--delay", type=float, default=None, help="wait time in seconds between the press and release during press_for" + ) + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.set_defaults(func=ClientSession.button) + subparser = subparsers.add_parser("io", help="change (or get) a digital IO status") - subparser.add_argument("action", choices=["high", "low", "get"], help="action") + subparser.add_argument("action", choices=["high", "low", "invert", "get"], help="action") subparser.add_argument("name", help="optional resource name", nargs="?") subparser.set_defaults(func=ClientSession.digital_io) diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index d3b406503..c4a635232 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -642,16 +642,19 @@ def _get_params(self): return { "host": self.host, "index": self.local.index, + "invert": self.local.invert, } def _get_start_params(self): return { "index": self.local.index, + "invert": self.local.invert, } def _start(self, start_params): """Start a GPIO export to userspace""" index = start_params["index"] + invert = start_params["invert"] if self.export_path.exists(): self.system_exported = True @@ -672,7 +675,6 @@ def _stop(self, start_params): with open(export_sysfs_path, mode="wb") as unexport: unexport.write(str(index).encode("utf-8")) - exports["SysfsGPIO"] = GPIOSysFSExport exports["MatchedSysfsGPIO"] = GPIOSysFSExport diff --git a/labgrid/resource/base.py b/labgrid/resource/base.py index d8cdb984c..0df31a8f1 100644 --- a/labgrid/resource/base.py +++ b/labgrid/resource/base.py @@ -42,5 +42,7 @@ class SysfsGPIO(Resource): """The basic SysfsGPIO contains an index Args: - index (int): index of target gpio line.""" + index (int): index of target gpio line. + invert (bool) : optional, whether the logic level is inverted (active-low)""" index = attr.ib(default=None, validator=attr.validators.instance_of(int)) + invert = attr.ib(default=False, validator=attr.validators.instance_of(bool)) diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index a29e58ee8..205e135f3 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -339,7 +339,9 @@ class NetworkSysfsGPIO(NetworkResource, ManagedResource): manager_cls = RemotePlaceManager """The NetworkSysfsGPIO describes a remotely accessible gpio line""" + index = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(int))) + invert = attr.ib(default=False, validator=attr.validators.instance_of(bool)) def __attrs_post_init__(self): self.timeout = 10.0 super().__attrs_post_init__() diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index eb553cfb2..fd66d892d 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -758,8 +758,10 @@ class MatchedSysfsGPIO(USBResource): """The MatchedSysfsGPIO described a SysfsGPIO matched by Udev Args: - pin (int): gpio pin number within the matched gpiochip.""" + pin (int): gpio pin number within the matched gpiochip. + invert (bool): optional, whether the logic level is inverted (active-low)""" pin = attr.ib(default=None, validator=attr.validators.instance_of(int)) + invert = attr.ib(default=False, validator=attr.validators.instance_of(bool)) index = None def __attrs_post_init__(self): diff --git a/labgrid/util/agents/sysfsgpio.py b/labgrid/util/agents/sysfsgpio.py index 362eab5ce..cd8a903f8 100644 --- a/labgrid/util/agents/sysfsgpio.py +++ b/labgrid/util/agents/sysfsgpio.py @@ -2,6 +2,7 @@ This module implements switching GPIOs via sysfs GPIO kernel interface. Takes an integer property 'index' which refers to the already exported GPIO device. +Takes a boolean property 'invert' which inverts logical values if set to True (active-low) """ import logging @@ -23,7 +24,7 @@ def _assert_gpio_line_is_exported(index): if not os.path.exists(gpio_sysfs_path): raise ValueError("Device not found") - def __init__(self, index): + def __init__(self, index, invert): self._logger = logging.getLogger("Device: ") GpioDigitalOutput._assert_gpio_line_is_exported(index) gpio_sysfs_path = os.path.join(GpioDigitalOutput._gpio_sysfs_path_prefix, @@ -40,6 +41,10 @@ def __init__(self, index): gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value') self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDWR | os.O_SYNC)) + gpio_sysfs_active_low_path = os.path.join(gpio_sysfs_path, 'active_low') + with open(gpio_sysfs_active_low_path, 'w') as active_low_fd: + active_low_fd.write(str(int(invert))) + def __del__(self): os.close(self.gpio_sysfs_value_fd) self.gpio_sysfs_value_fd = None @@ -69,18 +74,18 @@ def set(self, status): _gpios = {} -def _get_gpio_line(index): +def _get_gpio_line(index, invert): if index not in _gpios: - _gpios[index] = GpioDigitalOutput(index=index) + _gpios[index] = GpioDigitalOutput(index=index, invert=invert) return _gpios[index] -def handle_set(index, status): - gpio_line = _get_gpio_line(index) +def handle_set(index, invert, status): + gpio_line = _get_gpio_line(index, invert) gpio_line.set(status) -def handle_get(index): - gpio_line = _get_gpio_line(index) +def handle_get(index, invert): + gpio_line = _get_gpio_line(index, invert) return gpio_line.get()