|
| 1 | +"""Python driver for Alicat BASIS mass flow devices, using serial communication. |
| 2 | +
|
| 3 | +Distributed under the GNU General Public License v2 |
| 4 | +""" |
| 5 | +from __future__ import annotations |
| 6 | + |
| 7 | +from typing import Any |
| 8 | + |
| 9 | +from .driver import FlowMeter |
| 10 | +from .util import Client, SerialClient, _is_float |
| 11 | + |
| 12 | +GASES = ['Air', 'Ar', 'CO2', 'N2', 'O2', 'N2O', 'H2', 'He', 'CH4'] |
| 13 | + |
| 14 | +class BASISMeter(FlowMeter): |
| 15 | + """Python driver for BASIS Flow Meters. |
| 16 | +
|
| 17 | + [Reference](https://www.alicat.com/wp-content/documents/manuals/DOC-MANUAL-BASIS2.pdf). |
| 18 | +
|
| 19 | + This communicates with the flow meter over a USB or RS-232/RS-485 |
| 20 | + connection using pyserial. |
| 21 | + """ |
| 22 | + |
| 23 | + def __init__(self, address: str = '/dev/ttyUSB0', unit: str = 'A', baudrate: int = 38400, **kwargs: Any) -> None: |
| 24 | + """Connect this driver with the appropriate USB / serial port. |
| 25 | +
|
| 26 | + Args: |
| 27 | + address: The serial port or TCP address:port. Default '/dev/ttyUSB0'. |
| 28 | + unit: The Alicat-specified unit ID, A-Z. Default 'A'. |
| 29 | + baudrate: The baud rate of the device. Default 38400. |
| 30 | + """ |
| 31 | + self.hw: Client = SerialClient(address=address, baudrate=baudrate, **kwargs) |
| 32 | + |
| 33 | + self.unit = unit |
| 34 | + self.keys = ['temperature', 'mass_flow', 'totalizer', 'setpoint', |
| 35 | + 'valve_drive', 'gas'] |
| 36 | + self.open = True |
| 37 | + self.firmware: str | None = None |
| 38 | + |
| 39 | + async def __aenter__(self, *args: Any) -> BASISMeter: |
| 40 | + """Provide async enter to context manager.""" |
| 41 | + return self |
| 42 | + |
| 43 | + @classmethod |
| 44 | + async def is_connected(cls, port: str, unit: str = 'A') -> bool: |
| 45 | + """Return True if the specified port is connected to this device. |
| 46 | +
|
| 47 | + This class can be used to automatically identify ports with connected |
| 48 | + Alicats. Iterate through all connected interfaces, and use this to |
| 49 | + test. Ports that come back True should be valid addresses. |
| 50 | + """ |
| 51 | + is_device = False |
| 52 | + try: |
| 53 | + device = cls(port, unit) |
| 54 | + try: |
| 55 | + c = await device.get() |
| 56 | + assert c |
| 57 | + is_device = True |
| 58 | + finally: |
| 59 | + await device.close() |
| 60 | + except Exception: |
| 61 | + pass |
| 62 | + return is_device |
| 63 | + |
| 64 | + async def get(self) -> dict[str, Any]: |
| 65 | + """Get the current state of the flow controller. |
| 66 | +
|
| 67 | + From the Alicat mass flow controller documentation, this data is: |
| 68 | + * Pressure (normally in psia) |
| 69 | + * Temperature (normally in C) |
| 70 | + * Mass flow (in units specified at time of order) |
| 71 | + * Total flow |
| 72 | + * Currently selected gas |
| 73 | +
|
| 74 | + Args: |
| 75 | + retries: Number of times to re-attempt reading. Default 2. |
| 76 | + Returns: |
| 77 | + The state of the flow controller, as a dictionary. |
| 78 | +
|
| 79 | + """ |
| 80 | + command = f'{self.unit}' |
| 81 | + line = await self._write_and_read(command) |
| 82 | + if not line: |
| 83 | + raise OSError("Could not read values") |
| 84 | + spl = line.split() |
| 85 | + unit, values = spl[0], spl[1:] |
| 86 | + |
| 87 | + # Over range errors for mass, volume, pressure, and temperature |
| 88 | + # Explicitly silenced because I find it redundant. |
| 89 | + while values[-1].upper() in ['MOV', 'VOV', 'POV', 'TOV']: |
| 90 | + del values[-1] |
| 91 | + if unit != self.unit: |
| 92 | + raise ValueError("Flow controller unit ID mismatch.") |
| 93 | + if len(values) == 5 and len(self.keys) == 6: |
| 94 | + self.keys.remove('setpoint') |
| 95 | + return {k: (float(v) if _is_float(v) else v) |
| 96 | + for k, v in zip(self.keys, values, strict=True)} |
| 97 | + |
| 98 | + async def set_gas(self, gas: str | int) -> None: |
| 99 | + """Set the gas type. |
| 100 | +
|
| 101 | + Args: |
| 102 | + gas: The gas type, as a string or integer. Supported strings are: |
| 103 | + 'Air', 'Ar', 'CO2', 'N2', 'O2', 'N2O', 'H2', 'He', 'CH4' |
| 104 | + """ |
| 105 | + if isinstance(gas, str): |
| 106 | + if gas not in GASES: |
| 107 | + raise ValueError(f"{gas} not supported!") |
| 108 | + gas_number = GASES.index(gas) |
| 109 | + else: |
| 110 | + gas_number = gas |
| 111 | + command = f'{self.unit}GS {gas_number}' |
| 112 | + res = await self._write_and_read(command) |
| 113 | + |
| 114 | + if not res: |
| 115 | + raise OSError("Cannot set gas.") |
| 116 | + |
| 117 | + async def tare(self, duration: int = 10) -> None: |
| 118 | + """Tare flow. |
| 119 | +
|
| 120 | + Args: |
| 121 | + duration: The time in ms to collect values for calculating the tare |
| 122 | + offset. The value is a positive integer from 1-32767. |
| 123 | + """ |
| 124 | + command = f'{self.unit}V {duration}' |
| 125 | + line = await self._write_and_read(command) |
| 126 | + |
| 127 | + if line == '?': |
| 128 | + raise OSError("Unable to tare flow.") |
| 129 | + |
| 130 | + async def reset_totalizer(self) -> None: |
| 131 | + """Reset the totalizer.""" |
| 132 | + command = f'{self.unit}T' |
| 133 | + await self._write_and_read(command) |
| 134 | + |
| 135 | + async def get_firmware(self) -> str: |
| 136 | + """Get the device firmware version.""" |
| 137 | + if self.firmware is None: |
| 138 | + command = f'{self.unit}VE' |
| 139 | + self.firmware = await self._write_and_read(command) |
| 140 | + if not self.firmware: |
| 141 | + raise OSError("Unable to get firmware.") |
| 142 | + return self.firmware |
| 143 | + |
| 144 | + async def flush(self) -> None: |
| 145 | + """Read all available information. Use to clear queue.""" |
| 146 | + self._test_controller_open() |
| 147 | + await self.hw.clear() |
| 148 | + |
| 149 | + async def close(self) -> None: |
| 150 | + """Close the flow meter. Call this on program termination. |
| 151 | +
|
| 152 | + Also closes the serial port if no other FlowMeter object has |
| 153 | + a reference to the port. |
| 154 | + """ |
| 155 | + if not self.open: |
| 156 | + return |
| 157 | + await self.hw.close() |
| 158 | + self.open = False |
| 159 | + |
| 160 | + |
| 161 | +class BASISController(BASISMeter): |
| 162 | + """Python driver for Alicat Flow Controllers. |
| 163 | +
|
| 164 | + [Reference](http://www.alicat.com/products/mass-flow-meters-and- |
| 165 | + controllers/mass-flow-controllers/). |
| 166 | +
|
| 167 | + This communicates with the flow controller over a USB or RS-232/RS-485 |
| 168 | + connection using pyserial. |
| 169 | +
|
| 170 | + To set up your Alicat flow controller, power on the device and make sure |
| 171 | + that the "Input" option is set to "Serial". |
| 172 | + """ |
| 173 | + |
| 174 | + def __init__(self, address: str='/dev/ttyUSB0', unit: str='A', baudrate: int = 38400, **kwargs: Any) -> None: |
| 175 | + """Connect this driver with the appropriate USB / serial port. |
| 176 | +
|
| 177 | + Args: |
| 178 | + address: The serial port. Default '/dev/ttyUSB0'. |
| 179 | + unit: The Alicat-specified unit ID, A-Z. Default 'A'. |
| 180 | + baudrate: The baud rate of the device. Default 38400. |
| 181 | + """ |
| 182 | + BASISMeter.__init__(self, address, unit, baudrate, **kwargs) |
| 183 | + |
| 184 | + async def __aenter__(self, *args: Any) -> BASISController: |
| 185 | + """Provide async enter to context manager.""" |
| 186 | + return self |
| 187 | + |
| 188 | + async def get(self) -> dict[str, Any]: |
| 189 | + """Get the current state of the flow controller. |
| 190 | +
|
| 191 | + From the Alicat mass flow controller documentation, this data is: |
| 192 | + * Temperature (normally in C) |
| 193 | + * Mass flow (in units specified at time of order) |
| 194 | + * Flow setpoint (in units of control point) |
| 195 | + * Flow control point (either 'mass_flow' or 'HLD') |
| 196 | + * Total flow |
| 197 | + * Currently selected gas |
| 198 | +
|
| 199 | + Returns: |
| 200 | + The state of the flow controller, as a dictionary. |
| 201 | + """ |
| 202 | + cp = "mass flow" |
| 203 | + |
| 204 | + command = f'{self.unit}' |
| 205 | + line = await self._write_and_read(command) |
| 206 | + if not line: |
| 207 | + raise OSError("Could not read values") |
| 208 | + spl = line.split() |
| 209 | + unit, values = spl[0], spl[1:] |
| 210 | + |
| 211 | + # Over range errors for mass, volume, pressure, and temperature |
| 212 | + # Explicitly silenced because I find it redundant. |
| 213 | + while values[-1].upper() in ['MOV', 'VOV', 'POV', 'TOV']: |
| 214 | + del values[-1] |
| 215 | + if unit != self.unit: |
| 216 | + raise ValueError("Flow controller unit ID mismatch.") |
| 217 | + if len(values) == 5 and len(self.keys) == 6: |
| 218 | + del self.keys[-3] |
| 219 | + if values[-1] == "HLD": |
| 220 | + cp = "HLD" |
| 221 | + del values[-1] |
| 222 | + state = {k: (float(v) if _is_float(v) else v) |
| 223 | + for k, v in zip(self.keys, values, strict=False)} |
| 224 | + state['control_point'] = cp |
| 225 | + return state |
| 226 | + |
| 227 | + async def get_totalizer_batch(self) -> list[float]: |
| 228 | + """Get the totalizer batch volume. |
| 229 | +
|
| 230 | + Returns: |
| 231 | + volume: Batch volume |
| 232 | + remaining: Remaining batch volume |
| 233 | + """ |
| 234 | + remaining = await self._write_and_read(f'{self.unit}DV 64') |
| 235 | + current = await self._write_and_read(f'{self.unit}TB') |
| 236 | + if current == '?' or remaining == '?': |
| 237 | + raise OSError("Unable to read totalizer batch volume.") |
| 238 | + return [current, remaining.split(" ")[-1]] # type: ignore |
| 239 | + |
| 240 | + async def set_totalizer_batch(self, batch_volume: float) -> None: |
| 241 | + """Set the totalizer batch volume. |
| 242 | +
|
| 243 | + Args: |
| 244 | + batch_volume: Target batch volume, in same units as units |
| 245 | + on device |
| 246 | + """ |
| 247 | + command = f'{self.unit}TB {batch_volume}' |
| 248 | + line = await self._write_and_read(command) |
| 249 | + |
| 250 | + if line == '?': |
| 251 | + raise OSError("Unable to set totalizer batch volume. Check if volume is out of range for device.") |
| 252 | + |
| 253 | + async def hold(self, percentage: float) -> None: |
| 254 | + """Override command to issue a valve hold at a certain percentage of full drive. |
| 255 | +
|
| 256 | + Args: |
| 257 | + percentage : Percentage of full valve drive |
| 258 | + """ |
| 259 | + command = f'{self.unit}HPUR {percentage}' |
| 260 | + await self._write_and_read(command) |
| 261 | + |
| 262 | + async def cancel_hold(self) -> None: |
| 263 | + """Cancel valve hold.""" |
| 264 | + command = f'{self.unit}C' |
| 265 | + await self._write_and_read(command) |
| 266 | + |
| 267 | + async def get_pid(self) -> dict[str, str]: |
| 268 | + """Read the current PID values on the controller. |
| 269 | +
|
| 270 | + Values include the P value and I value. |
| 271 | + Values returned as a dictionary. |
| 272 | + """ |
| 273 | + self.pid_keys = ['P', 'I'] |
| 274 | + |
| 275 | + command = f'{self.unit}LCG' |
| 276 | + line = await self._write_and_read(command) |
| 277 | + if not line: |
| 278 | + raise OSError("Could not get PID values.") |
| 279 | + spl = line.split() |
| 280 | + return dict(zip(self.pid_keys, spl[1:], strict=False)) |
| 281 | + |
| 282 | + async def set_pid(self, p: int, i: int) -> None: |
| 283 | + """Set specified PID parameters. |
| 284 | +
|
| 285 | + Args: |
| 286 | + p: Proportional gain |
| 287 | + i: Integral gain. Only used in PD2I loop type. |
| 288 | + """ |
| 289 | + command = f'{self.unit}LCG {p} {i}' |
| 290 | + await self._write_and_read(command) |
| 291 | + |
| 292 | + async def set_flow_rate(self, flowrate: float) -> None: |
| 293 | + """Set the target setpoint.""" |
| 294 | + command = f'{self.unit}S {flowrate}' |
| 295 | + line = await self._write_and_read(command) |
| 296 | + if not line: |
| 297 | + raise OSError("Could not set setpoint.") |
| 298 | + try: |
| 299 | + current = float(line.split()[4]) |
| 300 | + except IndexError: |
| 301 | + raise OSError("Could not set setpoint.") from None |
| 302 | + if current is not None and abs(current - flowrate) > 0.1: |
| 303 | + raise OSError("Could not set setpoint.") |
0 commit comments