|
1 | 1 | # -*- coding: utf-8 -*- |
2 | 2 | # test_pwms |
3 | 3 | import json |
| 4 | +import sys |
4 | 5 | import time |
| 6 | +import types |
5 | 7 |
|
6 | 8 | import pytest |
7 | 9 | from pioreactor import pubsub |
| 10 | +from pioreactor.exc import PWMError |
8 | 11 | from pioreactor.utils import local_intermittent_storage |
9 | 12 | from pioreactor.utils.pwm import PWM |
| 13 | +from pioreactor.utils.pwm import SoftwarePWMOutputDevice |
10 | 14 | from pioreactor.whoami import get_unit_name |
11 | 15 |
|
12 | 16 |
|
@@ -98,3 +102,76 @@ def collect(msg) -> None: |
98 | 102 |
|
99 | 103 | assert json.loads(mqtt_items[-1]).get("17", 0.0) == 0.0 |
100 | 104 | assert json.loads(mqtt_items[-1]).get("12", 0.0) == 0.0 |
| 105 | + |
| 106 | + |
| 107 | +def _install_fake_lgpio(monkeypatch: pytest.MonkeyPatch) -> dict[str, float | list[float] | None]: |
| 108 | + fake_lgpio = types.ModuleType("lgpio") |
| 109 | + |
| 110 | + class FakeLgpioError(Exception): |
| 111 | + pass |
| 112 | + |
| 113 | + state: dict[str, float | list[float] | None] = {"raise_when_dc": None, "tx_calls": []} |
| 114 | + |
| 115 | + def gpiochip_open(_chip: int) -> int: |
| 116 | + return 1 |
| 117 | + |
| 118 | + def gpio_claim_output(_handle: int, _pin: int) -> None: |
| 119 | + return |
| 120 | + |
| 121 | + def tx_pwm(_handle: int, _pin: int, _frequency: float, duty_cycle: float) -> None: |
| 122 | + tx_calls = state["tx_calls"] |
| 123 | + assert isinstance(tx_calls, list) |
| 124 | + tx_calls.append(duty_cycle) |
| 125 | + |
| 126 | + raise_when_dc = state["raise_when_dc"] |
| 127 | + if (raise_when_dc is not None) and (duty_cycle == raise_when_dc): |
| 128 | + raise FakeLgpioError("simulated lgpio tx_pwm failure") |
| 129 | + |
| 130 | + def gpiochip_close(_handle: int) -> None: |
| 131 | + return |
| 132 | + |
| 133 | + fake_lgpio.error = FakeLgpioError |
| 134 | + fake_lgpio.gpiochip_open = gpiochip_open |
| 135 | + fake_lgpio.gpio_claim_output = gpio_claim_output |
| 136 | + fake_lgpio.tx_pwm = tx_pwm |
| 137 | + fake_lgpio.gpiochip_close = gpiochip_close |
| 138 | + |
| 139 | + monkeypatch.setitem(sys.modules, "lgpio", fake_lgpio) |
| 140 | + return state |
| 141 | + |
| 142 | + |
| 143 | +def test_software_pwm_dc_errors_raise_pwm_error(monkeypatch: pytest.MonkeyPatch) -> None: |
| 144 | + state = _install_fake_lgpio(monkeypatch) |
| 145 | + pwm = SoftwarePWMOutputDevice(pin=17, frequency=100) |
| 146 | + pwm.start(0) |
| 147 | + |
| 148 | + state["raise_when_dc"] = 25.0 |
| 149 | + |
| 150 | + with pytest.raises(PWMError, match="Failed to set software PWM"): |
| 151 | + pwm.dc = 25.0 |
| 152 | + |
| 153 | + |
| 154 | +def test_software_pwm_stop_errors_raise_pwm_error(monkeypatch: pytest.MonkeyPatch) -> None: |
| 155 | + state = _install_fake_lgpio(monkeypatch) |
| 156 | + pwm = SoftwarePWMOutputDevice(pin=17, frequency=100) |
| 157 | + pwm.start(20) |
| 158 | + |
| 159 | + state["raise_when_dc"] = 0.0 |
| 160 | + |
| 161 | + with pytest.raises(PWMError, match="Failed to set software PWM"): |
| 162 | + pwm.off() |
| 163 | + |
| 164 | + |
| 165 | +def test_lock_is_exclusive_after_creation() -> None: |
| 166 | + exp = "test_lock_is_exclusive_after_creation" |
| 167 | + unit = get_unit_name() |
| 168 | + |
| 169 | + pwm_first = PWM(12, 10, experiment=exp, unit=unit) |
| 170 | + pwm_second = PWM(12, 10, experiment=exp, unit=unit) |
| 171 | + |
| 172 | + pwm_first.lock() |
| 173 | + with pytest.raises(PWMError, match="GPIO-12 is currently locked"): |
| 174 | + pwm_second.lock() |
| 175 | + |
| 176 | + pwm_first.clean_up() |
| 177 | + pwm_second.clean_up() |
0 commit comments