diff --git a/doc/configuration.rst b/doc/configuration.rst index 60bfbb494..aa809670f 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -547,6 +547,26 @@ Arguments: Used by: - `HIDRelayDriver`_ +DenkoviRelay +++++++++ +An :any:`DenkoviRelay` resource describes a single output of a Denkovi relay board. + +.. code-block:: yaml + + DenkoviRelay: + index: 2 + invert: false + match: + ID_PATH: 'pci-0000:00:14.0-usb-0:2:1.0' + +Arguments: + - index (int, default=1): number of the relay to use + - invert (bool, default=False): whether to invert the relay + - match (dict): key and value pairs for a udev match, see `udev Matching`_ + +Used by: + - `DenkoviRelayDriver`_ + HttpDigitalOutput +++++++++++++++++ An :any:`HttpDigitalOutput` resource describes a generic digital output that @@ -2415,6 +2435,26 @@ Implements: Arguments: - None +DenkoviRelayDriver +~~~~~~~~~~~~~~ +An :any:`DenkoviRelayDriver` controls an `DenkoviRelay`_ or `NetworkDenkoviRelay`_ resource. +It can set and get the current state of the resource. + +Binds to: + relay: + - `DenkoviRelay`_ + - `NetworkDenkoviRelay`_ + +Implements: + - :any:`DigitalOutputProtocol` + +.. code-block:: yaml + + DenkoviRelayDriver: {} + +Arguments: + - None + ManualSwitchDriver ~~~~~~~~~~~~~~~~~~ A :any:`ManualSwitchDriver` requires the user to control a switch or jumper on diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 721256bbf..31a387853 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -35,6 +35,7 @@ from .lxausbmuxdriver import LXAUSBMuxDriver from .pyvisadriver import PyVISADriver from .usbhidrelay import HIDRelayDriver +from .denkovirelay import DenkoviRelayDriver from .flashscriptdriver import FlashScriptDriver from .usbaudiodriver import USBAudioInputDriver from .usbvideodriver import USBVideoDriver diff --git a/labgrid/driver/denkovirelay.py b/labgrid/driver/denkovirelay.py new file mode 100644 index 000000000..acd2020b4 --- /dev/null +++ b/labgrid/driver/denkovirelay.py @@ -0,0 +1,48 @@ +import attr + +from .common import Driver +from ..factory import target_factory +from ..resource.remote import NetworkDenkoviRelay +from ..step import step +from ..protocol import DigitalOutputProtocol +from ..util.agentwrapper import AgentWrapper + + +@target_factory.reg_driver +@attr.s(eq=False) +class DenkoviRelayDriver(Driver, DigitalOutputProtocol): + bindings = { + "relay": {"DenkoviRelay", NetworkDenkoviRelay}, + } + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self.wrapper = None + + def on_activate(self): + if isinstance(self.relay, NetworkDenkoviRelay): + host = self.relay.host + else: + host = None + self.wrapper = AgentWrapper(host) + self.proxy = self.wrapper.load("denkovi_relay") + + def on_deactivate(self): + self.wrapper.close() + self.wrapper = None + self.proxy = None + + @Driver.check_active + @step(args=["status"]) + def set(self, status): + if self.relay.invert: + status = not status + self.proxy.set(self.relay.busnum, self.relay.devnum, self.relay.index, status) + + @Driver.check_active + @step(result=True) + def get(self): + status = self.proxy.get(self.relay.busnum, self.relay.devnum, self.relay.index) + if self.relay.invert: + status = not status + return status diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 27108a7b4..5a886f506 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -925,7 +925,7 @@ def digital_io(self): name = self.args.name target = self._get_target(place) from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput, WaveshareModbusTCPCoil - from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay + from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay, NetworkDenkoviRelay drv = None try: @@ -950,6 +950,8 @@ def digital_io(self): drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name) elif isinstance(resource, NetworkHIDRelay): drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name) + elif isinstance(resource, NetworkDenkoviRelay): + drv = self._get_driver_or_new(target, "DenkoviRelayDriver", name=name) if drv: break diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index d3b406503..22335e968 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -526,6 +526,26 @@ def _get_params(self): } +@attr.s(eq=False) +class DenkoviRelayExport(USBGenericExport): + """ResourceExport for outputs on Denkovi relays""" + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + def _get_params(self): + """Helper function to return parameters""" + return { + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, + } + + @attr.s(eq=False) class USBFlashableExport(USBGenericExport): """ResourceExport for Flashable USB devices""" diff --git a/labgrid/resource/__init__.py b/labgrid/resource/__init__.py index 6ec9d5db8..7a6219a81 100644 --- a/labgrid/resource/__init__.py +++ b/labgrid/resource/__init__.py @@ -13,6 +13,7 @@ DFUDevice, DeditecRelais8, HIDRelay, + DenkoviRelay, IMXUSBLoader, LXAUSBMux, MatchedSysfsGPIO, diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index a29e58ee8..d245c6476 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -333,6 +333,17 @@ def __attrs_post_init__(self): super().__attrs_post_init__() +@target_factory.reg_resource +@attr.s(eq=False) +class NetworkDenkoviRelay(RemoteUSBResource): + """The NetworkDenkoviRelay describes a remotely accessible USB relay port""" + index = attr.ib(default=1, validator=attr.validators.instance_of(int)) + invert = attr.ib(default=False, validator=attr.validators.instance_of(bool)) + def __attrs_post_init__(self): + self.timeout = 10.0 + super().__attrs_post_init__() + + @target_factory.reg_resource @attr.s(eq=False) class NetworkSysfsGPIO(NetworkResource, ManagedResource): diff --git a/labgrid/resource/suggest.py b/labgrid/resource/suggest.py index 707779bf8..c7b89eb02 100644 --- a/labgrid/resource/suggest.py +++ b/labgrid/resource/suggest.py @@ -21,6 +21,7 @@ USBAudioInput, LXAUSBMux, HIDRelay, + DenkoviRelay, USBDebugger, USBPowerPort, MatchedSysfsGPIO @@ -55,6 +56,7 @@ def __init__(self, args): self.resources.append(USBAudioInput(**args)) self.resources.append(LXAUSBMux(**args)) self.resources.append(HIDRelay(**args)) + self.resources.append(DenkoviRelay(**args)) self.resources.append(USBDebugger(**args)) self.resources.append(USBPowerPort(**args, index=0)) self.resources.append(MatchedSysfsGPIO(**args, pin=0)) diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index 22c0e5e03..a98f1ae7e 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -706,6 +706,19 @@ def filter_match(self, device): return super().filter_match(device) + +@target_factory.reg_resource +@attr.s(eq=False) +class DenkoviRelay(USBResource): + index = attr.ib(default=1, validator=attr.validators.instance_of(int)) + invert = attr.ib(default=False, validator=attr.validators.instance_of(bool)) + + def __attrs_post_init__(self): + self.match['ID_VENDOR'] = 'FTDI' + self.match['ID_MODEL'] = 'FT245R_USB_FIFO' + super().__attrs_post_init__() + + @target_factory.reg_resource @attr.s(eq=False) class USBFlashableDevice(USBResource): diff --git a/labgrid/util/agents/denkovi_relay.py b/labgrid/util/agents/denkovi_relay.py new file mode 100644 index 000000000..1296f0477 --- /dev/null +++ b/labgrid/util/agents/denkovi_relay.py @@ -0,0 +1,114 @@ +""" +This module implements the communication protocol to switch the digital outputs +on a Denkovi USB Relay. + +Supported Functionality: + +- Turn digital output on and off +""" + +import usb.core +import usb.util + +from pylibftdi import BitBangDevice +from threading import Lock + + +class DenkoviRelay: + lock = Lock() + + def __init__(self, **args): + self._dev = usb.core.find(**args) + + if self._dev is None: + raise ValueError("Device not found") + + self._serialNumber = usb.util.get_string(self._dev, self._dev.iSerialNumber) + + if self._serialNumber is None: + raise ValueError("Failed to get device serial number") + + self.lock.acquire() + + bitbangDev = BitBangDevice(self._serialNumber) + + if bitbangDev is None: + raise ValueError("Failed to instantiate bitbang device") + + bitbangDev.direction = 0xFF + + bitbangDev.close() + + self.lock.release() + + def set_output(self, number, status): + assert 1 <= number <= 8 + number = number - 1 + + self.lock.acquire() + + bitbangDev = BitBangDevice(self._serialNumber) + + if bitbangDev is None: + raise ValueError("Failed to instantiate bitbang device") + + if status: + bitbangDev.port = bitbangDev.port | (1 << number) + else: + bitbangDev.port = bitbangDev.port & ~(1 << number) + + bitbangDev.close() + + self.lock.release() + + def get_output(self, number): + assert 1 <= number <= 8 + number = number - 1 + + val = None + + self.lock.acquire() + + bitbangDev = BitBangDevice(self._serialNumber) + + if bitbangDev is None: + raise ValueError("Failed to instantiate bitbang device") + + if bitbangDev.port & (1 << number): + val = True + else: + val = False + + bitbangDev.close() + + self.lock.release() + + return val + + def __del__(self): + usb.util.release_interface(self._dev, 0) + + +_relays = {} + + +def _get_relay(busnum, devnum): + if (busnum, devnum) not in _relays: + _relays[(busnum, devnum)] = DenkoviRelay(bus=busnum, address=devnum) + return _relays[(busnum, devnum)] + + +def handle_set(busnum, devnum, number, status): + relay = _get_relay(busnum, devnum) + relay.set_output(number, status) + + +def handle_get(busnum, devnum, number): + relay = _get_relay(busnum, devnum) + return relay.get_output(number) + + +methods = { + "set": handle_set, + "get": handle_get, +} diff --git a/pyproject.toml b/pyproject.toml index 33caa6f31..e3953d2f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ doc = [ "Sphinx>=2.0.0", ] docker = ["docker>=5.0.2"] +libftdi = ["pylibftdi>=0.23.0"] graph = ["graphviz>=0.17.0"] kasa = ["python-kasa>=0.7.0"] modbus = ["pyModbusTCP>=0.2.0"]