diff --git a/scc/controller.py b/scc/controller.py index ffec2585..6d4d954b 100644 --- a/scc/controller.py +++ b/scc/controller.py @@ -31,7 +31,7 @@ def __init__(self) -> None: self.time_elapsed = 0.0 - def get_type(self) -> None: + def get_type(self) -> str: """ This method has to return type identifier - short string without spaces that describes type of controller which should be unique for each diff --git a/scc/drivers/ds4drv.py b/scc/drivers/ds4drv.py index 8486da24..be6a3eff 100644 --- a/scc/drivers/ds4drv.py +++ b/scc/drivers/ds4drv.py @@ -5,10 +5,14 @@ import ctypes import logging +import os import sys from typing import TYPE_CHECKING +from usb1 import USBDeviceHandle + from scc.constants import STICK_PAD_MAX, STICK_PAD_MIN, ControllerFlags, SCButtons +from scc.controller import Controller from scc.drivers.evdevdrv import ( HAVE_EVDEV, EvdevController, @@ -28,9 +32,11 @@ HIDController, HIDDecoder, _lib, + button_to_bit, hiddrv_test, ) -from scc.drivers.usb import register_hotplug_device +from scc.drivers.usb import USBDevice, register_hotplug_device +from scc.lib.hidraw import HIDRaw from scc.tools import init_logging, set_logging_level if TYPE_CHECKING: @@ -45,7 +51,7 @@ DS4_V1_PRODUCT_ID = 0x05C4 -class DS4Controller(HIDController): +class DS4Controller(Controller): # Most of axes are the same BUTTON_MAP = ( SCButtons.X, @@ -73,6 +79,11 @@ class DS4Controller(HIDController): ) + def __init__(self, daemon: "SCCDaemon") -> None: + self.daemon = daemon + Controller.__init__(self) + + def _load_hid_descriptor(self, config, max_size, vid, pid, test_mode): # Overrided and hardcoded self._decoder = HIDDecoder() @@ -143,9 +154,8 @@ def _load_hid_descriptor(self, config, max_size, vid, pid, test_mode): for x in range(BUTTON_COUNT): self._decoder.buttons.button_map[x] = 64 for x, sc in enumerate(DS4Controller.BUTTON_MAP): - self._decoder.buttons.button_map[x] = self.button_to_bit(sc) + self._decoder.buttons.button_map[x] = button_to_bit(sc) - self._packet_size = 64 # TODO: Which commit made data switch from bytes to bytearray? def input(self, endpoint: int, data: bytearray) -> None: @@ -192,6 +202,82 @@ def _generate_id(self) -> str: return id +class DS4HIDController(DS4Controller, HIDController): + def __init__(self, device: "USBDevice", daemon: "SCCDaemon", handle: "USBDeviceHandle", config_file, config, test_mode = False): + DS4Controller.__init__(self, daemon) + HIDController.__init__(self, device, daemon, handle, config_file, config, test_mode) + + +class DS4HIDRawController(DS4Controller, Controller): + def __init__(self, driver: "DS4HIDRawDriver", syspath, hidrawdev: "HIDRaw", vid, pid) -> None: + self.driver = driver + self.syspath = syspath + + DS4Controller.__init__(self, driver.daemon) + + self._device_name = hidrawdev.getName() + self._hidrawdev = hidrawdev + self._fileno = hidrawdev._device.fileno() + self._id = self._generate_id() if driver else "-" + + self._packet_size = 78 + self._load_hid_descriptor(driver.config, self._packet_size, vid, pid, None) + + # self._set_operational() + self.read_serial() + self._poller = self.daemon.get_poller() + if self._poller: + self._poller.register(self._fileno, self._poller.POLLIN, self._input) + # self.daemon.get_device_monitor().add_remove_callback(syspath, self.close) + self.daemon.add_controller(self) + + def read_serial(self): + self._serial = (self._hidrawdev + .getPhysicalAddress().replace(b":", b"")) + + def _input(self, *args): + data = self._hidrawdev.read(self._packet_size) + if data[0] != 0x11: + return + self.input(self._fileno, data[2:]) + + def close(self): + if self._poller: + self._poller.unregister(self._fileno) + + self.daemon.remove_controller(self) + self._hidrawdev._device.close() + + +class DS4HIDRawDriver: + def __init__(self, daemon: "SCCDaemon", config: dict): + self.config = config + self.daemon = daemon + daemon.get_device_monitor().add_callback("bluetooth", VENDOR_ID, PRODUCT_ID, self.make_bt_hidraw_callback, None) + daemon.get_device_monitor().add_callback("bluetooth", VENDOR_ID, DS4_V1_PRODUCT_ID, self.make_bt_hidraw_callback, None) + + def retry(self, syspath: str): + pass + + def make_bt_hidraw_callback(self, syspath: str, vid, pid, *whatever): + hidrawname = self.daemon.get_device_monitor().get_hidraw(syspath) + if hidrawname is None: + return None + try: + dev = HIDRaw(open(os.path.join("/dev/", hidrawname), "w+b")) + return DS4HIDRawController(self, syspath, dev, vid, pid) + except Exception as e: + log.exception(e) + return None + + def get_device_name(self): + return "Dualshock 4 over Bluetooth HIDRaw" + + def get_type(self): + return "ds4bt_hidraw" + + + class DS4EvdevController(EvdevController): TOUCH_FACTOR_X = STICK_PAD_MAX / 940.0 TOUCH_FACTOR_Y = STICK_PAD_MAX / 470.0 @@ -381,8 +467,8 @@ def _generate_id(self) -> str: def init(daemon: "SCCDaemon", config: dict) -> bool: """Register hotplug callback for DS4 device.""" - def hid_callback(device, handle) -> DS4Controller: - return DS4Controller(device, daemon, handle, None, None) + def hid_callback(device, handle) -> DS4HIDController: + return DS4HIDController(device, daemon, handle, None, None) def make_evdev_device(sys_dev_path: str, *whatever): devices = get_evdev_devices_from_syspath(sys_dev_path) @@ -435,7 +521,10 @@ def fail_cb(syspath: str, vid: int, pid: int) -> None: register_hotplug_device(hid_callback, VENDOR_ID, PRODUCT_ID, on_failure=fail_cb) # DS4 v.1 register_hotplug_device(hid_callback, VENDOR_ID, DS4_V1_PRODUCT_ID, on_failure=fail_cb) - if HAVE_EVDEV and config["drivers"].get("evdevdrv"): + if config["drivers"].get("hiddrv"): + # Only enable HIDRaw support for BT connections if hiddrv is enabled + _drv = DS4HIDRawDriver(daemon, config) + elif HAVE_EVDEV and config["drivers"].get("evdevdrv"): # DS4 v.2 daemon.get_device_monitor().add_callback("bluetooth", VENDOR_ID, PRODUCT_ID, make_evdev_device, None) # DS4 v.1 @@ -449,4 +538,4 @@ def fail_cb(syspath: str, vid: int, pid: int) -> None: """ Called when executed as script """ init_logging() set_logging_level(True, True) - sys.exit(hiddrv_test(DS4Controller, [ "054c:09cc" ])) + sys.exit(hiddrv_test(DS4HIDController, [ "054c:09cc" ])) diff --git a/scc/drivers/ds5drv.py b/scc/drivers/ds5drv.py index 8b69ce6d..1daeca9b 100644 --- a/scc/drivers/ds5drv.py +++ b/scc/drivers/ds5drv.py @@ -43,6 +43,7 @@ HIDController, HIDDecoder, _lib, + button_to_bit, hiddrv_test, ) from scc.drivers.usb import register_hotplug_device @@ -338,7 +339,7 @@ def _load_hid_descriptor(self, config, max_size, vid, pid, test_mode): for x in range(BUTTON_COUNT): self._decoder.buttons.button_map[x] = 64 for x, sc in enumerate(DS5Controller.BUTTON_MAP): - self._decoder.buttons.button_map[x] = self.button_to_bit(sc) + self._decoder.buttons.button_map[x] = button_to_bit(sc) self._packet_size = 64 diff --git a/scc/drivers/hiddrv.py b/scc/drivers/hiddrv.py index f966fe56..30e4363d 100644 --- a/scc/drivers/hiddrv.py +++ b/scc/drivers/hiddrv.py @@ -59,6 +59,16 @@ ] +def button_to_bit(sc) -> int: + sc, bit = int(sc), 0 + while sc and (sc & 1 == 0): + sc >>= 1 + bit += 1 + if sc & 1 == 1: + return bit + return BUTTON_COUNT - 1 + + class HIDDrvError(Exception): pass class NotHIDDevice(HIDDrvError): @@ -286,24 +296,13 @@ def _build_button_map(self, config: dict): # Not used here pass else: - buttons[keycode] = self.button_to_bit(getattr(SCButtons, value)) + buttons[keycode] = button_to_bit(getattr(SCButtons, value)) else: buttons = list(range(BUTTON_COUNT)) return (ctypes.c_uint8 * BUTTON_COUNT)(*buttons) - @staticmethod - def button_to_bit(sc) -> int: - sc, bit = int(sc), 0 - while sc and (sc & 1 == 0): - sc >>= 1 - bit += 1 - if sc & 1 == 1: - return bit - return BUTTON_COUNT - 1 - - def _build_axis_maping(self, axis, config: dict, mode = AxisMode.AXIS): """Convert configuration mapping for _one_ axis to value situable for self._decoder.axes field.""" axis_config = config.get("axes", {}).get(str(int(axis)))