diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 45ba0c43..47f8cc17 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/mypy.ini b/mypy.ini index 469e3f1b..765bff0e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -66,7 +66,18 @@ exclude = (?x) ) ) ) +warn_unused_configs = True +warn_redundant_casts = True +[mypy-fixate._switching] +# Enable strict options for new code +warn_unused_ignores = True +strict_equality = True +extra_checks = True +check_untyped_defs = True +disallow_untyped_calls = True +disallow_incomplete_defs = True +disallow_untyped_defs = True # mypy will also analyse modules if they are imported by a module - even if they are excluded! diff --git a/setup.cfg b/setup.cfg index fe6d196d..3189289b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,7 @@ package_dir = packages = find: include_package_data = True zip_safe = False -python_requires = ~=3.7 +python_requires = ~=3.8 install_requires = pyvisa @@ -35,8 +35,6 @@ install_requires = numpy PyDAQmx # for typing.protocol - typing_extensions ; python_version < "3.8" - importlib-metadata >= 1.0 ; python_version < "3.8" platformdirs [options.packages.find] diff --git a/src/fixate/__init__.py b/src/fixate/__init__.py index 22049ab2..a655e50f 100644 --- a/src/fixate/__init__.py +++ b/src/fixate/__init__.py @@ -1 +1,33 @@ +# from x import y as y tell linters that we intend +# to export the symbol as part of our public interface +# https://github.com/microsoft/pyright/blob/main/docs/typed-libraries.md#library-interface + +# Originally, test scripts had to reach into the internal +# packages and modules for imports. However, we will start +# to move the API intended for use in test scripts into the +# top level package namespace and try to be clearer about what +# is public vs private. +from fixate._switching import ( + # Type Alias + Signal as Signal, + Pin as Pin, + PinList as PinList, + PinSet as PinSet, + SignalMap as SignalMap, + TreeDef as TreeDef, + PinUpdateCallback as PinUpdateCallback, + # Runtime API + PinSetState as PinSetState, + PinUpdate as PinUpdate, + VirtualMux as VirtualMux, + VirtualSwitch as VirtualSwitch, + RelayMatrixMux as RelayMatrixMux, + AddressHandler as AddressHandler, + PinValueAddressHandler as PinValueAddressHandler, + MuxGroup as MuxGroup, + JigDriver as JigDriver, + generate_pin_group as generate_pin_group, + generate_relay_matrix_pin_list as generate_relay_matrix_pin_list, +) + __version__ = "0.6.2" diff --git a/src/fixate/_switching.py b/src/fixate/_switching.py new file mode 100644 index 00000000..afade427 --- /dev/null +++ b/src/fixate/_switching.py @@ -0,0 +1,835 @@ +""" +JigDriver is all about switching signals in a jig (and also input, but I'm +ignoring that for now...) At the script level, the key abstraction is a +`VirtualMux`, which switching between a number of `Signals`. Each signal is +defined by a number of `Pins`. When using a relay matrix, a Signal often +corresponds to a single Pin, but it doesn't have to. + +Each VirtualMux connects to the VirtualAddressMap. When the multiplex is +called on a mux, the mux determines which pin must be set or cleared + + + _____________ ______________________ + SIG_1 ---| pin0 | | VirtualAddressMap | _____________________ + SIG_2 ---| MUX pin1 |---| | | AddressHandler | + SIG_3 ---| | | pin0 |---| pin0, pin1, pin2 | + SIG_4 ---|____________| | pin1 | |___________________| + | pin2 | ____________________ + | | | AddressHandler | + _____________ | pin3 |---| pin3, pin4 | + SIG_5 ---| pin2 | | pin4 | |__________________| + SIG_6 ---| MUX pin3 |---| | ___________________ + SIG_7 ---| pin4 | | pin5 |---| AddressHandler | + SIG_8 ---| pin5 | | | | pin5 | + |____________| |_____________________| |__________________| + ^ ^ + update_pins callback AddressHandler.set_pins() + VirtualAddressMap.add_update() +""" + +from __future__ import annotations + +import itertools +import time +from typing import ( + Generic, + Optional, + Callable, + Sequence, + TypeVar, + Generator, + Union, + Collection, + Dict, + FrozenSet, + Iterable, +) +from dataclasses import dataclass +from functools import reduce +from operator import or_ + +Signal = str +Pin = str +PinList = Sequence[Pin] +PinSet = FrozenSet[Pin] +SignalMap = Dict[Signal, PinSet] +TreeDef = Sequence[Union[Signal, "TreeDef"]] + + +@dataclass(frozen=True) +class PinSetState: + off: PinSet = frozenset() + on: PinSet = frozenset() + + def __or__(self, other: PinSetState) -> PinSetState: + if isinstance(other, PinSetState): + return PinSetState(self.off | other.off, self.on | other.on) + return NotImplemented + + +@dataclass(frozen=True) +class PinUpdate: + setup: PinSetState = PinSetState() + final: PinSetState = PinSetState() + minimum_change_time: float = 0.0 + + def __or__(self, other: PinUpdate) -> PinUpdate: + if isinstance(other, PinUpdate): + return PinUpdate( + setup=self.setup | other.setup, + final=self.final | other.final, + minimum_change_time=max( + self.minimum_change_time, other.minimum_change_time + ), + ) + return NotImplemented + + +PinUpdateCallback = Callable[[PinUpdate, bool], None] + + +class VirtualMux: + pin_list: PinList = () + clearing_time: float = 0.0 + + ########################################################################### + # These methods are the public API for the class + + def __init__(self, update_pins: Optional[PinUpdateCallback] = None): + self._last_update_time = time.monotonic() + + self._update_pins: PinUpdateCallback + if update_pins is None: + self._update_pins = self._default_update_pins + else: + self._update_pins = update_pins + + # We annotate the pin_list to be an ordered sequence, because if the + # mux is defined with a map_tree, we need the ordering. But after + # initialisation, we only need set operations on the pin list, so + # we convert here and keep a reference to the set for future use. + self._pin_set = frozenset(self.pin_list) + + self._state = "" + + self._signal_map: SignalMap = self._map_signals() + + # Define the implicit signal "" which can be used to turn off all pins. + # If the signal map already has this defined, raise an error. In the old + # implementation, it allows the map to set this, but when switching the + # behaviour was hard coded. Any mux that changed the definition would + # have silently done the "wrong" thing. We can revisit this if we find + # a good application to override, but for now, don't silently allow something + # that probably isn't correct. + if "" not in self._signal_map: + self._signal_map[""] = frozenset() + else: + raise ValueError('The empty signal, "", should not be explicitly defined') + + if hasattr(self, "default_signal"): + raise ValueError("'default_signal' should not be set on a VirtualMux") + + def __call__(self, signal: Signal, trigger_update: bool = True) -> None: + """ + Convenience to avoid having to type jig.mux..multiplex. + + With this you can just type jig.mux. which is a small, but + useful saving for the most common method call. + """ + self.multiplex(signal, trigger_update) + + def multiplex(self, signal: Signal, trigger_update: bool = True) -> None: + """ + Update the multiplexer state to signal. + + The update is a two-step processes. By default, the change happens on + the second step. This can be modified by subclassing and overriding the + _calculate_pins method. + + If trigger_update is true, the output will update immediately. If false, + multiple mux changes can be set and then when trigger_update is finally + set to True all changes will happen at once. + + In general, subclasses should not override. (VirtualSwitch does, but then + delegates the real work to this method to ensure consistent behaviour.) + """ + if signal not in self._signal_map: + name = self.__class__.__name__ + raise ValueError(f"Signal '{signal}' not valid for multiplexer '{name}'") + + setup, final = self._calculate_pins(self._state, signal) + self._update_pins(PinUpdate(setup, final, self.clearing_time), trigger_update) + if signal != self._state: + self._last_update_time = time.monotonic() + self._state = signal + + def all_signals(self) -> tuple[Signal, ...]: + return tuple(self._signal_map.keys()) + + def reset(self, trigger_update: bool = True) -> None: + self.multiplex("", trigger_update) + + def wait_at_least(self, duration: float) -> None: + """ + Ensure at least `duration` seconds have elapsed since the signal was switched. + + This can be used to ensure a minimum settling time has passed since + a particular signal was enabled. + """ + now = time.monotonic() + wait_until = self._last_update_time + duration + if wait_until > now: + time.sleep(wait_until - now) + + def pins(self) -> frozenset[Pin]: + """ + Return the set off all pins used by this mux + """ + return self._pin_set + + ########################################################################### + # The following methods are potential candidates to override in a subclass + + def _calculate_pins( + self, old_signal: Signal, new_signal: Signal + ) -> tuple[PinSetState, PinSetState]: + """ + Calculate the pin sets for the two-step state change. + + The two-step state change allows us to implement break-before-make or + make-before-break behaviour. By default, the first state changes + no pins and the second state sets the pins required for new_signal. + + Subclasses can override this method to change the behaviour. It is + marked as private to discourage use, but it is intended to be subclassed. + For example, RelayMatrix overrides this to implement break-before-make + switching. + + old_signal isn't currently used, but it is provided in case a future + subclass needs to it calculate the update. In particular, it could be + useful for make-before-break behaviour. + """ + setup = PinSetState() + on_pins = self._signal_map[new_signal] + final = PinSetState(self._pin_set - on_pins, on_pins) + return setup, final + + ########################################################################### + # The following methods are intended as implementation detail and + # subclasses should avoid overriding. + + def _map_signals(self) -> SignalMap: + """ + Default implementation of the signal mapping + + We need to construct a dictionary mapping signals to a set of pins. + In the case that self.map_list is set, the is pretty trival. + If the mux is defined with self.map_tree we have more work to + do, which is recursively delegated to _map_tree + + Avoid subclassing. Consider creating helper functions to build + map_tree or map_list. + """ + if hasattr(self, "map_tree"): + return self._map_tree(self.map_tree, self.pin_list, fixed_pins=frozenset()) + elif hasattr(self, "map_list"): + return {sig: frozenset(pins) for sig, *pins in self.map_list} + else: + raise ValueError( + "VirtualMux subclass must define either map_tree or map_list" + ) + + def _map_tree(self, tree: TreeDef, pins: PinList, fixed_pins: PinSet) -> SignalMap: + """recursively add nested signal lists to the signal map. + tree: is the current sub-branch to be added. At the first call + level, this would be initialised with self.map_tree. It can be + any sequence, possibly nested. + + pins: The list of pins, taken as LSB to MSB that are assigned + to the signals in order. + + fixed_pins: At each call level, this represents the pins that + must be set for each signal in at this level. In the example + below, these are the bits for a given input to Mux A, when + mapping all the nested Mux B signals. + + example: + This shows 10 signals, routed through a number of multiplexers. + Mux B and Mux B' are distinct, but addressed with common control + signals. Mux C and Mux B/B' are nested to various levels into + the final multiplexer Mux A. + + The pin_list defines the control signals from least to most significant + The map_tree defines the signals into each multiplexer. Nesting containers + reflects the nesting of mux's. + __________ + a0-------------------------------| | + ________ | | + a1_b0----------------| |--| Mux A | + a1_b1----------------| Mux B | | 4:1 | + a1_b2----------------| 4:1 | | | + (None)--|_x3__x2_| | | + | | + ________ | | + a2_b0----------------| | | | + _______ | |--| |------ Output + a2_b1_c0--| Mux C |--| Mux B' | | | + a2_b1_c1--| 2:1 | | 4:1 | | | + |___x4__| | | | | + | | | | + a2_b2----------------| | | | + a2_b3----------------| | | | + |_x3__x2_| | | + | | + a3-------------------------------| | + |__x1__x0__| + + class Mux(VirtualMux): + pin_list = ("x0", "x1", "x2", "x3", "x4") + map_tree = ("a0", + (#a1 + "a1_b0", + "a1_b1", + "a1_b2", + None, + ), + (#a2 + "a2_b0", + (#b1 + "a2_b1_c0", + "a2_b1_c1", + ), + "a2_b2", + "a2_b3", + ), + "a3" + ) + + Alternatively: + + class Mux(VirtualMux): + pin_list = ("x0", "x1", "x2", "x3", "x4") + + mux_c = ("a2_b1_c0", "a2_b1_c1") + mux_b1 = ("a1_b0", "a1_b1", "a1_b2", None) + mux_b2 = ("a2_b0", mux_c, "a2_b2", "a2_b3") + + map_tree = ("a0", mux_b1, mux_b2, "a3") + + Final mapping: + addr signal + -------------- + 0 a0 + 1 a1_b0 + 2 a2_b0 + 3 a3 + 4 + 5 a1_b1 + 6 a2_b1_c0 + 7 + 8 + 9 a1_b2 + 10 a2_b2 + 11 + 12 + 13 (None) + 14 a2_b3 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 a2_b1_c1 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 + 31 + + For Multiplexers that depend on separate control pins, try using the shift_nested function to help + with sparse mapping. Note that shift_nested() hasn't been copied over from jig_mapping. + + __________ + a0-------------------------------| | + ________ | | + a1_b0----------------| |--| | + a1_b1----------------| Mux B | | | + a1_b2----------------| 4:1 | | | + (None)--| x3 x2 | | | + |________| | | + | Mux A | + a2-------------------------------| 4:1 | + ________ | | + a3_c0----------------| |--| | + a3_c1----------------| Mux C | | | + a3_c2----------------| 4:1 | | | + (None)--| x5 x4 | | | + |________| | | + |__x1__x0__| + + class Mux(VirtualMux): + pin_list = ("x0", "x1", "x2", "x3", "x4", "x5") + + mux_c = ("a3_c0", "a3_c1", "a3_c2", None) + mux_b = ("a1_b0", "a1_b1", "a1_b2", None) + + map_tree = ( + "a0", + mux_b, + "a2", + shift_nested(mux_c, [2]), # 2 in indicative on how many pins to skip. This case is (x2, x3) from mux_b + ) + + Note: + If shift_nested is needed, I think I'd re-implement something like TreeMap that + can be used to define the mux. + + class TreeMap: + signals: Sequence[pin | TreeMap] + pins: PinSet + + class Mux(VirtualMux): + mux_c = TreeMap(("a3_c0", "a3_c1", "a3_c2", None), ("x4", "x5")) + mux_b = TreeMap(("a1_b0", "a1_b1", "a1_b2", None), ("x2", "x3")) + map_tree = TreeMap(("a0", mux_b, "a2", mux_c), ("x1", "x0")) + """ + signal_map: SignalMap = dict() + + bits_at_this_level = (len(tree) - 1).bit_length() + pins_at_this_level = pins[:bits_at_this_level] + + for signal_or_tree, pins_for_signal in zip( + tree, _generate_bit_sets(pins_at_this_level) + ): + if signal_or_tree is None: + continue + if isinstance(signal_or_tree, Signal): + signal_map[signal_or_tree] = frozenset(pins_for_signal) | fixed_pins + else: + signal_map.update( + self._map_tree( + tree=signal_or_tree, + pins=pins[bits_at_this_level:], + fixed_pins=frozenset(pins_for_signal) | fixed_pins, + ) + ) + + return signal_map + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{self._state}')" + + @staticmethod + def _default_update_pins( + pin_updates: PinUpdate, trigger_update: bool = True + ) -> None: + """ + Output callback to effect a state change in the mux. + + This is a default implementation which simply prints the planned state change to. + stdout. When instantiated as part of a jig driver, this will end up connected + to an AddressHandler to do the actual pin changes in hardware. + + In general, this method shouldn't be overridden in a subclass. An alternative + can be provided to __init__. + """ + print(pin_updates, trigger_update) + + +class VirtualSwitch(VirtualMux): + """ + A VirtualMux that controls a single pin. + + A virtual switch is a multiplexer with a single pin. The multiplex + function can accept either boolean values or the strings 'TRUE' and + 'FALSE'. The virtual address used to switch can be defined as a list + with the single element (as if it were a multiplexer) or by using the + shorthand which is to define the pin_name attribute as a string. + """ + + pin_name: Pin = "" + map_tree = ("Off", "On") + + def multiplex( + self, signal: Union[Signal, bool], trigger_update: bool = True + ) -> None: + if signal is True: + converted_signal = "On" + elif signal is False: + converted_signal = "Off" + else: + converted_signal = signal + super().multiplex(converted_signal, trigger_update=trigger_update) + + def __call__( + self, signal: Union[Signal, bool], trigger_update: bool = True + ) -> None: + """Override call to set the type on signal_output correctly.""" + self.multiplex(signal, trigger_update) + + def __init__( + self, + update_pins: Optional[PinUpdateCallback] = None, + ): + if not self.pin_list: + self.pin_list = [self.pin_name] + super().__init__(update_pins) + + +class RelayMatrixMux(VirtualMux): + clearing_time = 0.01 + + def _calculate_pins( + self, old_signal: Signal, new_signal: Signal + ) -> tuple[PinSetState, PinSetState]: + """ + Override of _calculate_pins to implement break-before-make switching. + """ + setup = PinSetState(off=self._pin_set, on=frozenset()) + on_pins = self._signal_map[new_signal] + final = PinSetState(off=self._pin_set - on_pins, on=on_pins) + + # if the signal doesn't change, we don't want open then close again + # so return the 'final' state for both setup and final. + if old_signal == new_signal: + return final, final + return setup, final + + +class AddressHandler: + """ + Controls the IO for a set of pins. + + For output, it is assumed that all the pins under the of a given + AddressHandler are updated in one operation. + + An AddressHandler should lazily open any required hardware + resource. Ideally, it should be possible to instantiate and + inspect the AddressHandler without requiring hardware to + be connected. When `set_pins` is called, the implementation + should check for the required hardware connection and open + that driver when first called. + + Further, calling close() may "uninitialize" the driver. The + next time set_pins is called, the handler will re-open hardware. + """ + + def __init__(self, pins: Sequence[Pin]) -> None: + # we convert the pin list to an immutable tuple, incase the + # caller passing in a mutable sequence that gets modified... + self.pin_list = tuple(pins) + if hasattr(self, "pin_defaults"): + raise ValueError("'pin_defaults' should not be set on a AddressHandler") + + def set_pins(self, pins: Collection[Pin]) -> None: + """ + Called by the VirtualAddressMap to write out pin changes. + + If the underlying hardware required for the IO isn't open, + open it. + + :param pins: is a collection of pins that should be made active. All other + pins defined by the AddressHandler should be cleared. + """ + raise NotImplementedError + + def close(self) -> None: + """ + Optional close method to clean-up resources. + + This will be called automatically by the JigDriver for any + address handlers passed into the JigDriver when it as created. + """ + pass + + +class PinValueAddressHandler(AddressHandler): + """Maps pins to bit values then combines the bit values for an update""" + + def __init__(self, pins: Sequence[Pin]) -> None: + super().__init__(pins) + self._pin_lookup = { + pin: bit for pin, bit in zip(self.pin_list, _bit_generator()) + } + + def set_pins(self, pins: Collection[Pin]) -> None: + value = sum(self._pin_lookup[pin] for pin in pins) + self._update_output(value) + + def _update_output(self, value: int) -> None: + bits = len(self.pin_list) + print(f"0b{value:0{bits}b}") + + +class VirtualAddressMap: + """ + The supervisor loops through the attached virtual multiplexers each time a mux update is triggered. + """ + + def __init__(self, handlers: Sequence[AddressHandler]): + # used to work out which pins get routed to which address handler + self._handler_pin_sets: list[tuple[PinSet, AddressHandler]] = [] + for handler in handlers: + self._handler_pin_sets.append((frozenset(handler.pin_list), handler)) + self._all_pins = frozenset( + itertools.chain.from_iterable(handler.pin_list for handler in handlers) + ) + + # a list of updates that haven't been sent to address handlers yet. This + # allows a few mux changes to get updated at the same time. + self._pending_updates: list[PinUpdate] = [] + self._active_pins: set[Pin] = set() + + def add_update(self, pin_update: PinUpdate, trigger_update: bool = True) -> None: + """This method should be registered with each virtual mux to route pin changes.""" + self._pending_updates.append(pin_update) + + if trigger_update: + self._do_pending_updates() + + def _do_pending_updates(self) -> None: + """ + Collate pending updates and send pins to respective address handlers. + + 1. For each pending update, combine pins to clear and pins to set. + 2. Find the longest `minimum_change_time` of all the updates + 3. Find the AddressHandler required to set each pin. + 4. Check if there is actually anything to change. Only write out + changes that are required. + 5. Do the setup phase + 6. Wait the required change time + 7. Do the final phase + """ + collated = reduce(or_, self._pending_updates, PinUpdate()) + self._pending_updates = [] + + if in_both := collated.setup.on & collated.setup.off: + raise ValueError(f"The following pins need to be on and off {in_both}") + + if in_both := collated.final.on & collated.final.off: + raise ValueError(f"The following pins need to be on and off {in_both}") + + self._dispatch_pin_state(collated.setup) + time.sleep(collated.minimum_change_time) + self._dispatch_pin_state(collated.final) + + def _dispatch_pin_state(self, new_state: PinSetState) -> None: + # check all pins actually have an address handler to send to + if unknown_pins := (new_state.on | new_state.off) - self._all_pins: + raise ValueError(f"Can't switch unknown pin(s) {', '.join(unknown_pins)}.") + + new_active_pins = (self._active_pins | new_state.on) - new_state.off + if new_active_pins != self._active_pins: + self._active_pins = new_active_pins + for pin_set, handler in self._handler_pin_sets: + # Note that we might send an empty set here. We need to do that + # so if there are pins to clear, they get cleared. This might + # end up in redundant handler updates, but unless we track active_pins + # per-handler I don't think we can avoid that. + handler.set_pins(pin_set & self._active_pins) + + def active_pins(self) -> frozenset[Pin]: + return frozenset(self._active_pins) + + def reset(self) -> None: + """ + Sets all pins to be inactive. + + Note: this does not change the state of any VirtualMux's, so it is + possible the state of each VirtualMux and its related pins will not + be in sync. + """ + self._dispatch_pin_state(PinSetState(off=self._all_pins)) + + def update_input(self) -> None: + """ + Currently not implemented. A few jigs implement digital input, but not many. It is the intention + to implement this eventually, but for now, the old jig_mapping.py version can be used. + """ + raise NotImplementedError + + +class MuxGroup: + """ + Group multiple VirtualMux's, for use in a single Jig Driver. + + If a test script, it is expected that MuxGroup will be subclassed, with attributes + being each required VirtualMux subclass. This can be done using a dataclass: + + @dataclass + class JigMuxGroup(MuxGroup): + mux_one: MuxOne = field(default_factory=MuxOne) + mux_two: MuxTwo = field(default_factory=MuxTwo) + """ + + def get_multiplexers(self) -> list[VirtualMux]: + return [attr for attr in self.__dict__.values() if isinstance(attr, VirtualMux)] + + def reset(self) -> None: + mux_list = self.get_multiplexers() + if len(mux_list) == 0: + return + + for mux in mux_list[:-1]: + mux.reset(trigger_update=False) + mux_list[-1].reset(trigger_update=True) + + def active_signals(self) -> list[str]: + return [str(mux) for mux in self.get_multiplexers()] + + +JigSpecificMuxGroup = TypeVar("JigSpecificMuxGroup", bound=MuxGroup) + + +class JigDriver(Generic[JigSpecificMuxGroup]): + """ + Combine multiple VirtualMux's and multiple AddressHandler's. + + The jig driver joins muxes to handlers by matching up pin definitions. + """ + + def __init__( + self, + mux_group_factory: Callable[[], JigSpecificMuxGroup], + handlers: Sequence[AddressHandler], + ): + # keep a reference to handlers so that we can close them if required. + self._handlers = handlers + self.virtual_map = VirtualAddressMap(handlers) + + self.mux = mux_group_factory() + for mux in self.mux.get_multiplexers(): + # Perhaps we should instantiate the virtual mux here + # and pass in the virtual_map.add_update. But we'd have to do some + # magic in the MuxGroup call to pass add_update to each VirtualMux + # constructor, and I was hoping to just use a dataclass... + mux._update_pins = self.virtual_map.add_update + + self._validate() + + def close(self) -> None: + for handler in self._handlers: + handler.close() + + def active_pins(self) -> frozenset[Pin]: + return self.virtual_map.active_pins() + + def debug_set_pin(self, pin: Pin, value: bool) -> None: + # pin is a str, which is iterable... so we can't just throw it into + # frozen set, or we end up with frozenset deconstructing it! so + # wrap it into another single element list first + if value: + update = PinSetState(on=frozenset([pin])) + else: + update = PinSetState(off=frozenset([pin])) + self.virtual_map.add_update(PinUpdate(final=update)) + + def debug_set_pins( + self, on: Collection[Pin] = frozenset(), off: Collection[Pin] = frozenset() + ) -> None: + update = PinSetState(on=frozenset(on), off=frozenset(off)) + self.virtual_map.add_update(PinUpdate(final=update)) + + def all_mux_signals(self) -> tuple[tuple[VirtualMux, tuple[Signal, ...]], ...]: + return tuple(((mux, mux.all_signals()) for mux in self.mux.get_multiplexers())) + + def reset(self) -> None: + """ + Reset all VirtualMux's to the default signal "" (all pins off) + """ + self.mux.reset() + + def _validate(self) -> None: + """ + Do some basic sanity checks on the jig definition. + + - Ensure all pins that are used in muxes are defined by + some address handler. + + Note: It is O.K. for there to be AddressHandler pins that + are not used anywhere. Eventually we might choose to + warn about them. This it is necessary to define some jigs. + """ + all_handler_pins: set[Pin] = reduce( + or_, (set(handler.pin_list) for handler in self._handlers), set() + ) + mux_missing_pins = [] + + for mux in self.mux.get_multiplexers(): + if unknown_pins := mux.pins() - all_handler_pins: + mux_missing_pins.append((mux, unknown_pins)) + + if mux_missing_pins: + raise ValueError( + f"One or more VirtualMux uses unknown pins:\n{mux_missing_pins}" + ) + + +_T = TypeVar("_T") + + +def _generate_bit_sets(bits: Sequence[_T]) -> Generator[set[_T], None, None]: + """ + Create subsets of bits, representing bits of a list of integers + + This is easier to explain with an example + list(generate_bit_set(["x0", "x1"])) -> [set(), {'x0'}, {'x1'}, {'x0', 'x1'}] + """ + int_list = range(1 << len(bits)) if len(bits) != 0 else range(0) + return ( + {bit for i, bit in enumerate(bits) if (1 << i) & index} for index in int_list + ) + + +def _bit_generator() -> Generator[int, None, None]: + """b1, b10, b100, b1000, ...""" + return (1 << counter for counter in itertools.count()) + + +def generate_pin_group( + group_designator: int, *, pin_count: int = 16, prefix: str = "", sep: str = "" +) -> tuple[Pin, ...]: + """ + A helper to create pin names groups of pins, especially relay matrices. + + By default, returns 16 pin names, suitable for a 16 relay matrix. + Changing the default values for pin_count and prefix can be used to generate + alternate naming schemes. + + generate_pin_group(1) -> ("1K1", "1K2", "1K3", ..., "1K16") + generate_pin_group(3, prefix="RM") -> ("RM1K1", "RM1K2", "RM1K3", ..., "RM1K16") + generate_pin_group(5, pin_count=8, prefix="U") -> ("U3K1", "U3K2, "U3K3", ..., "U3K8") + """ + return tuple( + f"{prefix}{group_designator}{sep}K{relay}" for relay in range(1, pin_count + 1) + ) + + +def generate_relay_matrix_pin_list( + designators: Iterable[int], *, prefix: str = "", sep: str = "" +) -> tuple[Pin, ...]: + """ + Create a pin list for multiple relay matrix modules. + + Each module is allocated 16 pins + generate_relay_matrix_pin_list([1,2,3]) -> + ("1K1", "1K2", ..., "1K16", "2K1", ..., "2K16", "3K1", ..., "3K16") + + generate_relay_matrix_pin_list([2,3,1], prefix="RM") -> + ("RM2K1", "RM2K2", ..., "RM2K16", "RM3K1", ..., "RM3K16", "RM1K1", ..., "RM1K16") + + Combination generate_relay_matrix_pin_list and generate_pin_group to create pins + as needed for a specific jig configuration. + """ + return tuple( + itertools.chain.from_iterable( + generate_pin_group(rm_number, prefix=prefix, sep=sep) + for rm_number in designators + ) + ) diff --git a/src/fixate/drivers/__init__.py b/src/fixate/drivers/__init__.py index eb24a5b7..6504c298 100644 --- a/src/fixate/drivers/__init__.py +++ b/src/fixate/drivers/__init__.py @@ -1,8 +1,4 @@ -try: - from typing import Protocol -except ImportError: - # Protocol added in python 3.8 - from typing_extensions import Protocol +from typing import Protocol import pubsub.pub diff --git a/src/fixate/drivers/handlers.py b/src/fixate/drivers/handlers.py new file mode 100644 index 00000000..0cc64021 --- /dev/null +++ b/src/fixate/drivers/handlers.py @@ -0,0 +1,65 @@ +""" +This module implements concrete AddressHandler type, that +can be used to implement IO for the fixate.core.switching module. +""" +from __future__ import annotations + +from typing import Sequence, Optional + +from fixate import Pin, PinValueAddressHandler +from fixate.drivers import ftdi + + +class FTDIAddressHandler(PinValueAddressHandler): + """ + An address handler which uses the ftdi driver to control pins. + + We create this concrete address handler because we use it most + often. FT232 is used to bit-bang to shift register that are control + the switching in a jig. + """ + + def __init__( + self, + pins: Sequence[Pin], + ftdi_description: str, + ) -> None: + super().__init__(pins) + self._ftdi_description = ftdi_description + self._ftdi: Optional[ftdi.FTDI2xx] = None + + def _open(self) -> ftdi.FTDI2xx: + # how many bytes? enough for every pin to get a bit. We might + # end up with some left-over bits. The +7 in the expression + # ensures we round up. + bytes_required = (len(self.pin_list) + 7) // 8 + ftdi_handle = ftdi.open(ftdi_description=self._ftdi_description) + ftdi_handle.configure_bit_bang( + ftdi.BIT_MODE.FT_BITMODE_ASYNC_BITBANG, + bytes_required=bytes_required, + data_mask=4, + clk_mask=2, + latch_mask=1, + ) + # Measurement of baudrate vs bit-bang. The programming manual say 16 x, but that + # only appears to be true for lower clock rates. Keeping the actual value at 115200 + # since that was used regularly in the past + # baudrate bit-bang update rate + # 1_000_000 ~2 MHz + # 750_000 ~2.4 MHz + # 115_200 ~926 kHz + # 10_000 ~160 kHz + ftdi_handle.baud_rate = 115_200 + return ftdi_handle + + def close(self) -> None: + if self._ftdi is not None: + self._ftdi.close() + self._ftdi = None + + def _update_output(self, value: int) -> None: + # We implement the required semantics of set_pin here, + # by ensuring the ftdi device is open. + if self._ftdi is None: + self._ftdi = self._open() + self._ftdi.serial_shift_bit_bang(value) diff --git a/src/fixate/examples/jig_driver.py b/src/fixate/examples/jig_driver.py new file mode 100644 index 00000000..6d56cdf6 --- /dev/null +++ b/src/fixate/examples/jig_driver.py @@ -0,0 +1,60 @@ +""" +This file is just a test playground that shows how the update jig classes will +fit together. +""" +from __future__ import annotations +from dataclasses import dataclass, field +from fixate import ( + VirtualMux, + JigDriver, + MuxGroup, + PinValueAddressHandler, + VirtualSwitch, +) + + +class MuxOne(VirtualMux): + pin_list = ("x0", "x1", "x2") + map_list = ( + ("sig1", "x0"), + ("sig2", "x1"), + ("sig3", "x2"), + ) + + +class MuxTwo(VirtualMux): + pin_list = ("x3", "x4", "x5") + map_tree = ( + "sig4", + "sig5", + "sig6", + ( + "sig7", + "sig8", + ), + ) + + +class MuxThree(VirtualSwitch): + pin_name = "x101" + + +# Note! +# our existing scripts/jig driver, the name of the mux is the +# class of the virtual mux. This scheme below will not allow that +# to work. Instead, define an attribute name on the MuxGroup +@dataclass +class JigMuxGroup(MuxGroup): + mux_one: MuxOne = field(default_factory=MuxOne) + mux_two: MuxTwo = field(default_factory=MuxTwo) + mux_three: MuxThree = field(default_factory=MuxThree) + + +jig = JigDriver( + JigMuxGroup, [PinValueAddressHandler(("x0", "x1", "x2", "x3", "x4", "x5", "x101"))] +) + +jig.mux.mux_one("sig2", trigger_update=False) +jig.mux.mux_two("sig5") +jig.mux.mux_three("On") +jig.mux.mux_three(False) diff --git a/test/test_switching.py b/test/test_switching.py new file mode 100644 index 00000000..527f1d2c --- /dev/null +++ b/test/test_switching.py @@ -0,0 +1,594 @@ +from fixate._switching import ( + _generate_bit_sets, + VirtualMux, + _bit_generator, + PinSetState, + PinUpdate, + VirtualSwitch, + RelayMatrixMux, + PinValueAddressHandler, + generate_pin_group, + generate_relay_matrix_pin_list, + AddressHandler, + VirtualAddressMap, + MuxGroup, + JigDriver, +) + +import pytest + +################################################################ +# helper to generate data + + +def test_generate_bit_sets_empty(): + assert list(_generate_bit_sets([])) == [] + + +def test_generate_bit_sets_one_bit(): + assert list(_generate_bit_sets(["b0"])) == [set(), {"b0"}] + + +def test_generate_bit_sets_multiple_bits(): + expected = [ + set(), + {"b0"}, + {"b1"}, + {"b1", "b0"}, + {"b2"}, + {"b2", "b0"}, + {"b2", "b1"}, + {"b2", "b1", "b0"}, + ] + assert list(_generate_bit_sets(["b0", "b1", "b2"])) == expected + + +def test_bit_generator(): + """b1, b10, b100, b1000, ...""" + bit_gen = _bit_generator() + + actual = [next(bit_gen) for _ in range(8)] + expected = [1, 2, 4, 8, 16, 32, 64, 128] + assert actual == expected + + +def test_generate_pin_group(): + assert list(generate_pin_group(3)) == ( + "3K1 3K2 3K3 3K4 3K5 3K6 3K7 3K8 3K9 3K10 3K11 3K12 3K13 3K14 3K15 3K16".split() + ) + assert list(generate_pin_group(1, prefix="RM")) == ( + "RM1K1 RM1K2 RM1K3 RM1K4 RM1K5 RM1K6 RM1K7 RM1K8 RM1K9 RM1K10 RM1K11 RM1K12 RM1K13 RM1K14 RM1K15 RM1K16".split() + ) + assert list(generate_pin_group(10, pin_count=8, prefix="U")) == ( + "U10K1 U10K2 U10K3 U10K4 U10K5 U10K6 U10K7 U10K8".split() + ) + + +# fmt:off +large_pin_list_example1 = [ + "RC1K1", "RC1K2", "RC1K3", "RC1K4", "RC1K5", "RC1K6", "RC1K7", "RC1K8","RC1K9", "RC1K10", "RC1K11", "RC1K12", "RC1K13", "RC1K14", "RC1K15", "RC1K16", + "RC2K1", "RC2K2", "RC2K3", "RC2K4", "RC2K5", "RC2K6", "RC2K7", "RC2K8", "RC2K9", "RC2K10", "RC2K11", "RC2K12", "RC2K13", "RC2K14", "RC2K15", "RC2K16", + "RA1K1", "RA1K2", "RA1K3", "RA1K4", "RA1K5", "RA1K6", "RA1K7", "RA1K8", "RA1K9", "RA1K10", "RA1K11", "RA1K12", "RA1K13", "RA1K14", "RA1K15", "RA1K16", + "RA2K1", "RA2K2", "RA2K3", "RA2K4", "RA2K5", "RA2K6", "RA2K7", "RA2K8", "RA2K9", "RA2K10", "RA2K11", "RA2K12", "RA2K13", "RA2K14", "RA2K15", "RA2K16", + "RA3K1", "RA3K2", "RA3K3", "RA3K4", "RA3K5", "RA3K6", "RA3K7", "RA3K8", "RA3K9", "RA3K10", "RA3K11", "RA3K12", "RA3K13", "RA3K14", "RA3K15", "RA3K16", + "RP1K1", "RP1K2", "RP1K3", "RP1K4", "RP1K5", "RP1K6", "RP1K7", "RP1K8", "RP1K9", "RP1K10", "RP1K11", "RP1K12", "RP1K13", "RP1K14", "RP1K15", "RP1K16", + "RP2K1", "RP2K2", "RP2K3", "RP2K4", "RP2K5", "RP2K6", "RP2K7", "RP2K8", "RP2K9", "RP2K10", "RP2K11", "RP2K12", "RP2K13", "RP2K14", "RP2K15", "RP2K16", + "RH1K1", "RH1K2", "RH1K3", "RH1K4", "RH1K5", "RH1K6", "RH1K7", "RH1K8", "RH1K9", "RH1K10", "RH1K11", "RH1K12", "RH1K13", "RH1K14", "RH1K15", "RH1K16", +] + +large_pin_list_example2 = [ + # RM19 + "RM19K1", "RM19K2", "RM19K3", "RM19K4", "RM19K5", "RM19K6", "RM19K7", "RM19K8", + "RM19K9", "RM19K10", "RM19K11", "RM19K12", "RM19K13", "RM19K14", "RM19K15", "RM19K16", + # RM18 + "RM18K1", "RM18K2", "RM18K3", "RM18K4", "RM18K5", "RM18K6", "RM18K7", "RM18K8", + "RM18K9", "RM18K10", "RM18K11", "RM18K12", "RM18K13", "RM18K14", "RM18K15", "RM18K16", + # RM17 + "RM17K1", "RM17K2", "RM17K3", "RM17K4", "RM17K5", "RM17K6", "RM17K7", "RM17K8", + "RM17K9", "RM17K10", "RM17K11", "RM17K12", "RM17K13", "RM17K14", "RM17K15", "RM17K16", + # RM16 + "RM16K1", "RM16K2", "RM16K3", "RM16K4", "RM16K5", "RM16K6", "RM16K7", "RM16K8", + "RM16K9", "RM16K10", "RM16K11", "RM16K12", "RM16K13", "RM16K14", "RM16K15", "RM16K16", + # RM15 + "RM15K1", "RM15K2", "RM15K3", "RM15K4", "RM15K5", "RM15K6", "RM15K7", "RM15K8", + "RM15K9", "RM15K10", "RM15K11", "RM15K12", "RM15K13", "RM15K14", "RM15K15", "RM15K16", + # RM14 + "RM14K1", "RM14K2", "RM14K3", "RM14K4", "RM14K5", "RM14K6", "RM14K7", "RM14K8", + "RM14K9", "RM14K10", "RM14K11", "RM14K12", "RM14K13", "RM14K14", "RM14K15", "RM14K16", + # RM13 + "RM13K1", "RM13K2", "RM13K3", "RM13K4", "RM13K5", "RM13K6", "RM13K7", "RM13K8", + "RM13K9", "RM13K10", "RM13K11", "RM13K12", "RM13K13", "RM13K14", "RM13K15", "RM13K16", + # RM10 + "RM10K1", "RM10K2", "RM10K3", "RM10K4", "RM10K5", "RM10K6", "RM10K7", "RM10K8", + "RM10K9", "RM10K10", "RM10K11", "RM10K12", "RM10K13", "RM10K14", "RM10K15", "RM10K16", + # RM11 + "RM11K1", "RM11K2", "RM11K3", "RM11K4", "RM11K5", "RM11K6", "RM11K7", "RM11K8", + "RM11K9", "RM11K10", "RM11K11", "RM11K12", "RM11K13", "RM11K14", "RM11K15", "RM11K16", + # RM12 + "RM12K1", "RM12K2", "RM12K3", "RM12K4", "RM12K5", "RM12K6", "RM12K7", "RM12K8", + "RM12K9", "RM12K10", "RM12K11", "RM12K12", "RM12K13", "RM12K14", "RM12K15", "RM12K16", + # RM1 + "RM1K1", "RM1K2", "RM1K3", "RM1K4", "RM1K5", "RM1K6", "RM1K7", "RM1K8", + "RM1K9", "RM1K10", "RM1K11", "RM1K12", "RM1K13", "RM1K14", "RM1K15", "RM1K16", + # RM2 + "RM2K1", "RM2K2", "RM2K3", "RM2K4", "RM2K5", "RM2K6", "RM2K7", "RM2K8", + "RM2K9", "RM2K10", "RM2K11", "RM2K12", "RM2K13", "RM2K14", "RM2K15", "RM2K16", + # RM3 + "RM3K1", "RM3K2", "RM3K3", "RM3K4", "RM3K5", "RM3K6", "RM3K7", "RM3K8", + "RM3K9", "RM3K10", "RM3K11", "RM3K12", "RM3K13", "RM3K14", "RM3K15", "RM3K16", + # RM4 + "RM4K1", "RM4K2", "RM4K3", "RM4K4", "RM4K5", "RM4K6", "RM4K7", "RM4K8", + "RM4K9", "RM4K10", "RM4K11", "RM4K12", "RM4K13", "RM4K14", "RM4K15", "RM4K16", + # RM5 + "RM5K1", "RM5K2", "RM5K3", "RM5K4", "RM5K5", "RM5K6", "RM5K7", "RM5K8", + "RM5K9", "RM5K10", "RM5K11", "RM5K12", "RM5K13", "RM5K14", "RM5K15", "RM5K16", + # RM6 + "RM6K1", "RM6K2", "RM6K3", "RM6K4", "RM6K5", "RM6K6", "RM6K7", "RM6K8", + "RM6K9", "RM6K10", "RM6K11", "RM6K12", "RM6K13", "RM6K14", "RM6K15", "RM6K16", + # RM7 + "RM7K1", "RM7K2", "RM7K3", "RM7K4", "RM7K5", "RM7K6", "RM7K7", "RM7K8", + "RM7K9", "RM7K10", "RM7K11", "RM7K12", "RM7K13", "RM7K14", "RM7K15", "RM7K16", + # RM8 + "RM8K1", "RM8K2", "RM8K3", "RM8K4", "RM8K5", "RM8K6", "RM8K7", "RM8K8", + "RM8K9", "RM8K10", "RM8K11", "RM8K12", "RM8K13", "RM8K14", "RM8K15", "RM8K16", + # RM9 + "RM9K1", "RM9K2", "RM9K3", "RM9K4", "RM9K5", "RM9K6", "RM9K7", "RM9K8", + "RM9K9", "RM9K10", "RM9K11", "RM9K12", "RM9K13", "RM9K14", "RM9K15", "RM9K16", + # U2 + "U2K1", "U2K3", "U2K4", "U2K5", "U2K6", "U2SC6", "U2SC7", "U2SC8", +] + +large_pin_list_example3 = [ + "RM1_K1", "RM1_K2", "RM1_K3", "RM1_K4", "RM1_K5", "RM1_K6", "RM1_K7", "RM1_K8", + "RM1_K9", "RM1_K10", "RM1_K11", "RM1_K12", "RM1_K13", "RM1_K14", "RM1_K15", "RM1_K16", + "RM2_K1", "RM2_K2", "RM2_K3", "RM2_K4", "RM2_K5", "RM2_K6", "RM2_K7", "RM2_K8", + "RM2_K9", "RM2_K10", "RM2_K11", "RM2_K12", "RM2_K13", "RM2_K14", "RM2_K15", "RM2_K16", +] +# fmt:on + + +def test_generate_relay_matrix_pin_list(): + pin_list = list( + generate_relay_matrix_pin_list([1, 2], prefix="RC") + + generate_relay_matrix_pin_list([1, 2, 3], prefix="RA") + + generate_relay_matrix_pin_list([1, 2], prefix="RP") + + generate_relay_matrix_pin_list([1], prefix="RH") + ) + assert pin_list == large_pin_list_example1 + + pin_list = list( + generate_relay_matrix_pin_list( + [19, 18, 17, 16, 15, 14, 13, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9], + prefix="RM", + ) + + ("U2K1", "U2K3", "U2K4", "U2K5", "U2K6", "U2SC6", "U2SC7", "U2SC8") + ) + assert pin_list == large_pin_list_example2 + + pin_list = list(generate_relay_matrix_pin_list([1, 2], prefix="RM", sep="_")) + assert pin_list == large_pin_list_example3 + + assert "U5_K1 U5_K2 U5_K3".split() == list( + generate_pin_group(5, pin_count=3, prefix="U", sep="_") + ) + + +################################################################ +# virtual mux definitions + + +def test_VirtualMux_simple_tree_map(): + class SimpleVirtualMux(VirtualMux): + pin_list = ["b0"] + map_tree = ["sig0", "sig1"] + + mux = SimpleVirtualMux() + # the empty signal "" get automatically added + assert mux._signal_map == {"": set(), "sig0": set(), "sig1": {"b0"}} + + +def test_VirtualMux_larger_tree_map(): + class LargerVirtualMux(VirtualMux): + pin_list = ["b0", "b1", "b2"] + map_tree = ["sig0", "sig1", "sig2", "sig3", "sig4", "sig5", "sig6", "sig7"] + + mux = LargerVirtualMux() + # the empty signal "" get automatically added + assert mux._signal_map == { + "": set(), + "sig0": set(), + "sig1": {"b0"}, + "sig2": {"b1"}, + "sig3": {"b1", "b0"}, + "sig4": {"b2"}, + "sig5": {"b2", "b0"}, + "sig6": {"b2", "b1"}, + "sig7": {"b2", "b1", "b0"}, + } + + +def test_VirtualMux_nested_tree_map(): + class NestedVirtualMux(VirtualMux): + # Final mapping: + # addr signal + # -------------- + # 0 a0 + # 1 a1_b0 + # 2 a2_b0 + # 3 a3 + # 5 a1_b1 + # 6 a2_b1_c0 + # 9 a1_b2 + # 10 a2_b2 + # 14 a2_b3 + # 22 a2_b1_c1 + pin_list = ("x0", "x1", "x2", "x3", "x4") + + mux_c = ("a2_b1_c0", "a2_b1_c1") + mux_b1 = ("a1_b0", "a1_b1", "a1_b2", None) + mux_b2 = ("a2_b0", mux_c, "a2_b2", "a2_b3") + + map_tree = ("a0", mux_b1, mux_b2, "a3") + + mux = NestedVirtualMux() + # the empty signal "" get automatically added + assert mux._signal_map == { + "": set(), + "a0": set(), + "a1_b0": {"x0"}, + "a2_b0": {"x1"}, + "a3": {"x1", "x0"}, + "a1_b1": {"x0", "x2"}, + "a2_b1_c0": {"x1", "x2"}, + "a1_b2": {"x0", "x3"}, + "a2_b2": {"x1", "x3"}, + "a2_b3": {"x1", "x2", "x3"}, + "a2_b1_c1": {"x1", "x2", "x4"}, + } + + +def test_empty_signal_should_not_be_defined(): + class BadMux1(VirtualMux): + pin_list = ["x"] + map_list = [["", "x"]] + + class BadMux2(VirtualMux): + pin_list = ["x"] + # Even though this is the "correct" definition, it is still + # not allowed + map_list = [[""]] + + with pytest.raises(ValueError): + BadMux1() + + with pytest.raises(ValueError): + BadMux2() + + +def test_default_signal_on_mux_raises(): + class BadMux(VirtualMux): + pin_list = ["x"] + map_list = [["sig1", "x"]] + default_signal = "sig1" + + with pytest.raises(ValueError): + BadMux() + + +# ############################################################### +# VirtualMux Behaviour + + +class MuxA(VirtualMux): + """A mux definition used by a few tests""" + + pin_list = ("a0", "a1") + map_list = (("sig_a1", "a0", "a1"), ("sig_a2", "a1")) + + +def test_virtual_mux_basic(): + updates = [] + mux_a = MuxA(lambda x, y: updates.append((x, y))) + + # test both the __call__ and multiplex methods trigger + # the appropriate update callback. + mux_a("sig_a1") + mux_a.multiplex("sig_a2", trigger_update=False) + mux_a("") + + clear = PinSetState(off=frozenset({"a0", "a1"})) + a1 = PinSetState(on=frozenset({"a0", "a1"})) + a2 = PinSetState(on=frozenset({"a1"}), off=frozenset({"a0"})) + assert updates == [ + (PinUpdate(PinSetState(), a1), True), + (PinUpdate(PinSetState(), a2), False), + (PinUpdate(PinSetState(), clear), True), + ] + + +def test_virtual_mux_reset(): + """Check that reset sends an update that sets all pins off""" + + updates = [] + mux_a = MuxA(lambda x, y: updates.append((x, y))) + mux_a.reset() + assert updates == [ + (PinUpdate(PinSetState(), PinSetState(off=frozenset({"a1", "a0"}))), True), + ] + + +def test_virtual_mux_invalid_signal(): + """Check an invalid signal raises an error.""" + + mux_a = MuxA() + with pytest.raises(ValueError): + mux_a("invalid signal") + + +def test_invalid_signal_map_raises(): + """A virtual mux needs one of tree_map or list_map defined""" + + class BadMux(VirtualMux): + pass + + with pytest.raises(ValueError): + bm = BadMux() + + +# ############################################################### +# VirtualSwitch Behaviour + + +def test_virtual_switch(): + class Sw(VirtualSwitch): + pin_name = "x" + + updates = [] + sw = Sw(lambda x, y: updates.append((x, y))) + + sw(True) + sw(False) + sw("On", trigger_update=False) + sw("Off") + sw("") + + on = PinSetState(on=frozenset("x")) + off = PinSetState(off=frozenset("x")) + + assert updates == [ + (PinUpdate(PinSetState(), on), True), + (PinUpdate(PinSetState(), off), True), + (PinUpdate(PinSetState(), on), False), + (PinUpdate(PinSetState(), off), True), + (PinUpdate(PinSetState(), off), True), + ] + + +# ############################################################### +# RelayMatrixMux Behaviour + + +def test_relay_matrix_mux(): + class RMMux(RelayMatrixMux): + pin_list = ("a", "b") + map_list = ( + ("sig1", "a"), + ("sig2", "b"), + ) + + sig1 = PinSetState(off=frozenset("b"), on=frozenset("a")) + sig2 = PinSetState(off=frozenset("a"), on=frozenset("b")) + off = PinSetState(off=frozenset("ab")) + + updates = [] + rm = RMMux(lambda x, y: updates.append((x, y))) + rm("sig1") + rm("sig2") + + # compared to the standard mux, the setup of the PinUpdate + # sets all pins off. The standard mux does nothing for the + # setup phase. + assert updates == [ + (PinUpdate(off, sig1, 0.01), True), + (PinUpdate(off, sig2, 0.01), True), + ] + + +def test_relay_matrix_mux_no_signal_change(): + """If the new signal is as-per previous, don't open & close again""" + + class RMMux(RelayMatrixMux): + pin_list = ("a", "b") + map_list = ( + ("sig1", "a"), + ("sig2", "b"), + ) + + sig1 = PinSetState(off=frozenset("b"), on=frozenset("a")) + off = PinSetState(off=frozenset("ab")) + + updates = [] + rm = RMMux(lambda x, y: updates.append((x, y))) + rm("sig1") + rm("sig1") + + # we don't care about the first update, that just ensure the mux in + # in the right initial state. Note that there is a bit of implementation + # detail leaking here. We could also test that no pins a added to the + # pin update. I will keep as is for now, but if we change the implementation + # it is reasonable to update this test. + assert len(updates) == 2 + assert updates[1] == (PinUpdate(sig1, sig1, 0.01), True) + + +# ############################################################### +# AddressHandler + + +def test_pin_default_on_address_handler_raise(): + class BadHandler(PinValueAddressHandler): + pin_defaults = ("x",) + + with pytest.raises(ValueError): + BadHandler(("x", "y")) + + +# ############################################################### +# VirtualAddressMap + + +class TestHandler(AddressHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.updates = [] + + def set_pins(self, pins): + self.updates.append(pins) + + +def HandlerXY(): + return TestHandler("xy") + + +def HandlerAB(): + return TestHandler("ab") + + +def test_virtual_address_map_init_no_active_pins(): + vam = VirtualAddressMap([HandlerAB()]) + assert vam.active_pins() == frozenset() + + +def test_virtual_address_map_single_handler(): + """ + Go through some basic address map operations with a single handler + - set pins on, check it was sent to the handler + - do a reset and check it was sent to the handler + - ensure active_pins is as expected at each step + """ + ab = HandlerAB() + vam = VirtualAddressMap([ab]) + vam.add_update(PinUpdate(final=PinSetState(on=frozenset("ab")))) + assert ab.updates[0] == frozenset("ab") + assert vam.active_pins() == frozenset("ab") + vam.reset() + assert ab.updates[1] == frozenset() + assert vam.active_pins() == frozenset() + + +def test_virtual_address_map_single_handler_delay_trigger(): + """ + Go through some basic address map operations with a single handler, + but this time, we split the operation up into steps using + trigger_update = False. + """ + ab = HandlerAB() + vam = VirtualAddressMap([ab]) + vam.add_update( + PinUpdate(final=PinSetState(on=frozenset("a"))), trigger_update=False + ) + assert len(ab.updates) == 0 + vam.add_update(PinUpdate(final=PinSetState(on=frozenset("b"))), trigger_update=True) + assert len(ab.updates) == 1 + assert ab.updates[0] == frozenset("ab") + + +def test_virtual_address_map_setup_then_final(): + """ + Go through some basic address map operations with a single handler, + but this time, we split the operation up into steps using + trigger_update = False. + """ + ab = HandlerAB() + vam = VirtualAddressMap([ab]) + vam.add_update( + PinUpdate( + setup=PinSetState(on=frozenset("b")), + final=PinSetState(on=frozenset("a")), # note we are not turning b off here + ) + ) + assert len(ab.updates) == 2 + assert ab.updates[0] == frozenset("b") + assert ab.updates[1] == frozenset("ab") + + +def test_virtual_address_map_multiple_handlers(): + ab = HandlerAB() + xy = HandlerXY() + vam = VirtualAddressMap([ab, xy]) + vam.add_update( + PinUpdate( + setup=PinSetState(on=frozenset("b")), + final=PinSetState(on=frozenset("ay")), + ), + ) + assert len(ab.updates) == 2 + assert len(xy.updates) == 2 + assert ab.updates[0] == frozenset("b") + assert ab.updates[1] == frozenset("ab") + assert xy.updates[0] == frozenset() + assert xy.updates[1] == frozenset("y") + + vam.add_update( + PinUpdate( + final=PinSetState(off=frozenset("ay"), on=frozenset("x")), + ), + ) + assert len(ab.updates) == 3 + assert len(xy.updates) == 3 + assert ab.updates[2] == frozenset("b") + assert xy.updates[2] == frozenset("x") + + +# ############################################################### +# Jig Driver + + +def test_jig_driver_with_unknown_pins(): + handler1 = AddressHandler(("x0",)) + handler2 = AddressHandler(("x2",)) + handler3 = AddressHandler(("x1",)) + + class Mux(VirtualMux): + pin_list = ("x0", "x1") # "x1" isn't in either handler + map_list = ("sig1", "x1") + + class Group(MuxGroup): + def __init__(self): + self.mux = Mux() + + # This is O.K., because all the pins are included + JigDriver(Group, [handler1, handler2, handler3]) + + with pytest.raises(ValueError): + # This should raise, because no handler implements "x1" + JigDriver(Group, [handler1, handler2]) + + +# ############################################################### +# Helper dataclasses + + +def test_pin_set_state_or(): + a = PinSetState(frozenset("ab"), frozenset("xy")) + b = PinSetState(frozenset("cd"), frozenset("x")) + assert a | b == PinSetState(frozenset("abcd"), frozenset("xy")) + + +def test_pin_update_or(): + a = PinUpdate( + PinSetState(frozenset("a"), frozenset("b")), + PinSetState(frozenset(), frozenset("yz")), + 1.0, + ) + b = PinUpdate( + PinSetState(frozenset("x"), frozenset()), + PinSetState(frozenset("c"), frozenset("d")), + 2.0, + ) + expected = PinUpdate( + PinSetState(frozenset("ax"), frozenset("b")), + PinSetState(frozenset("c"), frozenset("yzd")), + 2.0, + ) + assert expected == a | b diff --git a/tox.ini b/tox.ini index 677e1af8..70d09ea1 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37,py38,py39,py310,py311,py312,black,mypy +envlist = py38,py39,py310,py311,py312,black,mypy isolated_build = True [testenv] @@ -30,7 +30,7 @@ markers = [testenv:mypy] basepython = python3 -deps = mypy==1.3 +deps = mypy==1.10.0 # mypy gives different results if you actually install the stuff before you check it # separate cache to stop weirdness around sharing cache with other instances of mypy commands = mypy --cache-dir="{envdir}/mypy_cache" --config-file=mypy.ini