diff --git a/test.json b/test.json new file mode 100644 index 00000000..660a457f --- /dev/null +++ b/test.json @@ -0,0 +1,21 @@ +{ + "nodes": [ + { + "id": "xyz", + "name": "三轴", + "children": [], + "parent": "laiyu", + "type": "device", + "class": "SynthonXFlowV2", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "COM5" + } + } + ], + "links": [] +} \ No newline at end of file diff --git a/unilabos/devices/SynthonX/SynthonX.py b/unilabos/devices/SynthonX/SynthonX.py new file mode 100644 index 00000000..1c07da16 --- /dev/null +++ b/unilabos/devices/SynthonX/SynthonX.py @@ -0,0 +1,904 @@ +""" +XYZ平台和移液枪的控制脚本 +所有设备共用一个 RS485/串口。 +XYZ 步进驱动器的 RS485 地址:X=1,Y=2,Z=3。 +SOPA 移液器的 RS485 地址:4。 +默认串口:COM3,波特率:115200。 +忽略CRC报错选项:在XYZ运动过程中启用,以防止偶发串扰导致的CRC校验失败中断运动。 +可交互式运行 +SynthonX 团队 +""" + +import sys +import time +import json +import threading +import logging +from dataclasses import dataclass, asdict +from enum import Enum, IntEnum +from typing import Optional, Dict, List + +try: + import serial +except Exception as e: + raise RuntimeError("Please install pyserial: pip install pyserial") from e + + +# ------------------------------- Logging ------------------------------- +logger = logging.getLogger("unified_xyz_yyq") +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") + + +# =========================== Shared RS485 Bus ========================== +class SharedRS485Bus: + """One serial port for everything + a global lock.""" + + def __init__(self, port: str = "COM3", baudrate: int = 115200, timeout: float = 0.2): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.serial = None + self.lock = threading.Lock() + + def open(self): + if self.serial and self.serial.is_open: + return True + self.serial = serial.Serial( + port=self.port, baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, timeout=self.timeout + ) + logger.info(f"Opened RS485 bus on {self.port}") + return True + + def close(self): + if self.serial and self.serial.is_open: + self.serial.close() + logger.info("Closed RS485 bus") + + def reset_input(self): + if self.serial: + self.serial.reset_input_buffer() + + def write(self, data: bytes): + self.serial.write(data) + + def read(self, n: int = 256) -> bytes: + return self.serial.read(n) + + def read_exact(self, n: int, overall_timeout: float = 0.3) -> bytes: + """Read exactly n bytes within overall_timeout; return b'' if timeout.""" + if n <= 0: + return b"" + buf = b"" + deadline = time.time() + overall_timeout + while len(buf) < n: + if time.time() > deadline: + break + need = n - len(buf) + chunk = self.read(need) + if chunk: + buf += chunk + else: + time.sleep(0.001) + return buf + + +# ======================= XYZ: Low-level Modbus ======================== +class MotorAxis(Enum): + X = 1 + Y = 2 + Z = 3 + + +class MotorStatus(Enum): + STANDBY = 0x0000 + RUNNING = 0x0001 + COLLISION_STOP = 0x0002 + FORWARD_LIMIT_STOP = 0x0003 + REVERSE_LIMIT_STOP = 0x0004 + + +class ModbusException(Exception): + pass + + +@dataclass +class MotorPosition: + steps: int + speed: int + current: int + status: MotorStatus + + +class XYZModbus: + """Minimal Modbus RTU helper bound to the shared bus. + + 简化容错:ignore_crc_error=True 时,CRC 重试耗尽后直接忽略继续,不再统计次数。 + """ + + REG_STATUS = 0x00 + REG_POSITION_HIGH = 0x01 + REG_POSITION_LOW = 0x02 + REG_ACTUAL_SPEED = 0x03 + REG_EMERGENCY_STOP = 0x04 + REG_CURRENT = 0x05 + REG_ENABLE = 0x06 + + # position mode + REG_TARGET_POSITION_HIGH = 0x10 + REG_TARGET_POSITION_LOW = 0x11 + REG_POSITION_SPEED = 0x13 + REG_POSITION_ACCELERATION = 0x14 + REG_POSITION_PRECISION = 0x15 + + # speed mode + REG_SPEED_MODE_SPEED = 0x61 + REG_SPEED_MODE_ACCELERATION = 0x62 + + def __init__(self, bus: SharedRS485Bus, ignore_crc_error: bool = False): + self.bus = bus + self.ignore_crc_error = ignore_crc_error + + def set_ignore_crc(self, flag: bool): + self.ignore_crc_error = bool(flag) + + @staticmethod + def _crc16(data: bytes) -> bytes: + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + if crc & 0x0001: + crc >>= 1 + crc ^= 0xA001 + else: + crc >>= 1 + return crc.to_bytes(2, "little") + + def _xfer(self, slave: int, payload: bytes, retries: int = 3) -> bytes: + req = bytes([slave]) + payload + frame = req + self._crc16(req) + fn_req = payload[0] + + # 不做统计,只在最终失败时可选择忽略返回 + for attempt in range(1, retries + 1): + with self.bus.lock: + if not self.bus.serial or not self.bus.serial.is_open: + raise ModbusException("Bus not open") + + self.bus.reset_input() + self.bus.write(frame) + + time.sleep(0.010) + + try: + base = 0.30 + 0.15*(attempt-1) + header = self.bus.read_exact(2, overall_timeout=base) + if len(header) < 2: + raise ModbusException("No response") + + addr, fn = header[0], header[1] + if addr != slave: + # 把这一帧当成串扰/回波,丢弃后继续本次尝试 + time.sleep(0.005) + continue + + if (fn & 0x80) != 0: + rest = self.bus.read_exact(3, overall_timeout=base) + resp = header + rest + if len(rest) < 3: + raise ModbusException("Short exception response") + if resp[-2:] != self._crc16(resp[:-2]): + logger.warning(f"CRC mismatch (exception response) attempt {attempt}/{retries} slave={slave} fn=0x{fn_req:02X}") + if attempt >= retries: + if self.ignore_crc_error: + logger.error("CRC mismatch(异常帧)重试耗尽已忽略 (风险:异常码可能失真)") + return resp # 返回未校验异常帧 + raise ModbusException("CRC mismatch (exception)") + time.sleep(0.005) + continue + ex_code = resp[2] + raise ModbusException(f"Modbus exception: 0x{ex_code:02X}") + + if fn == 0x03: + bc_b = self.bus.read_exact(1, overall_timeout=base) + if len(bc_b) < 1: + raise ModbusException("Short response (no byte count)") + bc = bc_b[0] + data_crc = self.bus.read_exact(bc + 2, overall_timeout=base + 0.20) + resp = header + bc_b + data_crc + if len(data_crc) < bc + 2: + raise ModbusException("Short response (payload)") + elif fn in (0x06, 0x10): + rest = self.bus.read_exact(6, overall_timeout=base + 0.20) + resp = header + rest + if len(rest) < 6: + raise ModbusException("Short response") + else: + tail = self.bus.read_exact(254, overall_timeout=base + 0.30) + resp = header + tail + if len(resp) < 3: + raise ModbusException("Short response") + + if resp[-2:] != self._crc16(resp[:-2]): + logger.warning(f"CRC mismatch (attempt {attempt}/{retries}) slave={slave} fn=0x{fn_req:02X}") + if attempt >= retries: + if self.ignore_crc_error: + logger.error("CRC mismatch 重试耗尽已忽略 (风险:数据未校验)") + return resp # 直接返回未校验帧 + raise ModbusException("CRC mismatch") + time.sleep(0.005) + continue + + if resp[1] != fn_req: + raise ModbusException(f"Unexpected function: {resp[1]:02X} (!={fn_req:02X})") + + return resp # 成功 + + except ModbusException: + if attempt >= retries: + # 已在 CRC 分支处理 ignore 情况;这里直接抛出其他类型异常 + raise + time.sleep(0.01) + + def read_regs(self, slave: int, addr: int, count: int) -> List[int]: + fn = 0x03 + payload = bytes([fn]) + addr.to_bytes(2, "big") + count.to_bytes(2, "big") + resp = self._xfer(slave, payload) + byte_count = resp[2] + vals = [] + for i in range(0, byte_count, 2): + vals.append(int.from_bytes(resp[3 + i:5 + i], "big")) + return vals + + def write_reg(self, slave: int, addr: int, val: int) -> bool: + fn = 0x06 + payload = bytes([fn]) + addr.to_bytes(2, "big") + val.to_bytes(2, "big") + try: + resp = self._xfer(slave, payload) + except ModbusException as e: + logger.warning(f"write_reg: ModbusException slave={slave} addr={addr} val={val}: {e}") + return False + except Exception as e: + logger.error(f"write_reg: unexpected error: {e}") + return False + + if not resp: + logger.warning(f"write_reg: no response slave={slave} addr={addr}") + return False + return len(resp) >= 8 and resp[1] == fn + + def write_regs(self, slave: int, start: int, values: List[int]) -> bool: + """ + 写多个寄存器(含保护):当底层无响应或异常时返回 False,不再返回 None。 + """ + fn = 0x10 + bc = len(values) * 2 + payload = bytes([fn]) + start.to_bytes(2, "big") + len(values).to_bytes(2, "big") + bytes([bc]) + for v in values: + payload += v.to_bytes(2, "big") + + try: + resp = self._xfer(slave, payload) + except ModbusException as e: + logger.warning(f"write_regs: ModbusException slave={slave} start={start} vals={values}: {e}") + return False + except Exception as e: + logger.error(f"write_regs: unexpected error: {e}") + return False + + if not resp: + logger.warning(f"write_regs: no response slave={slave} start={start}") + return False + return len(resp) >= 8 and resp[1] == fn + + +# ===================== XYZ: High-level Controller ===================== +@dataclass +class MachineConfig: + steps_per_mm_x: float = 204.8 + steps_per_mm_y: float = 204.8 + steps_per_mm_z: float = 3276.8 + max_travel_x: float = 340.0 + max_travel_y: float = 250.0 + max_travel_z: float = 250.0 + safe_z_height: float = 5.0 + z_approach_height: float = 5.0 + homing_speed: int = 100 + homing_timeout: float = 30.0 + safe_clearance: float = 1.0 + position_stable_time: float = 3.0 + position_check_interval: float = 0.2 + default_speed: int = 100 + default_acceleration: int = 500 + homing_speed_x: Optional[int] = None + homing_speed_y: Optional[int] = None + homing_speed_z: Optional[int] = None + homing_accel_x: Optional[int] = None + homing_accel_y: Optional[int] = None + homing_accel_z: Optional[int] = None + + +@dataclass +class CoordinateOrigin: + machine_origin_steps: Dict[str, int] = None + work_origin_steps: Dict[str, int] = None + is_homed: bool = False + + def __post_init__(self): + if self.machine_origin_steps is None: + self.machine_origin_steps = {"x": 0, "y": 0, "z": 0} + if self.work_origin_steps is None: + self.work_origin_steps = {"x": 0, "y": 0, "z": 0} + + +class CoordinateSystemError(Exception): + pass + + +class SharedXYZController: + """XYZ controller using the shared bus and Modbus helper.""" + + def __init__(self, bus: SharedRS485Bus, cfg: Optional[MachineConfig] = None): + self.bus = bus + self.mb = XYZModbus(bus) + self.cfg = cfg or MachineConfig() + self.origin = CoordinateOrigin() + self.addr = {MotorAxis.X: 1, MotorAxis.Y: 2, MotorAxis.Z: 3} # keep 1/2/3 + + def mm_to_steps(self, axis: MotorAxis, mm: float) -> int: + if axis == MotorAxis.X: + return int(mm * self.cfg.steps_per_mm_x) + if axis == MotorAxis.Y: + return int(mm * self.cfg.steps_per_mm_y) + if axis == MotorAxis.Z: + return int(mm * self.cfg.steps_per_mm_z) + raise ValueError(axis) + + def steps_to_mm(self, axis: MotorAxis, steps: int) -> float: + if axis == MotorAxis.X: + return steps / self.cfg.steps_per_mm_x + if axis == MotorAxis.Y: + return steps / self.cfg.steps_per_mm_y + if axis == MotorAxis.Z: + return steps / self.cfg.steps_per_mm_z + raise ValueError(axis) + + def enable(self, axis: MotorAxis, on: bool = True) -> bool: + return self.mb.write_reg(self.addr[axis], XYZModbus.REG_ENABLE, 0x0001 if on else 0x0000) + + def emergency_stop(self, axis: MotorAxis) -> bool: + return self.mb.write_reg(self.addr[axis], XYZModbus.REG_EMERGENCY_STOP, 0x0000) + + def get_motor_status(self, axis: MotorAxis) -> MotorPosition: + a = self.addr[axis] + v = self.mb.read_regs(a, XYZModbus.REG_STATUS, 6) + status = MotorStatus(v[0]) + pos = (v[1] << 16) | v[2] + if pos > 0x7FFFFFFF: + pos -= 0x100000000 + speed = v[3] + current = v[5] + return MotorPosition(pos, speed, current, status) + + def move_to_steps(self, axis: MotorAxis, steps: int, speed_rpm: int = 1000, + accel: int = 1000, precision: int = 100) -> bool: + a = self.addr[axis] + if steps < 0: + steps = (steps + 0x100000000) & 0xFFFFFFFF + hi = (steps >> 16) & 0xFFFF + lo = steps & 0xFFFF + ok = self.mb.write_regs(a, XYZModbus.REG_TARGET_POSITION_HIGH, [ + hi, lo, speed_rpm, accel, precision + ]) + return ok + + def wait_for_completion(self, axis: MotorAxis, timeout: float = 20.0) -> bool: + t0 = time.time() + misses = 0 + while time.time() - t0 < timeout: + try: + st = self.get_motor_status(axis) + misses = 0 + if st.status == MotorStatus.STANDBY: + return True + except ModbusException: + misses += 1 + if misses >= 10: + raise + time.sleep(0.05) + return False + + def get_homing_speed(self, axis: MotorAxis) -> int: + if axis == MotorAxis.X and self.cfg.homing_speed_x is not None: + return self.cfg.homing_speed_x + if axis == MotorAxis.Y and self.cfg.homing_speed_y is not None: + return self.cfg.homing_speed_y + if axis == MotorAxis.Z and self.cfg.homing_speed_z is not None: + return self.cfg.homing_speed_z + return self.cfg.homing_speed + + def get_homing_accel(self, axis: MotorAxis) -> int: + if axis == MotorAxis.X and self.cfg.homing_accel_x is not None: + return self.cfg.homing_accel_x + if axis == MotorAxis.Y and self.cfg.homing_accel_y is not None: + return self.cfg.homing_accel_y + if axis == MotorAxis.Z and self.cfg.homing_accel_z is not None: + return self.cfg.homing_accel_z + return 500 + + def home_axis(self, axis: MotorAxis, direction: int = -1) -> bool: + a = self.addr[axis] + self.enable(axis, True) + speed = self.get_homing_speed(axis) * direction + accel = self.get_homing_accel(axis) + if not self.mb.write_reg(a, XYZModbus.REG_SPEED_MODE_ACCELERATION, accel & 0xFFFF): + return False + if not self.mb.write_reg(a, XYZModbus.REG_SPEED_MODE_SPEED, speed & 0xFFFF): + return False + last = None + stable_since = None + t0 = time.time() + while time.time() - t0 < self.cfg.homing_timeout: + st = self.get_motor_status(axis) + pos = st.steps + if (direction < 0 and st.status == MotorStatus.REVERSE_LIMIT_STOP) or \ + (direction > 0 and st.status == MotorStatus.FORWARD_LIMIT_STOP): + self.emergency_stop(axis) + final = pos + break + if last is not None: + if abs(pos - last) <= 1: + stable_since = stable_since or time.time() + if time.time() - stable_since >= self.cfg.position_stable_time: + self.emergency_stop(axis) + final = pos + break + else: + stable_since = None + last = pos + time.sleep(self.cfg.position_check_interval) + else: + self.emergency_stop(axis) + final = self.get_motor_status(axis).steps + + clear_steps = self.mm_to_steps(axis, self.cfg.safe_clearance) + safe_pos = final + (-direction) * clear_steps + self.move_to_steps(axis, safe_pos, self.cfg.default_speed, self.cfg.default_acceleration) + self.wait_for_completion(axis, 10.0) + self.origin.machine_origin_steps[axis.name.lower()] = final + return True + + def home_all(self) -> bool: + for ax in (MotorAxis.Z, MotorAxis.X, MotorAxis.Y): + if not self.home_axis(ax, -1): + return False + time.sleep(0.3) + self.origin.is_homed = True + # 自动将当前回零后位置作为工作坐标系原点。 + # 这样后续工作系(0,0,0)即对应回零后当前位置。 + try: + self.set_work_origin_here() + except Exception: + # 若失败不影响 homing 结果,只在需要时手动再设。 + pass + return True + + def set_work_origin_here(self) -> bool: + pos = { + 'x': self.get_motor_status(MotorAxis.X).steps, + 'y': self.get_motor_status(MotorAxis.Y).steps, + 'z': self.get_motor_status(MotorAxis.Z).steps, + } + self.origin.work_origin_steps = pos + return True + + def work_to_machine_steps(self, x=None, y=None, z=None) -> Dict[str, int]: + out = {} + if x is not None: + out['x'] = self.origin.work_origin_steps['x'] + self.mm_to_steps(MotorAxis.X, x) + if y is not None: + out['y'] = self.origin.work_origin_steps['y'] + self.mm_to_steps(MotorAxis.Y, y) + if z is not None: + out['z'] = self.origin.work_origin_steps['z'] + self.mm_to_steps(MotorAxis.Z, z) + return out + + def check_limits(self, x=None, y=None, z=None): + if x is not None and (x < 0 or x > self.cfg.max_travel_x): + raise CoordinateSystemError(f"X out of range: {x}") + if y is not None and (y < 0 or y > self.cfg.max_travel_y): + raise CoordinateSystemError(f"Y out of range: {y}") + if z is not None and (z < 0 or z > self.cfg.max_travel_z): + raise CoordinateSystemError(f"Z out of range: {z}") + + def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool: + self.check_limits(x, y, z) + speed = speed or self.cfg.default_speed + accel = accel or self.cfg.default_acceleration + if z is not None: + safe_steps = self.work_to_machine_steps(z=self.cfg.safe_z_height)['z'] + self.move_to_steps(MotorAxis.Z, safe_steps, speed, accel) + self.wait_for_completion(MotorAxis.Z, 10.0) + if x is not None: + self.move_to_steps(MotorAxis.X, self.work_to_machine_steps(x=x)['x'], speed, accel) + if y is not None: + self.move_to_steps(MotorAxis.Y, self.work_to_machine_steps(y=y)['y'], speed, accel) + if x is not None: + self.wait_for_completion(MotorAxis.X, 20.0) + if y is not None: + self.wait_for_completion(MotorAxis.Y, 20.0) + if z is not None: + self.move_to_steps(MotorAxis.Z, self.work_to_machine_steps(z=z)['z'], speed, accel) + self.wait_for_completion(MotorAxis.Z, 20.0) + return True + + def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool: + """ + 安全移动到工作坐标 (X/Y/Z, mm)。在本次运动过程中临时忽略 Modbus 的 CRC mismatch, + 以避免因为偶发串扰导致的 CRC 校验失败而中断运动;运动结束后恢复原有设置。 + """ + # 1) 限位与默认参数 + self.check_limits(x, y, z) + speed = speed or self.cfg.default_speed + accel = accel or self.cfg.default_acceleration + + # 2) 临时开启“忽略 CRC 错误” + prev_ignore = getattr(self.mb, "ignore_crc_error", False) + try: + self.mb.set_ignore_crc(True) + + # 3) 先抬到安全 Z(若给了 z 目标) + if z is not None: + safe_steps = self.work_to_machine_steps(z=self.cfg.safe_z_height)['z'] + self.move_to_steps(MotorAxis.Z, safe_steps, speed, accel) + self.wait_for_completion(MotorAxis.Z, 10.0) + + # 4) 下发 XY 目标 + if x is not None: + self.move_to_steps(MotorAxis.X, self.work_to_machine_steps(x=x)['x'], speed, accel) + if y is not None: + self.move_to_steps(MotorAxis.Y, self.work_to_machine_steps(y=y)['y'], speed, accel) + + # 5) 等待 XY 完成 + if x is not None: + self.wait_for_completion(MotorAxis.X, 20.0) + if y is not None: + self.wait_for_completion(MotorAxis.Y, 20.0) + + # 6) 最后降到目标 Z + if z is not None: + self.move_to_steps(MotorAxis.Z, self.work_to_machine_steps(z=z)['z'], speed, accel) + self.wait_for_completion(MotorAxis.Z, 20.0) + + return True + finally: + # 7) 恢复之前的 CRC 忽略开关 + try: + self.mb.set_ignore_crc(prev_ignore) + except Exception: + pass + + + + def move_rel_z_mm(self, dz: float, speed=1000, accel=1000) -> bool: + cur = self.get_motor_status(MotorAxis.Z).steps + tgt = cur + self.mm_to_steps(MotorAxis.Z, dz) + self.move_to_steps(MotorAxis.Z, tgt, speed, accel, 50) + return self.wait_for_completion(MotorAxis.Z, 10.0) + + def machine_steps_to_work_mm(self, x=None, y=None, z=None): + out = {} + if x is not None: + dx = int(x) - int(self.origin.work_origin_steps['x']) + out['x'] = self.steps_to_mm(MotorAxis.X, dx) + if y is not None: + dy = int(y) - int(self.origin.work_origin_steps['y']) + out['y'] = self.steps_to_mm(MotorAxis.Y, dy) + if z is not None: + dz = int(z) - int(self.origin.work_origin_steps['z']) + out['z'] = self.steps_to_mm(MotorAxis.Z, dz) + return out + + def machine_to_work_mm(self, x=None, y=None, z=None): + out = {} + if x is not None: + xs = self.mm_to_steps(MotorAxis.X, x) - self.origin.work_origin_steps['x'] + out['x'] = self.steps_to_mm(MotorAxis.X, xs) + if y is not None: + ys = self.mm_to_steps(MotorAxis.Y, y) - self.origin.work_origin_steps['y'] + out['y'] = self.steps_to_mm(MotorAxis.Y, ys) + if z is not None: + zs = self.mm_to_steps(MotorAxis.Z, z) - self.origin.work_origin_steps['z'] + out['z'] = self.steps_to_mm(MotorAxis.Z, zs) + return out + + def get_work_position_mm(self): + sx = self.get_motor_status(MotorAxis.X).steps + sy = self.get_motor_status(MotorAxis.Y).steps + sz = self.get_motor_status(MotorAxis.Z).steps + return self.machine_steps_to_work_mm(x=sx, y=sy, z=sz) + + +# ====================== YYQ-style SOPA Pipette ======================== +@dataclass +class SOPAConfig: + address: int = 4 # 固定为 4 + timeout: float = 2.0 + + +class SOPAPipetteYYQ: + """ + A minimal SOPA pipette driver adapted from YYQ.py to use the shared bus. + Kept functions: initialize, eject_tip, aspirate, dispense. + Command packing follows YYQ: '/{addr}{CMD}E' + checksum(sum&0xFF). + NOTE: We intentionally keep the 'HE' and 'RE' forms for compatibility. + """ + + def __init__(self, bus: SharedRS485Bus, config: SOPAConfig = SOPAConfig()): + self.bus = bus + self.config = config + self.is_initialized = False + + # ---- low-level helpers + def _send_command(self, cmd: str): + address = str(self.config.address) + full_cmd = f"/{address}{cmd}E".encode("ascii") + checksum = bytes([sum(full_cmd) & 0xFF]) + payload = full_cmd + checksum + with self.bus.lock: + self.bus.reset_input() + self.bus.write(payload) + logger.debug(f"[YYQ] TX: {payload!r}") + # simple pacing, keep same semantics as YYQ example + time.sleep(0.1) + + def _read_response(self) -> str: + # very permissive read, mirroring YYQ approach + time.sleep(0.2) + data = b"" + with self.bus.lock: + if self.bus.serial.in_waiting: + data = self.bus.serial.read_all() + txt = data.decode(errors="ignore") + if txt: + logger.debug(f"[YYQ] RX: {txt!r}") + return txt + + # ---- the four functions + def initialize(self) -> bool: + try: + logger.info("🚀 初始化移液枪中(YYQ样式)...") + # YYQ used "HE" (so final becomes '/4HEE' + checksum). Keep it as-is. + self._send_command("HE") + time.sleep(10) + self.is_initialized = True + logger.info("✅ 初始化完成") + return True + except Exception as e: + logger.error(f"初始化失败: {e}") + return False + + def eject_tip(self): + try: + # YYQ used "RE" + self._send_command("RE") + time.sleep(1) + logger.info("🗑️ 枪头已弹出") + except Exception as e: + logger.error(f"弹出枪头失败: {e}") + + def aspirate(self, volume_uL: float): + try: + vol = int(volume_uL) + logger.info(f"💧 吸液 {vol} µL...") + self._send_command(f"P{vol}") + time.sleep(max(0.2, vol / 200.0)) + logger.info("✅ 吸液完成") + except Exception as e: + logger.error(f"吸液失败: {e}") + + def dispense(self, volume_uL: float): + try: + vol = int(volume_uL) + logger.info(f"💦 排液 {vol} µL...") + self._send_command(f"D{vol}") + time.sleep(max(0.2, vol / 200.0)) + logger.info("✅ 排液完成") + except Exception as e: + logger.error(f"排液失败: {e}") + + +# ======================== Liquid Station (ALL) ======================== +@dataclass +class LiquidParams: + delay_after_aspirate: float = 0.5 + delay_after_dispense: float = 0.5 + + +class LiquidStation: + """Bring XYZ and Pipette together on one port, with a CLI.""" + + def __init__(self, port: str = "COM3", baudrate: int = 115200): + self.bus = SharedRS485Bus(port, baudrate) + self.points_file = "points.json" + self.params = LiquidParams() + self._points = {} + self.xyz = None + self.pip = None + + def connect(self): + self.bus.open() + self.xyz = SharedXYZController(self.bus) + self.pip = SOPAPipetteYYQ(self.bus) + logger.info("Controllers are ready (shared bus).") + + def disconnect(self): + self.bus.close() + + # ---- points DB + def load_points(self): + try: + with open(self.points_file, "r", encoding="utf-8") as f: + self._points = json.load(f) + except Exception: + self._points = {} + + def save_points(self): + with open(self.points_file, "w", encoding="utf-8") as f: + json.dump(self._points, f, indent=2, ensure_ascii=False) + + # ---- station ops + def home_all(self): + return self.xyz.home_all() + + def set_work_origin_here(self): + return self.xyz.set_work_origin_here() + + def move_to(self, x=None, y=None, z=None, speed=None, accel=None): + return self.xyz.move_to_work_safe(x, y, z, speed, accel) + + def move_to_direct(self, x=None, y=None, z=None, speed=None, accel=None, z_order: str = "auto"): + """不抬Z直接移动到工作坐标。z_order可为 first/last/auto""" + return self.xyz.move_to_work_direct(x=x, y=y, z=z, speed=speed, accel=accel, z_order=z_order) + + def move_rel_z(self, dz_mm: float): + return self.xyz.move_rel_z_mm(dz_mm, 1000, 1000) + + # ---- pipette (only 4 functions) + def pipette_init(self): + return self.pip.initialize() + + def eject_tip(self): + return self.pip.eject_tip() + + def aspirate(self, vol_ul: float): + return self.pip.aspirate(vol_ul) + + def dispense(self, vol_ul: float): + return self.pip.dispense(vol_ul) + + def estop_all(self): + for ax in (MotorAxis.X, MotorAxis.Y, MotorAxis.Z): + try: + self.xyz.emergency_stop(ax) + except Exception: + pass + logger.warning("Emergency stop requested") + + +# ================================ CLI ================================= +def main(): + print("\n=== Unified XYZ + YYQ SOPA (Single-Port) ===") + port = input("串口端口 (默认 COM3): ").strip() or "COM3" + station = LiquidStation(port) + station.connect() + station.load_points() + + init_pip = input("是否初始化移液器? (y/N): ").strip().lower() in ("y", "yes") + if init_pip: + if station.pipette_init(): + print("移液器初始化完成。") + else: + print("移液器初始化失败。") + + while True: + print("\n" + "=" * 50) + print("1) 全轴回零(Z→X→Y)") + print("2) 设定当前位置为工作原点") + print("3) 安全移动到点 (X/Y/Z,mm)") + print("4) Z 轴相对移动 (mm)") + print("5) 保存/前往点位") + print("6) 移液:初始化 / 弹出枪头 / 吸液 / 排液") + print("7) 直接移动(不抬Z) X/Y/Z + 顺序(first/last/auto)") + print("99) 紧急停止") + print("0) 退出") + choice = input("选择: ").strip() + + if choice == "0": + break + + elif choice == "1": + print("回零中…") + print("成功" if station.home_all() else "失败") + + elif choice == "2": + print("设定工作原点…") + print("成功" if station.set_work_origin_here() else "失败") + + elif choice == "3": + x = input("X(mm, 空=跳过): ").strip() + y = input("Y(mm, 空=跳过): ").strip() + z = input("Z(mm, 空=跳过): ").strip() + x = float(x) if x else None + y = float(y) if y else None + z = float(z) if z else None + ok = station.move_to(x, y, z) + print("到位" if ok else "失败") + + elif choice == "4": + dz = float(input("Z 相对位移(mm,正=下降): ").strip()) + ok = station.move_rel_z(dz) + print("完成" if ok else "失败") + + elif choice == "5": + sub = input("(a)保存点 (b)前往点: ").strip().lower() + if sub == "a": + name = input("点名: ").strip() + x = float(input("X(mm): ").strip()) + y = float(input("Y(mm): ").strip()) + z = float(input("Z(mm): ").strip()) + station._points[name] = {"x": x, "y": y, "z": z} + station.save_points() + print("已保存") + else: + name = input("点名: ").strip() + pt = station._points.get(name) + if not pt: + print("未找到该点") + else: + ok = station.move_to(pt["x"], pt["y"], pt["z"]) + print("到位" if ok else "失败") + + elif choice == "6": + sub = input("(a)初始化 (b)弹出枪头 (c)吸液 (d)排液: ").strip().lower() + if sub == "a": + print("初始化…") + print("完成" if station.pipette_init() else "失败") + elif sub == "b": + print("弹出枪头…") + station.eject_tip() + print("完成") + elif sub == "c": + vol = float(input("吸液体积(µL): ").strip()) + station.aspirate(vol) + elif sub == "d": + vol = float(input("排液体积(µL): ").strip()) + station.dispense(vol) + else: + print("无效子选项") + + elif choice == "7": + x = input("X(mm, 空=跳过): ").strip() + y = input("Y(mm, 空=跳过): ").strip() + z = input("Z(mm, 空=跳过): ").strip() + z_order = input("Z顺序(first/last/auto, 默认auto): ").strip().lower() or "auto" + x = float(x) if x else None + y = float(y) if y else None + z = float(z) if z else None + ok = station.move_to_direct(x=x, y=y, z=z, z_order=z_order) + print("到位" if ok else "失败") + + elif choice == "99": + station.estop_all() + print("已急停") + + else: + print("无效选项") + + station.disconnect() + print("Bye.") + + +if __name__ == "__main__": + main() diff --git a/unilabos/devices/SynthonX/SynthonX_flow_v3.py b/unilabos/devices/SynthonX/SynthonX_flow_v3.py new file mode 100644 index 00000000..2653840f --- /dev/null +++ b/unilabos/devices/SynthonX/SynthonX_flow_v3.py @@ -0,0 +1,634 @@ +""" +SynthonX_flow_v3.py +- 点到点移动:调用 SynthonX 的 move_to_work_safe(通过 SynthonX_gui.Station 封装) +- 下探/上升/小幅相对位移:调用 SynthonX_gui 的 move_relative_direct + +功能: +1) 系统初始化 +2) 取枪头 / 放枪头(丢到 C原位+4排;例如 C1 -> D49,找不到D49则尝试C49) +3) 转移液体(A > B,或 B > D;可多目标,后续目标用“相对位移”) +4) 过滤(D9-16 取推杆;D1-8 过滤;最后弹出) +5) 推动(D25:下探 → +Y推进 → 抬起 → 丢枪头) +6) 装核磁(D1-8 源 -> D17-24 NMR管,放下全部液体 → 丢枪头) + +注意: +- 所有点位均从 points_gui.json 读取(工作坐标,单位 mm;需包含 x/y/z 字段)。 +- 所有“下探/上升/相对XY位移”均使用 Station.move_relative_direct(SynthonX_gui 提供)。 +- 所有“到某个点位(含上方高度)”均使用 Station.move_to_work_safe(SynthonX 提供)。 + +SynthonX团队 +""" + +from __future__ import annotations +import os +import json +import time +import re +from dataclasses import dataclass +from typing import Dict, List, Iterable, Optional, Tuple, Union +from contextlib import contextmanager +from .SynthonX_gui import Station +from .SynthonX_reactor import RelayController +import logging +logging.getLogger("unified_xyz_yyq").setLevel(logging.ERROR) # 只看错误,不显示 CRC mismatch 的 warning + +class _HideCRCFilter(logging.Filter): + def filter(self, record): + return "CRC mismatch" not in record.getMessage() + +_crc_logger = logging.getLogger("unified_xyz_yyq") +_crc_logger.addFilter(_HideCRCFilter()) +_crc_logger.propagate = False # 防止向上层冒泡 + +# -------------------- 小工具 -------------------- + +def _require(cond: bool, msg: str): + if not cond: + raise ValueError(msg) + +def _is_seq(x) -> bool: + return isinstance(x, (list, tuple)) + +def _idx_from_name(name: str) -> int: + """提取 'C1' / 'D49' 的编号 -> 1 / 49""" + try: + return int(''.join(c for c in name if c.isdigit())) + except Exception: + raise ValueError(f"无法解析点名编号: {name!r}") + +def _zone_from_name(name: str) -> str: + """返回点位所属分区的字母(A/B/C/D)""" + for c in name: + if c.isalpha(): + return c.upper() + raise ValueError(f"无法解析点名分区: {name!r}") + +def _split_names(names: Union[str, Iterable[str]]) -> List[str]: + if isinstance(names, str): + return [p.strip() for p in names.split(",") if p.strip()] + elif isinstance(names, (list, tuple)): + return [str(p).strip() for p in names] + else: + return [str(names).strip()] + + +def _zone_from_name(name: str) -> str: + """返回点位所属分区的字母(A/B/C/D)""" + for c in name: + if c.isalpha(): + return c.upper() + raise ValueError(f"无法解析点名分区: {name!r}") + +# -------------------- 配置与主类 -------------------- + +@dataclass +class FlowConfig: + port: str = "COM5" + baudrate: int = 115200 + points_file: str = "points_gui.json" + # 运动/沉降参数 + approach_lift: float = 0.0 # 到点位时在其上方 approach_lift(mm) 处驻停 + settle_s: int = 5 # 每次接触或位移后的沉降时间(秒) + delay_after_aspirate: float = 0.35 # 吸液后的等待(秒) + delay_after_dispense: float = 0.35 # 放液后的等待(秒) + # —— 搅拌器(USB 继电器)串口配置 —— + relay_port: str = "COM7" + relay_baudrate: int = 9600 + relay_timeout: float = 1.0 + +class SynthonXFlowV2: + """ + 提供高层流程 API;内部统一用 Station: + - 点到点:station.move_to_work_safe(x, y, z) + - 相对位移/下探/上升:station.move_relative_direct(dx, dy, dz) + - 移液:station.pip_init / pip_asp / pip_dsp / pip_eject + """ + def __init__(self, + port: str = "COM5", + baudrate: int = 115200, + points_file: str = "points_gui.json", + approach_lift: float = 0.0, + settle_s: int = 5, + delay_after_aspirate: float = 0.35, + delay_after_dispense: float = 0.35, + relay_port: str = "COM7", + relay_baudrate: int = 9600, + relay_timeout: float = 1.0): + + # 在内部重新组合成 cfg 对象,供其他方法使用 + self.cfg = FlowConfig( + port=port, + baudrate=baudrate, + points_file=points_file, + approach_lift=approach_lift, + settle_s=settle_s, + delay_after_aspirate=delay_after_aspirate, + delay_after_dispense=delay_after_dispense, + relay_port = relay_port, + relay_baudrate = relay_baudrate, + relay_timeout = relay_timeout + ) + + # 后续代码不变,继续使用 self.cfg + self.station = Station(self.cfg.port, self.cfg.baudrate) + if not self.station.connect(): + raise RuntimeError("串口连接失败,请检查端口/波特率/接线。") + self.points: Dict[str, Dict[str, float]] = self._load_points(self.cfg.points_file) + + # —— 初始化搅拌器控制器(可选)—— + self.reactor: Optional[RelayController] = None + try: + self.reactor = RelayController(port=self.cfg.relay_port, + baudrate=self.cfg.relay_baudrate, + timeout=self.cfg.relay_timeout) + except Exception as e: + print(f"[警告] 创建 RelayController 失败:{e}(继续运行,仅禁用搅拌功能)") + + def stir_connect(self) -> bool: + """连接搅拌器串口。""" + _require(self.reactor is not None, "未创建 Reactor 控制器") + try: + self.reactor.connect() + return True + except Exception as e: + print(f"[搅拌] 连接失败:{e}") + return False + + def stir_on(self, wait_response: bool = True) -> bool: + _require(self.reactor is not None, "未创建 Reactor 控制器") + if not self.reactor.ser or not self.reactor.ser.is_open: + self.stir_connect() + try: + self.reactor.on(wait_response=wait_response) + print("[搅拌] ON") + return True + except Exception as e: + print(f"[搅拌] 开启失败:{e}") + return False + + def stir_off(self, wait_response: bool = True) -> bool: + _require(self.reactor is not None, "未创建 Reactor 控制器") + if not self.reactor.ser or not self.reactor.ser.is_open: + self.stir_connect() + try: + self.reactor.off(wait_response=wait_response) + print("[搅拌] OFF") + return True + except Exception as e: + print(f"[搅拌] 关闭失败:{e}") + return False + + def stir_toggle(self, wait_response: bool = True) -> bool: + _require(self.reactor is not None, "未创建 Reactor 控制器") + if not self.reactor.ser or not self.reactor.ser.is_open: + self.stir_connect() + try: + self.reactor.toggle(wait_response=wait_response) + print("[搅拌] TOGGLE") + return True + except Exception as e: + print(f"[搅拌] 切换失败:{e}") + return False + + def stir_for(self, seconds: float, wait_response: bool = True) -> bool: + """阻塞式搅拌指定秒数,超简单实用。""" + _require(seconds > 0, "seconds 必须>0") + if self.stir_on(wait_response=wait_response): + try: + print(f"持续搅拌 {seconds} 秒...") + time.sleep(float(seconds)) + finally: + self.stir_off(wait_response=wait_response) + print("搅拌结束") + return True + return False + + @contextmanager + def stirring(self, wait_response: bool = True): + """上下文模式:with flows.stirring(): ...""" + self.stir_on(wait_response=wait_response) + try: + yield + finally: + self.stir_off(wait_response=wait_response) + + def stir_safe_shutdown(self): + """脚本结束时可调用,确保关闭继电器并释放串口。""" + if self.reactor is not None: + try: + self.reactor.ensure_off_on_exit() + except Exception: + pass + + def pipette_init(self) -> bool: + return self.station.pip_init() + + # ---------- 点位 ---------- + def _load_points(self, path: str) -> Dict[str, Dict[str, float]]: + abspath = os.path.join(os.path.dirname(__file__), path) + with open(abspath, "r", encoding="utf-8") as f: + data = json.load(f) + # 基础校验 + for k, v in data.items(): + for key in ("x", "y", "z"): + if key not in v: + raise KeyError(f"点位 {k} 缺少字段 {key}") + return data + + def _pt(self, name: str) -> Dict[str, float]: + if name not in self.points: + raise KeyError(f"点位不存在: {name}") + return self.points[name] + + # ---------- 运动辅助(严格按你的规则调用底层 API) ---------- + def _go_to_point_above(self, name: str, lift: Optional[float] = None) -> float: + """使用 move_to_work_safe 到达“点位上方 lift 高度”,返回上方的绝对 z 高度。""" + p = self._pt(name) + lift = self.cfg.approach_lift if lift is None else float(lift) + z_above = float(p["z"]) + float(lift) + self.station.move_to_work_safe(x=p["x"], y=p["y"], z=z_above) + return z_above + + def _down_rel(self, dz: float): + _require(dz >= 0, "下探距离必须>=0") + self.station.move_relative_direct(0.0, 0.0, -float(dz)) + time.sleep(self.cfg.settle_s) + + def _up_rel(self, dz: float): + _require(dz >= 0, "上移距离必须>=0") + self.station.move_relative_direct(0.0, 0.0, +float(dz)) + time.sleep(self.cfg.settle_s) + + def _xy_rel(self, dx: float, dy: float): + self.station.move_relative_direct(float(dx), float(dy), 0.0) + time.sleep(self.cfg.settle_s) + + def _map_waste_slot(self, c_tip_name: str) -> str: + zone = _zone_from_name(c_tip_name) + if zone == "D": + if c_tip_name in self.points: + return c_tip_name + raise KeyError(f"废弃位 {c_tip_name} 不存在于 points") + _require(zone == "C", "枪头原始点名应来自 C 区(或直接给一个 D 区废弃位)") + idx = _idx_from_name(c_tip_name) + target_idx = idx + 48 + d_name = f"D{target_idx}" + if d_name in self.points: + return d_name + c_name2 = f"C{target_idx}" + if c_name2 in self.points: + return c_name2 + raise KeyError(f"未找到废弃位:既没有 {d_name} 也没有 {c_name2}") + + # ======================================================= + # 1) 系统初始化 + # ======================================================= + def system_init(self) -> bool: + print("系统初始化:全轴回零...") + ok = self.station.home_safe() + print("系统已回零") + ok2 = self.station.set_work_origin_here() + print("设置当前位置设为工作原点") + return bool(ok and ok2) + + # ======================================================= + # 2) 取枪头 / 放枪头 + # ======================================================= + def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool: + p = self._pt(tip_point) + self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"]) + time.sleep(self.cfg.settle_s) + self.station.move_relative_direct(0.0, 0.0, float(down_mm)) + time.sleep(self.cfg.settle_s) + self.station.move_relative_direct(0.0, 0.0, -float(down_mm)) + time.sleep(self.cfg.settle_s) + print(f'{tip_point}枪头已经装载') + return True + + def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool: + p = self._pt(tip_point) + self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"]) + time.sleep(self.cfg.settle_s) + self.station.move_relative_direct(0.0, 0.0, float(down_mm)) + time.sleep(self.cfg.settle_s) + self.station.pip_eject() + time.sleep(self.cfg.settle_s) + self.station.move_relative_direct(0.0, 0.0, -float(down_mm)) + time.sleep(self.cfg.settle_s) + print(f'枪头已经弃置在{tip_point}') + return True + + # ======================================================= + # 3) 转移液体(A > B 或 B > D) + # ======================================================= + def _do_transfer(self, + src_name: str, + dst_names: Union[str, Iterable[str]], + tip_c_name: str, + total_ul: float, + down_src_mm: float, + down_dst_mm: float, + split_volumes: Optional[List[float]] = None, + stir_post_s: Optional[float] = None) -> bool: + + # 修改:统一解析目标名 + dst_list = _split_names(dst_names) + _require(len(dst_list) >= 1, "目标点名至少1个") + + # (1) 取枪头 + self.pick_tip(tip_c_name, down_mm=120) + time.sleep(self.cfg.settle_s) + + # (2) 到源位 + src_p = self._pt(src_name) + self.station.move_to_work_safe(x=src_p["x"], y=src_p["y"], z=src_p["z"]) + time.sleep(self.cfg.settle_s) + print('到达源点位') + + # (3) 下探源位 + self.station.move_relative_direct(0.0, 0.0, float(down_src_mm)) + time.sleep(self.cfg.settle_s) + print('下探完成') + + # (4) 吸液 + self.station.pip_asp(float(total_ul)) + time.sleep(self.cfg.settle_s) + print('吸取液体完成') + + # (5) 回升源位 + self.station.move_relative_direct(0.0, 0.0, -float(down_src_mm)) + time.sleep(self.cfg.settle_s) + print('回升完成') + + # === 目标处理 === + if len(dst_list) == 1: + # 单目标 + dst_p = self._pt(dst_list[0]) + self.station.move_to_work_safe(x=dst_p["x"], y=dst_p["y"], z=dst_p["z"]) + time.sleep(self.cfg.settle_s) + print('移动到目标点位') + + self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm)) + time.sleep(self.cfg.settle_s) + print('下探') + + self.station.pip_dsp(float(total_ul)) + time.sleep(self.cfg.settle_s) + time.sleep(self.cfg.delay_after_dispense) + + self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm)) + time.sleep(self.cfg.settle_s) + + else: + # 多目标 + if split_volumes is not None: + _require(len(split_volumes) == len(dst_list), "split_volumes 长度需与目标数量一致") + vols = [float(v) for v in split_volumes] + else: + each = float(total_ul) / float(len(dst_list)) + vols = [each] * len(dst_list) + + first_name = dst_list[0] + first_p = self._pt(first_name) + self.station.move_to_work_safe(x=first_p["x"], y=first_p["y"], z=first_p["z"]) + time.sleep(self.cfg.settle_s) + + self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm)) + time.sleep(self.cfg.settle_s) + + self.station.pip_dsp(vols[0]) + time.sleep(self.cfg.settle_s) + + base_x, base_y = first_p["x"], first_p["y"] + for nm, v in zip(dst_list[1:], vols[1:]): + self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm)) + time.sleep(self.cfg.settle_s) + p = self._pt(nm) + dx, dy = p["x"] - base_x, p["y"] - base_y + self.station.move_relative_direct(float(dx), float(dy), 0.0) + time.sleep(self.cfg.settle_s) + self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm)) + time.sleep(self.cfg.settle_s) + self.station.pip_dsp(v) + time.sleep(self.cfg.settle_s) + base_x, base_y = p["x"], p["y"] + + # —— 如果设定了加液后搅拌时间,则触发搅拌 —— + if stir_post_s is not None and float(stir_post_s) > 0: + try: + print(f"[搅拌] 加液完成,搅拌 {float(stir_post_s)} s ...") + self.stir_for(float(stir_post_s)) + except Exception as e: + print(f"[搅拌] 触发失败:{e}(忽略,不影响主流程)") + + # 映射 C 槽到 +48 的弃置位 + def upgrade_c_name(name: str) -> str: + m = re.fullmatch(r"C(\d+)", name.strip().upper()) + if not m: + return name + idx = int(m.group(1)) + return f"C{idx + 48}" + tip_c_name_new = upgrade_c_name(tip_c_name) + + # (10) 放枪头 + self.drop_tip(tip_c_name_new, down_mm=60.0) + return True + + def transfer_A_to_B(self, + a_name: str, + b_names: Union[str, Iterable[str]], + tip_c_name: str, + total_ul: float, + split_volumes: Optional[List[float]] = None, + stir_post_s: Optional[float] = None) -> bool: + _require(_zone_from_name(a_name) == "A", "源位必须在 A 区") + down_a_mm = 121.0 + down_b_mm = 26.0 + if _is_seq(b_names): + for nm in b_names: + _require(_zone_from_name(nm) == "B", "目标必须都在 B 区") + else: + _require(_zone_from_name(b_names) == "B", "目标必须在 B 区") + _require(_zone_from_name(tip_c_name) in ("C"), "枪头点名应在 C 区(或给定 D 区废弃位)") + return self._do_transfer(a_name, b_names, tip_c_name, total_ul, down_a_mm, down_b_mm, + split_volumes, stir_post_s) + + def transfer_B_to_D(self, + b_name: str, + d_names: Union[str, Iterable[str]], + tip_c_name: str, + total_ul: float, + split_volumes: Optional[List[float]] = None, + stir_post_s: Optional[float] = None) -> bool: + _require(_zone_from_name(b_name) == "B", "源位必须在 B 区") + down_b_mm = 46.0 + down_d_mm = 6.0 + if _is_seq(d_names): + for nm in d_names: + _require(_zone_from_name(nm) == "D", "目标必须都在 D 区") + else: + _require(_zone_from_name(d_names) == "D", "目标必须在 D 区") + _require(_zone_from_name(tip_c_name) in ("C"), "枪头点名应在 C 区") + return self._do_transfer(b_name, d_names, tip_c_name, total_ul, down_b_mm, down_d_mm, + split_volumes, stir_post_s) + + # ======================================================= + # 4) 过滤 + # ======================================================= + def filtering(self, + pusher_name: str, # D9-D16 + filter_name: str, # D1-D8 + down_pick_mm: float, + down_filter_mm: float) -> bool: + pusher_p = self._pt(pusher_name) + self.station.move_to_work_safe(x=pusher_p["x"], y=pusher_p["y"], z=pusher_p["z"]) + time.sleep(self.cfg.settle_s) + print("到达推杆点位") + + self.station.move_relative_direct(0.0, 0.0, float(down_pick_mm)) + time.sleep(self.cfg.settle_s) + + self.station.move_relative_direct(0.0, 0.0, -float(down_pick_mm)) + time.sleep(self.cfg.settle_s) + print("取推杆完成") + + filter_p = self._pt(filter_name) + self.station.move_to_work_safe(x=filter_p["x"], y=filter_p["y"], z=filter_p["z"]) + time.sleep(self.cfg.settle_s) + print("到达过滤点位") + + self.station.move_relative_direct(0.0, 0.0, float(down_filter_mm)) + time.sleep(self.cfg.settle_s) + print("下压过滤完成") + + self.station.move_relative_direct(0.0, 0.0, -20.0) + time.sleep(self.cfg.settle_s) + print("吸取空气完成") + + self.station.move_relative_direct(0.0, 0.0, 20.0) + time.sleep(self.cfg.settle_s) + print("排除剩余液体完成") + + self.station.move_relative_direct(0.0, 0.0, -10.0) + time.sleep(self.cfg.settle_s) + + self.station.pip_eject() + time.sleep(self.cfg.settle_s) + print("推杆已弹出") + + self.station.move_relative_direct(0.0, 0.0, -50.0) + time.sleep(self.cfg.settle_s) + + print("过滤流程完成") + return True + + # ======================================================= + # 5) 推动(D25) + # ======================================================= + def pushing(self, + tip_c_name: str) -> bool: + z_down_mm = 136.0 + y_forward_mm = 60.0 + _require(z_down_mm > 0, "z_down_mm 必须>0") + _require(y_forward_mm > 0, "y_forward_mm 必须>0") + _require(_zone_from_name(tip_c_name) in ("C"), "枪头点名应在C区") + + self.pick_tip(tip_c_name) + + d25_p = self._pt("D25") + self.station.move_to_work_safe(x=d25_p["x"], y=d25_p["y"], z=d25_p["z"]) + time.sleep(self.cfg.settle_s) + print("到达 D25 点位") + + self.station.move_relative_direct(0.0, 0.0, float(z_down_mm)) + time.sleep(self.cfg.settle_s) + print("下压完成") + + self.station.move_relative_direct(0.0, float(y_forward_mm), 0.0) + time.sleep(self.cfg.settle_s) + print("推动完成") + + self.station.move_relative_direct(0.0, 0.0, -float(z_down_mm)) + time.sleep(self.cfg.settle_s) + print("抬升完成") + + def upgrade_c_name(name: str) -> str: + m = re.fullmatch(r"C(\d+)", name.strip().upper()) + if not m: + return name + idx = int(m.group(1)) + return f"C{idx + 48}" + tip_c_name_new = upgrade_c_name(tip_c_name) + self.drop_tip(tip_c_name_new) + print("推动流程完成") + + return True + + # ======================================================= + # 6) 装核磁(单独函数) + # ======================================================= + def load_for_nmr(self, + src_d_name: str, # 源:D区 (如 D1-D8) + dst_d_name: str, # 目标:D区 (如 D17-D24) + tip_c_name: str, # 枪头:C区 + total_ul: float, + stir_post_s: Optional[float] = None) -> bool: + down_src_mm = 138 + down_dst_mm = 9 + _require(_zone_from_name(src_d_name) == "D", "源位必须在 D 区") + _require(_zone_from_name(dst_d_name) == "D", "目标位必须在 D 区") + _require(src_d_name != dst_d_name, "源与目标不能相同") + _require(_zone_from_name(tip_c_name) == "C", "枪头点名必须在 C 区") + + return self._do_transfer( + src_name=src_d_name, + dst_names=dst_d_name, + tip_c_name=tip_c_name, + total_ul=float(total_ul), + down_src_mm=float(down_src_mm), + down_dst_mm=float(down_dst_mm), + split_volumes=None, + stir_post_s=stir_post_s + ) + + +if __name__ == "__main__": + flows = SynthonXFlowV2(FlowConfig( + port="COM5", + baudrate=115200, + points_file="points_gui.json", + approach_lift=6.0, + settle_s=6, + relay_port="COM7", + relay_baudrate=9600, + relay_timeout=1.0 + )) + flows.stir_for(30) +''' + # 1) 初始化坐标系统 + flows.system_init() + # 2) 初始化移液枪 + flows.pipette_init() + # # flows.pipette_aspirate(300) + # # flows.pipette_dispense(300) + + # # 2) 取/放枪头(示例) + # flows.pick_tip("C1", down_mm=120) + # flows.drop_tip("C96", down_mm=60) + + # # # 3) A > B(单目标或多目标;多目标示例按均分) + # flows.transfer_A_to_B("A1", ["B1"], "C1", total_ul=300.0) + + # # 4) B > D + # flows.transfer_B_to_D("B1", ["D1"], "C2", total_ul=300.0) + + # 5) 过滤(D9-16 → D1-8) + # flows.filtering("D9", "D1", down_pick_mm=117, down_filter_mm=73) + + # # 6) 推动(D25) + # flows.pushing("C3", z_down_mm=135, y_forward_mm=60) + + # 7) 装核磁 + flows.load_for_nmr("D1", "D17", "C4", total_ul=300) +''' + \ No newline at end of file diff --git a/unilabos/devices/SynthonX/SynthonX_gui.py b/unilabos/devices/SynthonX/SynthonX_gui.py new file mode 100644 index 00000000..b6c78cb0 --- /dev/null +++ b/unilabos/devices/SynthonX/SynthonX_gui.py @@ -0,0 +1,815 @@ +""" +SynthonX GUI (External Front-End) +--------------------------------- +为 `SynthonX.py` 提供一个外接可视化界面,不修改后端脚本。 +""" +from __future__ import annotations +import os, sys, time, json, math, threading +from dataclasses import dataclass +from typing import Optional, Dict + +try: + from .SynthonX import ( + SharedRS485Bus, + SharedXYZController, + SOPAPipetteYYQ, + MachineConfig, + MotorAxis, + ) +except Exception as e: + raise RuntimeError("未找到 SynthonX.py,请将本GUI与 SynthonX.py 放在同一目录") from e + +# ============================= +# 后端封装(薄层):Station +# ============================= +class Station: + def __init__(self, port: str = "COM3", baudrate: int = 115200): + self.port = port + self.baudrate = baudrate + self.bus: Optional[SharedRS485Bus] = None + self.xyz: Optional[SharedXYZController] = None + self.pip: Optional[SOPAPipetteYYQ] = None + self.cfg = MachineConfig() + self.connected = False + + # ---- 连接/断开 ---- + def connect(self) -> bool: + try: + self.bus = SharedRS485Bus(self.port, self.baudrate) + self.bus.open() + self.xyz = SharedXYZController(self.bus, self.cfg) + self.pip = SOPAPipetteYYQ(self.bus) + self.connected = True + return True + except Exception: + self.connected = False + return False + + def disconnect(self) -> None: + if self.bus: + try: + self.bus.close() + except Exception: + pass + self.connected = False + + # ---- XYZ 基础 ---- + def set_work_origin_here(self) -> bool: + assert self.xyz is not None + return self.xyz.set_work_origin_here() + + def home_safe(self) -> bool: + """全轴回零(Z→X→Y)。""" + assert self.xyz is not None + try: + # 优先使用后端的 home_all(若支持传入顺序则按 Z→X→Y) + if hasattr(self.xyz, 'home_all'): + try: + return bool(self.xyz.home_all((MotorAxis.Z, MotorAxis.X, MotorAxis.Y))) + except TypeError: + return bool(self.xyz.home_all()) + # 兼容后端不提供 home_all 的情况:逐轴回零(Z→X→Y) + ok = True + for ax in (MotorAxis.Z, MotorAxis.X, MotorAxis.Y): + try: + self.xyz.home_axis(ax, -1) + self.xyz.wait_for_completion(ax, 60.0) + except Exception: + ok = False + return ok + except Exception: + return False + + def emergency_stop(self) -> bool: + assert self.xyz is not None + ok = True + for ax in (MotorAxis.X, MotorAxis.Y, MotorAxis.Z): + try: + self.xyz.emergency_stop(ax) + except Exception: + ok = False + return ok + + # ---- 读取“工作坐标系”下的当前位置(mm) ---- + def get_status_mm(self) -> Dict[str, float]: + """返回【工作坐标系】下的当前位置 (mm)。""" + assert self.xyz is not None + # 1) 优先:后端直接提供工作坐标 + if hasattr(self.xyz, "get_work_position_mm"): + try: + pos = self.xyz.get_work_position_mm() + return { + "x": float(pos.get("x", 0.0)), + "y": float(pos.get("y", 0.0)), + "z": float(pos.get("z", 0.0)), + } + except Exception: + pass + # 2) 其次:基于步数做“机->工”换算 + sx = self.xyz.get_motor_status(MotorAxis.X).steps + sy = self.xyz.get_motor_status(MotorAxis.Y).steps + sz = self.xyz.get_motor_status(MotorAxis.Z).steps + # 2.1 若提供 machine_steps_to_work_mm + if hasattr(self.xyz, "machine_steps_to_work_mm"): + try: + w = self.xyz.machine_steps_to_work_mm(x=sx, y=sy, z=sz) + return {"x": float(w["x"]), "y": float(w["y"]), "z": float(w["z"])} + except Exception: + pass + # 2.2 若提供 machine_to_work_mm(先转 mm 再“机->工”) + if hasattr(self.xyz, "machine_to_work_mm"): + try: + mx = self.xyz.steps_to_mm(MotorAxis.X, sx) + my = self.xyz.steps_to_mm(MotorAxis.Y, sy) + mz = self.xyz.steps_to_mm(MotorAxis.Z, sz) + w = self.xyz.machine_to_work_mm(x=mx, y=my, z=mz) + return {"x": float(w["x"]), "y": float(w["y"]), "z": float(w["z"])} + except Exception: + pass + # 3) 兜底:假定 steps_to_mm 已包含零点偏置(部分固件/后端会这么做) + return { + "x": self.xyz.steps_to_mm(MotorAxis.X, sx), + "y": self.xyz.steps_to_mm(MotorAxis.Y, sy), + "z": self.xyz.steps_to_mm(MotorAxis.Z, sz), + } + + # ---- 绝对安全移动(调用后端已有策略:抬Z→XY→落Z)---- + def move_to_work_safe(self, x=None, y=None, z=None, speed: Optional[int]=None, acc: Optional[int]=None) -> bool: + assert self.xyz is not None + return self.xyz.move_to_work_safe(x, y, z, speed, acc) + + def move_to_work_direct(self, x=None, y=None, z=None, + speed: Optional[int] = None, + acc: Optional[int] = None, + z_order: str = "auto") -> bool: + """ + 绝对直达:不抬Z。 + 优先调用后端 SynthonX.SharedXYZController.move_to_work_direct(..., accel=..., z_order=...) + 若后端没有该API,则回退到现有的相对直达策略。 + """ + assert self.xyz is not None + speed = speed or self.cfg.default_speed + acc = acc or self.cfg.default_acceleration + + # 优先走后端的原生实现(支持 z_order) + try: + if hasattr(self.xyz, "move_to_work_direct"): + # 注意后端参数名是 accel,这里把 acc 传给 accel + return bool(self.xyz.move_to_work_direct( + x=x, y=y, z=z, speed=speed, accel=acc, z_order=z_order + )) + except Exception: + # 不中断,下面走回退路径 + pass + + # ---- 回退实现:用当前位置算 Δ,再走相对直达(不抬Z)---- + try: + cur = self.get_status_mm() + dx = (x - cur['x']) if x is not None else 0.0 + dy = (y - cur['y']) if y is not None else 0.0 + dz = (z - cur['z']) if z is not None else 0.0 + return self.move_relative_direct(dx, dy, dz, speed=speed, acc=acc) + except Exception: + return False + + # ---- 相对直接移动(快速移动:不抬Z)---- + def move_relative_direct(self, dx: float, dy: float, dz: float, speed: Optional[int]=None, acc: Optional[int]=None) -> bool: + """基于当前位置直接到新目标(工作坐标 Δmm),不抬Z; + 策略:若目标Z>当前Z,先XY后Z;若目标Z<=当前Z,先Z后XY。 + """ + assert self.xyz is not None + speed = speed or self.cfg.default_speed + acc = acc or self.cfg.default_acceleration + # 当前绝对步(机坐标步数) + sx = self.xyz.get_motor_status(MotorAxis.X).steps + sy = self.xyz.get_motor_status(MotorAxis.Y).steps + sz = self.xyz.get_motor_status(MotorAxis.Z).steps + # Δmm→Δsteps(Δ与零点无关,可直接换算) + tx = sx + self.xyz.mm_to_steps(MotorAxis.X, dx) + ty = sy + self.xyz.mm_to_steps(MotorAxis.Y, dy) + tz = sz + self.xyz.mm_to_steps(MotorAxis.Z, dz) + # 顺序:仅按相对大小决定 + order = ("xy","z") if tz > sz else ("z","xy") + ok = True + try: + if "z" in order[0]: + ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc) + ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0) + ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc) + ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc) + ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0) + ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0) + else: + ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc) + ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc) + ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0) + ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0) + ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc) + ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0) + except Exception: + ok = False + return bool(ok) + + # ---- 点位存取 ---- + def load_points(self, path: str) -> Dict[str, Dict[str, float]]: + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception: + return {} + + def save_points(self, path: str, data: Dict[str, Dict[str, float]]) -> None: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + # ---- Pipette ---- + def pip_init(self) -> bool: + assert self.pip is not None + return self.pip.initialize() + + def pip_eject(self) -> bool: + assert self.pip is not None + self.pip.eject_tip() + return True + + def pip_asp(self, ul: float) -> bool: + assert self.pip is not None + return self.pip.aspirate(ul) + + def pip_dsp(self, ul: float) -> bool: + assert self.pip is not None + return self.pip.dispense(ul) + + +# ============================= +# GUI +# ============================= +import tkinter as tk +from tkinter import ttk, messagebox + +class LogConsole(ttk.Frame): + def __init__(self, master): + super().__init__(master) + self.text = tk.Text(self, height=10) + self.text.pack(fill=tk.BOTH, expand=True) + def log(self, s: str): + ts = time.strftime('%H:%M:%S') + self.text.insert(tk.END, f"[{ts}] {s}\n") + self.text.see(tk.END) + +class App(tk.Tk): + def __init__(self): + super().__init__() + self.title("SynthonX GUI (External) — 工作坐标") + self.geometry("980x780") + self.resizable(True, True) + + # 顶部:连接区 + top = ttk.Frame(self) + top.pack(fill=tk.X, padx=8, pady=6) + ttk.Label(top, text="串口:").pack(side=tk.LEFT) + self.port_var = tk.StringVar(value=("COM3" if sys.platform == "win32" else "/dev/ttyUSB0")) + ttk.Entry(top, textvariable=self.port_var, width=14).pack(side=tk.LEFT, padx=6) + ttk.Label(top, text="波特率:").pack(side=tk.LEFT) + self.baud_var = tk.IntVar(value=115200) + ttk.Entry(top, textvariable=self.baud_var, width=8).pack(side=tk.LEFT, padx=6) + self.btn_conn = ttk.Button(top, text="连接", command=self.on_connect) + self.btn_conn.pack(side=tk.LEFT, padx=6) + ttk.Button(top, text="断开", command=self.on_disconnect).pack(side=tk.LEFT, padx=6) + self.lbl_conn = ttk.Label(top, text="未连接", foreground="#B00") + self.lbl_conn.pack(side=tk.LEFT, padx=10) + + # Notebook + self.nb = ttk.Notebook(self) + self.nb.pack(fill=tk.BOTH, expand=True) + self.tab_xyz = ttk.Frame(self.nb) + self.tab_pip = ttk.Frame(self.nb) + self.tab_flow = ttk.Frame(self.nb) + self.tab_map = ttk.Frame(self.nb) + self.nb.add(self.tab_xyz, text="XYZ 控制(工作坐标)") + self.nb.add(self.tab_pip, text="移液枪") + self.nb.add(self.tab_flow, text="操作向导") + self.nb.add(self.tab_map, text="示意图(工作坐标)") + + # 日志 + self.console = LogConsole(self) + self.console.pack(fill=tk.BOTH, expand=False, padx=8, pady=6) + + # Station & 点位 + self.station: Optional[Station] = None + self.points_path = os.path.join(os.path.dirname(__file__), "points_gui.json") + self.points: Dict[str, Dict[str, float]] = {} + self._load_points() + + # 仅在点击后显示的“所选点名” + self._selected_point_name: str = "" # <<< 新增:记录被选中的点名 + + # 构建各页 + self._build_xyz_tab() + self._build_pip_tab() + self._build_flow_tab() + self._build_map_tab() + + # ---------- 连接 ---------- + def on_connect(self): + try: + self.station = Station(self.port_var.get(), int(self.baud_var.get())) + if self.station.connect(): + self.lbl_conn.config(text="已连接", foreground="#0A0") + self.console.log("连接成功(真实串口)") + else: + self.lbl_conn.config(text="连接失败", foreground="#B00") + self.console.log("连接失败:请检查端口/波特率/接线") + except Exception as e: + self.lbl_conn.config(text="异常", foreground="#B00") + self.console.log(f"连接异常:{e}") + + def on_disconnect(self): + if self.station: + self.station.disconnect() + self.lbl_conn.config(text="未连接", foreground="#B00") + self.console.log("已断开") + + # ---------- XYZ Tab ---------- + def _build_xyz_tab(self): + f = self.tab_xyz + # 基本 + base = ttk.LabelFrame(f, text="基本") + base.pack(fill=tk.X, padx=8, pady=8) + ttk.Button(base, text="设置当前位置为工作原点", command=self.xyz_set_origin).pack(side=tk.LEFT, padx=6, pady=6) + ttk.Button(base, text="全轴回零 (Z→X→Y)", command=self.xyz_home).pack(side=tk.LEFT, padx=6, pady=6) + ttk.Button(base, text="紧急停止", command=self.xyz_emg).pack(side=tk.LEFT, padx=6, pady=6) + + # 速度/加速度 + sp = ttk.LabelFrame(f, text="速度/加速度 (rpm / 无量纲)") + sp.pack(fill=tk.X, padx=8, pady=8) + self.speed_var = tk.IntVar(value=100) + self.acc_var = tk.IntVar(value=1000) + row = ttk.Frame(sp); row.pack(fill=tk.X, padx=4, pady=4) + ttk.Label(row, text="速度:").pack(side=tk.LEFT) + ttk.Entry(row, textvariable=self.speed_var, width=8).pack(side=tk.LEFT, padx=6) + ttk.Label(row, text="加速度:").pack(side=tk.LEFT) + ttk.Entry(row, textvariable=self.acc_var, width=8).pack(side=tk.LEFT, padx=6) + # 运动区:左=绝对移动,右=相对位移 + mv = ttk.LabelFrame(f, text="移动(工作坐标,mm):左=绝对 | 右=相对(Δ)") + mv.pack(fill=tk.X, padx=8, pady=8) + + mv_left = ttk.Frame(mv); mv_left.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6, 3), pady=4) + mv_right = ttk.Frame(mv); mv_right.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(3, 6), pady=4) + + # --- 左侧:绝对移动 --- + ttk.Label(mv_left, text="绝对目标 (X/Y/Z,mm,工作坐标)").pack(anchor="w") + self.x_var = tk.DoubleVar(value=0.0) + self.y_var = tk.DoubleVar(value=0.0) + self.z_var = tk.DoubleVar(value=0.0) + for label, var in (("X", self.x_var),("Y", self.y_var),("Z", self.z_var)): + row = ttk.Frame(mv_left); row.pack(fill=tk.X, padx=0, pady=3) + ttk.Label(row, text=f"{label}=").pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var, width=10).pack(side=tk.LEFT) + + # 勾选后不抬Z:绝对“直达” + self.direct_abs_var = tk.BooleanVar(value=False) + ttk.Checkbutton( + mv_left, + text="不抬Z轴(绝对直达,按Z高低决定先后)", + variable=self.direct_abs_var + ).pack(anchor="w", pady=(4,2)) + + ttk.Button(mv_left, text="执行绝对移动", command=self.xyz_move_absolute).pack(anchor="w", pady=(2,0)) + + # --- 右侧:相对位移 --- + ttk.Label(mv_right, text="相对位移 Δ (mm,工作坐标)").pack(anchor="w") + self.rx_var = tk.DoubleVar(value=0.0) + self.ry_var = tk.DoubleVar(value=0.0) + self.rz_var = tk.DoubleVar(value=0.0) + for label, var in (("ΔX", self.rx_var),("ΔY", self.ry_var),("ΔZ", self.rz_var)): + row = ttk.Frame(mv_right); row.pack(fill=tk.X, padx=0, pady=3) + ttk.Label(row, text=f"{label}=").pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var, width=10).pack(side=tk.LEFT) + + ttk.Button(mv_right, text="执行相对位移", command=self.xyz_move_relative_inputs).pack(anchor="w", pady=(2,0)) + + + # 点位管理 + pm = ttk.LabelFrame(f, text="位置点管理(JSON,工作坐标)") + pm.pack(fill=tk.X, padx=8, pady=8) + self.point_name_var = tk.StringVar(value="") + row1 = ttk.Frame(pm); row1.pack(fill=tk.X, padx=4, pady=3) + ttk.Label(row1, text="点名:").pack(side=tk.LEFT) + ttk.Entry(row1, textvariable=self.point_name_var, width=18).pack(side=tk.LEFT, padx=6) + ttk.Button(row1, text="保存当前 X/Y/Z 为该点", command=self.save_point).pack(side=tk.LEFT, padx=6) + row2 = ttk.Frame(pm); row2.pack(fill=tk.X, padx=4, pady=3) + ttk.Label(row2, text="点列表:").pack(side=tk.LEFT) + self.point_combo = ttk.Combobox(row2, values=sorted(self.points.keys()), width=20) + self.point_combo.pack(side=tk.LEFT, padx=6) + ttk.Button(row2, text="移动到点(安全)", command=self.move_to_point).pack(side=tk.LEFT, padx=6) + ttk.Button(row2, text="删除点", command=self.delete_point).pack(side=tk.LEFT, padx=6) + ttk.Button(row2, text="刷新列表", command=self.refresh_points_combo).pack(side=tk.LEFT, padx=6) + + # 状态读取 + st = ttk.LabelFrame(f, text="当前位置(工作坐标 mm / 步)") + st.pack(fill=tk.X, padx=8, pady=8) + self.lbl_pos = ttk.Label(st, text="X: -, Y: -, Z: -") + self.lbl_pos.pack(side=tk.LEFT, padx=6) + ttk.Button(st, text="刷新", command=self.xyz_refresh).pack(side=tk.LEFT, padx=6) + + # XYZ 事件 + def xyz_move_absolute(self): + s = self._need_station(); + if not s: return + try: + sp = int(self.speed_var.get()) + ac = int(self.acc_var.get()) + x, y, z = self.x_var.get(), self.y_var.get(), self.z_var.get() + if getattr(self, "direct_abs_var", None) and self.direct_abs_var.get(): + ok = s.move_to_work_direct(x, y, z, speed=sp, acc=ac) + self.console.log(f"绝对直达(工作坐标,不抬Z):{'OK' if ok else 'Fail'} (x={x}, y={y}, z={z}, speed={sp}, acc={ac})") + else: + ok = s.move_to_work_safe(x, y, z, speed=sp, acc=ac) + self.console.log(f"安全移动(工作坐标,抬Z):{'OK' if ok else 'Fail'} (x={x}, y={y}, z={z}, speed={sp}, acc={ac})") + except Exception as e: + messagebox.showerror("移动失败", str(e)) + self.console.log(f"移动失败:{e}") + + def xyz_move_relative_inputs(self): + s = self._need_station(); + if not s: return + try: + sp = int(self.speed_var.get()) + ac = int(self.acc_var.get()) + dx, dy, dz = self.rx_var.get(), self.ry_var.get(), self.rz_var.get() + ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac) + if ok: + # 获取并输出当前绝对坐标(工作坐标) + pos = s.get_status_mm() + self.console.log( + f"相对位移(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})" + ) + # 刷新状态标签 + try: self.xyz_refresh() + except Exception: pass + else: + self.console.log(f"相对位移(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})") + except Exception as e: + messagebox.showerror("相对位移失败", str(e)) + self.console.log(f"相对位移失败:{e}") + + def xyz_move_dispatch(self): + if self.relative_var.get(): + self.xyz_move_relative() + else: + self.xyz_move_safe() + + def xyz_set_origin(self): + s = self._need_station(); + if not s: return + s.set_work_origin_here() + self.console.log("工作原点已更新为当前位置") + + def xyz_home(self): + s = self._need_station(); + if not s: return + s.home_safe() + self.console.log("全轴回零 (Z→X→Y) 完成") + + def xyz_emg(self): + s = self._need_station(); + if not s: return + ok = s.emergency_stop() + self.console.log(f"紧急停止:{'OK' if ok else 'Fail'}") + + def xyz_move_safe(self): + s = self._need_station(); + if not s: return + try: + sp = int(self.speed_var.get()) + ac = int(self.acc_var.get()) + ok = s.move_to_work_safe(self.x_var.get(), self.y_var.get(), self.z_var.get(), speed=sp, acc=ac) + self.console.log(f"安全移动(工作坐标):{'OK' if ok else 'Fail'} (speed={sp}, acc={ac})") + except Exception as e: + messagebox.showerror("移动失败", str(e)) + self.console.log(f"移动失败:{e}") + + def xyz_move_relative(self): + s = self._need_station(); + if not s: return + try: + sp = int(self.speed_var.get()) + ac = int(self.acc_var.get()) + dx, dy, dz = self.x_var.get(), self.y_var.get(), self.z_var.get() + ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac) + if ok: + pos = s.get_status_mm() + self.console.log( + f"相对直接移动(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})" + ) + try: self.xyz_refresh() + except Exception: pass + else: + self.console.log(f"相对直接移动(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})") + except Exception as e: + messagebox.showerror("相对移动失败", str(e)) + self.console.log(f"相对移动失败:{e}") + + def xyz_refresh(self): + s = self._need_station(); + if not s: return + try: + pos = s.get_status_mm() # 工作坐标 + # 同时显示步数(机坐标步)供调试 + px = s.xyz.get_motor_status(MotorAxis.X).steps + py = s.xyz.get_motor_status(MotorAxis.Y).steps + pz = s.xyz.get_motor_status(MotorAxis.Z).steps + self.lbl_pos.config(text=( + f"工作坐标 X:{pos['x']:.2f} mm ({px}步) " + f"Y:{pos['y']:.2f} mm ({py}步) " + f"Z:{pos['z']:.2f} mm ({pz}步)" + )) + except Exception as e: + messagebox.showerror("读取失败", str(e)) + + # ---------- Pipette Tab ---------- + def _build_pip_tab(self): + f = self.tab_pip + base = ttk.LabelFrame(f, text="基础") + base.pack(fill=tk.X, padx=8, pady=8) + ttk.Button(base, text="初始化", command=self.pip_init).pack(side=tk.LEFT, padx=6, pady=6) + ttk.Button(base, text="弹出枪头", command=self.pip_eject).pack(side=tk.LEFT, padx=6, pady=6) + ops = ttk.LabelFrame(f, text="吸/排液 (uL)") + ops.pack(fill=tk.X, padx=8, pady=8) + self.asp_var = tk.DoubleVar(value=100) + self.dsp_var = tk.DoubleVar(value=100) + ttk.Label(ops, text="吸液:").pack(side=tk.LEFT) + ttk.Entry(ops, textvariable=self.asp_var, width=8).pack(side=tk.LEFT) + ttk.Button(ops, text="执行吸液", command=self.pip_asp).pack(side=tk.LEFT, padx=6) + ttk.Label(ops, text="排液:").pack(side=tk.LEFT) + ttk.Entry(ops, textvariable=self.dsp_var, width=8).pack(side=tk.LEFT) + ttk.Button(ops, text="执行排液", command=self.pip_dsp).pack(side=tk.LEFT, padx=6) + st = ttk.LabelFrame(f, text="状态") + st.pack(fill=tk.X, padx=8, pady=8) + self.lbl_pip = ttk.Label(st, text="-") + self.lbl_pip.pack(side=tk.LEFT, padx=6, pady=6) + + def pip_init(self): + s = self._need_station(); + if not s: return + if s.pip_init(): + self.console.log("移液枪初始化完成") + self.lbl_pip.config(text="已初始化") + + def pip_eject(self): + s = self._need_station(); + if not s: return + s.pip_eject(); self.console.log("枪头已弹出") + + def pip_asp(self): + s = self._need_station(); + if not s: return + v = self.asp_var.get() + if s.pip_asp(v): + self.console.log(f"吸液 {v} uL 完成") + + def pip_dsp(self): + s = self._need_station(); + if not s: return + v = self.dsp_var.get() + if s.pip_dsp(v): + self.console.log(f"排液 {v} uL 完成") + + # ---------- Flow Tab ---------- + def _build_flow_tab(self): + f = self.tab_flow + box = ttk.LabelFrame(f, text="Transfer Demo") + box.pack(fill=tk.X, padx=8, pady=8) + ttk.Label(box, text="演示:吸 100 → 排 100(需已装枪头)").pack(side=tk.LEFT, padx=6) + ttk.Button(box, text="Run", command=self.flow_demo).pack(side=tk.LEFT, padx=6) + + def flow_demo(self): + s = self._need_station(); + if not s: return + s.pip_asp(100) + s.pip_dsp(100) + self.console.log("Transfer demo 完成") + + # ---------- Map Tab ---------- + def _build_map_tab(self): + f = self.tab_map + container = ttk.Frame(f); container.pack(fill=tk.BOTH, expand=True, padx=8, pady=8) + left = ttk.Frame(container); right = ttk.Frame(container) + left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + right.pack(side=tk.LEFT, fill=tk.Y, padx=10) + self.map_canvas = tk.Canvas(left, width=640, height=480, background="white", highlightthickness=1, highlightbackground="#999") + self.map_canvas.pack(fill=tk.BOTH, expand=True) + self.map_canvas.bind("", self.map_on_click) + self.map_canvas.bind("", lambda e: self.draw_map()) + ttk.Label(right, text="所选点:").pack(anchor="w", pady=(4,0)) + self.map_selected = tk.StringVar(value="") + ttk.Label(right, textvariable=self.map_selected).pack(anchor="w") + ttk.Button(right, text="刷新示意图", command=self.draw_map).pack(fill=tk.X, pady=6) + ttk.Separator(right, orient="horizontal").pack(fill=tk.X, pady=8) + ttk.Button(right, text="安全运动到所选点(绝对)", command=self.map_move_safe).pack(fill=tk.X, pady=6) + ttk.Button(right, text="相对运动到所选点(Δ=目标-当前)", command=self.map_move_relative).pack(fill=tk.X, pady=6) + tip = "示意图仅展示 XY 平面;点击点后会在画布上显示该点名称。\n安全移动=抬Z-XY-落Z;相对移动=直接Δmm。\n所有坐标均为【工作坐标】。" + ttk.Label(right, text=tip, wraplength=240, foreground="#555").pack(fill=tk.X, pady=8) + self.draw_map() + + def _workspace_xy(self): + try: + if self.station and self.station.xyz: + mc = self.station.cfg + else: + mc = MachineConfig() + return float(mc.max_travel_x), float(mc.max_travel_y) + except Exception: + return 340.0, 250.0 + + def _xy_to_canvas(self, x_mm, y_mm, cw, ch, scale, margin): + cx = margin + x_mm * scale + cy = ch - margin - y_mm * scale # 画布 y 向下 + return cx, cy + + def draw_map(self): + cnv = getattr(self, 'map_canvas', None) + if not cnv: return + cnv.delete("all") + cw = cnv.winfo_width() or 640 + ch = cnv.winfo_height() or 480 + margin = 40 + max_x, max_y = self._workspace_xy() + scale = min((cw-2*margin)/max_x, (ch-2*margin)/max_y) + # 边框 + cnv.create_rectangle(margin, margin, cw-margin, ch-margin, outline="#333", width=2) + # 外侧标注为“工作坐标系” + cnv.create_text(cw - margin, ch - 6, anchor="se", text="工作坐标系 (mm)", fill="#333") + # 点(默认只画点,不显示文字;仅被选中的点显示名称) + self._map_item_to_name = {} + r = 5 + for name, p in sorted(self.points.items()): + try: + x, y = float(p.get("x", 0.0)), float(p.get("y", 0.0)) + except Exception: + continue + cx, cy = self._xy_to_canvas(x, y, cw, ch, scale, margin) + # 选中点:红色加粗并显示名称;未选中:蓝色 + sel = (name == self._selected_point_name) + color = "#ff375f" if sel else "#0a84ff" + width = 3 if sel else 2 + item = cnv.create_oval(cx-r, cy-r, cx+r, cy+r, outline=color, width=width, tags=("point",)) + self._map_item_to_name[item] = name + if sel: + cnv.create_text(cx+10, cy-12, anchor="w", text=name, fill="#333") # 仅选中时显示名称 + + # 当前坐标十字(工作坐标) + try: + if self.station and self.station.connected: + cur = self.station.get_status_mm() # 工作坐标 + cx, cy = self._xy_to_canvas(cur['x'], cur['y'], cw, ch, scale, margin) + cnv.create_line(cx-8, cy, cx+8, cy, width=2) + cnv.create_line(cx, cy-8, cx, cy+8, width=2) + cnv.create_text(cx+12, cy-10, anchor="w", text=f"当前({cur['x']:.1f},{cur['y']:.1f})") + except Exception: + pass + + def map_on_click(self, event): + cnv = getattr(self, 'map_canvas', None) + if not cnv: return + item = cnv.find_closest(event.x, event.y) + if not item: return + iid = item[0] + if "point" not in cnv.gettags(iid): + return + # 通过映射拿到点名 + name = self._map_item_to_name.get(iid, "") + # 记录并刷新(仅选中时显示名称) + self._selected_point_name = name + self.map_selected.set(name) + self.draw_map() + + def map_move_safe(self): + s = self._need_station(); + if not s: return + name = self.map_selected.get().strip() + if name not in self.points: + messagebox.showwarning("未选择点", "请先在示意图上点击一个点"); return + p = self.points[name] + sp, ac = int(self.speed_var.get()), int(self.acc_var.get()) + try: + ok = s.move_to_work_safe(p['x'], p['y'], p['z'], speed=sp, acc=ac) + self.console.log(f"[示意图] 安全移动到 '{name}'(工作坐标): {'OK' if ok else 'Fail'} → (x={p['x']}, y={p['y']}, z={p['z']})") + except Exception as e: + messagebox.showerror("移动失败", str(e)) + self.console.log(f"[示意图] 移动失败:{e}") + + def map_move_relative(self): + s = self._need_station(); + if not s: return + name = self.map_selected.get().strip() + if name not in self.points: + messagebox.showwarning("未选择点", "请先在示意图上点击一个点"); return + p = self.points[name] + try: + cur = s.get_status_mm() # 工作坐标 + dx, dy, dz = p['x']-cur['x'], p['y']-cur['y'], p['z']-cur['z'] + except Exception: + dx, dy, dz = p['x'], p['y'], p['z'] + sp, ac = int(self.speed_var.get()), int(self.acc_var.get()) + try: + ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac) + if ok: + pos = s.get_status_mm() + self.console.log( + f"[示意图] 相对移动至 '{name}'(工作坐标): OK (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})" + ) + try: self.draw_map() + except Exception: pass + else: + self.console.log(f"[示意图] 相对移动至 '{name}'(工作坐标): Fail (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f})") + except Exception as e: + messagebox.showerror("相对移动失败", str(e)) + self.console.log(f"[示意图] 相对移动失败:{e}") + + # ---------- 点位存取 ---------- + def _load_points(self): + try: + if os.path.exists(self.points_path): + with open(self.points_path, 'r', encoding='utf-8') as f: + self.points = json.load(f) + else: + self.points = {} + except Exception: + self.points = {} + # 若组合框已创建,刷新 + if hasattr(self, 'point_combo'): + self.refresh_points_combo() + + def _save_points(self): + try: + with open(self.points_path, 'w', encoding='utf-8') as f: + json.dump(self.points, f, ensure_ascii=False, indent=2) + self.console.log("点位已保存(工作坐标)") + except Exception as e: + messagebox.showerror("保存失败", str(e)) + self.console.log(f"点位保存失败:{e}") + + def save_point(self): + s = self._need_station(); + if not s: return + name = (self.point_name_var.get() or '').strip() + if not name: + messagebox.showwarning("无名称", "请先输入点名称"); return + pos = s.get_status_mm() # 工作坐标 + self.points[name] = {"x": pos['x'], "y": pos['y'], "z": pos['z']} + self._save_points(); self.refresh_points_combo() + self.console.log(f"保存点 '{name}'(工作坐标): x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f}") + + def move_to_point(self): + s = self._need_station(); + if not s: return + name = (self.point_combo.get() or self.point_name_var.get()).strip() + if name not in self.points: + messagebox.showwarning("未找到点", f"未找到点: {name}"); return + p = self.points[name] + sp, ac = int(self.speed_var.get()), int(self.acc_var.get()) + try: + ok = s.move_to_work_safe(p['x'], p['y'], p['z'], speed=sp, acc=ac) + self.console.log(f"移动到点 '{name}'(工作坐标): {'OK' if ok else 'Fail'} → (x={p['x']}, y={p['y']}, z={p['z']})") + except Exception as e: + messagebox.showerror("移动失败", str(e)) + self.console.log(f"移动失败:{e}") + + def delete_point(self): + name = (self.point_combo.get() or self.point_name_var.get()).strip() + if name and name in self.points: + del self.points[name] + self._save_points(); self.refresh_points_combo() + self.console.log(f"已删除点 '{name}'") + # 若删除的是已选点,清空选择 + if name == self._selected_point_name: + self._selected_point_name = "" + self.map_selected.set("") + self.draw_map() + else: + messagebox.showwarning("未找到点", f"未找到点: {name}") + + def refresh_points_combo(self): + if hasattr(self, 'point_combo'): + names = sorted(self.points.keys()) + self.point_combo['values'] = names + if names and (self.point_combo.get() not in names): + self.point_combo.set(names[0]) + self.draw_map() + + # ---------- helpers ---------- + def _need_station(self) -> Optional[Station]: + if not self.station or not self.station.connected: + messagebox.showwarning("未连接", "请先连接设备") + return None + return self.station + + +def main(): + app = App() + app.mainloop() + +if __name__ == "__main__": + main() diff --git a/unilabos/devices/SynthonX/SynthonX_reactor.py b/unilabos/devices/SynthonX/SynthonX_reactor.py new file mode 100644 index 00000000..0d2562bc --- /dev/null +++ b/unilabos/devices/SynthonX/SynthonX_reactor.py @@ -0,0 +1,106 @@ +import serial +import serial.tools.list_ports +import time +import sys + +# 继电器控制指令 +RELAY_ON = bytes.fromhex('A0 01 01 A2') # 打开继电器 +RELAY_OFF = bytes.fromhex('A0 01 00 A1') # 关闭继电器 +RELAY_ON_WITH_RESPONSE = bytes.fromhex('A0 01 03 A4') # 打开继电器,带回复 +RELAY_OFF_WITH_RESPONSE = bytes.fromhex('A0 01 02 A3') # 关闭继电器,带回复 + +class RelayController: + """USB继电器控制类 + + 参数: + port: 串口名称, 如 'COM3' + baudrate: 波特率, 默认9600 + timeout: 读超时秒数 + """ + def __init__(self, port: str = 'COM3', baudrate: int = 9600, timeout: float = 1.0): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.ser: serial.Serial | None = None + self._state: bool | None = None # True=ON False=OFF None=未知 + + def connect(self): + if self.ser and self.ser.is_open: + return + try: + self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + time.sleep(2) # 等待设备初始化 + except Exception as e: + raise RuntimeError(f'连接串口失败: {e}') + + def _write(self, data: bytes, expect_response: bool = True) -> bytes | None: + if not self.ser or not self.ser.is_open: + raise RuntimeError('串口未连接, 请先调用 connect()') + self.ser.write(data) + if expect_response: + # 读取直到最后一个字节或超时 + end_byte = data[-1:] + resp = self.ser.read_until(end_byte) + return resp + return None + + def on(self, wait_response: bool = True): + resp = self._write(RELAY_ON_WITH_RESPONSE if wait_response else RELAY_ON, expect_response=wait_response) + self._state = True + return resp + + def off(self, wait_response: bool = True): + resp = self._write(RELAY_OFF_WITH_RESPONSE if wait_response else RELAY_OFF, expect_response=wait_response) + self._state = False + return resp + + def toggle(self, wait_response: bool = True): + if self._state is None: + # 未知状态默认先关闭再打开 + self.off(wait_response) + time.sleep(0.1) + return self.on(wait_response) + return self.off(wait_response) if self._state else self.on(wait_response) + + def state(self) -> bool | None: + return self._state + + def close(self): + if self.ser and self.ser.is_open: + self.ser.close() + + def ensure_off_on_exit(self): + # 安全关闭 + try: + self.off(wait_response=False) + except Exception: + pass + finally: + self.close() + +# 示例入口 +if __name__ == '__main__': + print('=' * 40) + print(' USB继电器控制程序') + print('=' * 40) + run_time_input = input('输入运行时间(秒): ').strip() + try: + run_seconds = float(run_time_input) + except ValueError: + print('输入不是数字, 退出.') + sys.exit(1) + + controller = RelayController(port='COM7') + try: + controller.connect() + controller.on() # 初始化关闭 + print('打开搅拌器') + print(f'等待 {run_seconds} 秒后关闭...') + time.sleep(run_seconds) + controller.off() + print('关闭搅拌器') + except Exception as e: + print('发生错误:', e) + finally: + controller.ensure_off_on_exit() + print('程序结束') \ No newline at end of file diff --git a/unilabos/devices/SynthonX/points_gui.json b/unilabos/devices/SynthonX/points_gui.json new file mode 100644 index 00000000..f78d2c60 --- /dev/null +++ b/unilabos/devices/SynthonX/points_gui.json @@ -0,0 +1,687 @@ +{ + "C1": { + "x": 195.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C2": { + "x": 204.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C3": { + "x": 213.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C4": { + "x": 222.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C5": { + "x": 231.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C6": { + "x": 240.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C7": { + "x": 249.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C8": { + "x": 258.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C9": { + "x": 267.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C10": { + "x": 276.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C11": { + "x": 285.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C12": { + "x": 294.934375, + "y": 96.6064453125, + "z": 63.79443359375 + }, + "C13": { + "x": 195.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C14": { + "x": 204.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C15": { + "x": 213.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C16": { + "x": 222.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C17": { + "x": 231.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C18": { + "x": 240.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C19": { + "x": 249.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C20": { + "x": 258.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C21": { + "x": 267.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C22": { + "x": 276.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C23": { + "x": 285.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C24": { + "x": 294.934375, + "y": 105.6064453125, + "z": 63.79443359375 + }, + "C25": { + "x": 195.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C26": { + "x": 204.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C27": { + "x": 213.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C28": { + "x": 222.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C29": { + "x": 231.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C30": { + "x": 240.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C31": { + "x": 249.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C32": { + "x": 258.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C33": { + "x": 267.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C34": { + "x": 276.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C35": { + "x": 285.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C36": { + "x": 294.934375, + "y": 114.6064453125, + "z": 63.79443359375 + }, + "C37": { + "x": 195.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C38": { + "x": 204.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C39": { + "x": 213.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C40": { + "x": 222.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C41": { + "x": 231.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C42": { + "x": 240.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C43": { + "x": 249.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C44": { + "x": 258.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C45": { + "x": 267.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C46": { + "x": 276.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C47": { + "x": 285.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C48": { + "x": 294.934375, + "y": 123.6064453125, + "z": 63.79443359375 + }, + "C49": { + "x": 195.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C50": { + "x": 204.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C51": { + "x": 213.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C52": { + "x": 222.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C53": { + "x": 231.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C54": { + "x": 240.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C55": { + "x": 249.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C56": { + "x": 258.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C57": { + "x": 267.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C58": { + "x": 276.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C59": { + "x": 285.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C60": { + "x": 294.934375, + "y": 132.6064453125, + "z": 63.79443359375 + }, + "C61": { + "x": 195.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C62": { + "x": 204.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C63": { + "x": 213.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C64": { + "x": 222.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C65": { + "x": 231.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C66": { + "x": 240.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C67": { + "x": 249.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C68": { + "x": 258.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C69": { + "x": 267.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C70": { + "x": 276.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C71": { + "x": 285.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C72": { + "x": 294.934375, + "y": 141.6064453125, + "z": 63.79443359375 + }, + "C73": { + "x": 195.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C74": { + "x": 204.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C75": { + "x": 213.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C76": { + "x": 222.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C77": { + "x": 231.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C78": { + "x": 240.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C79": { + "x": 249.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C80": { + "x": 258.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C81": { + "x": 267.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C82": { + "x": 276.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C83": { + "x": 285.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C84": { + "x": 294.934375, + "y": 150.6064453125, + "z": 63.79443359375 + }, + "C85": { + "x": 195.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C86": { + "x": 204.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C87": { + "x": 213.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C88": { + "x": 222.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C89": { + "x": 231.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C90": { + "x": 240.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C91": { + "x": 249.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C92": { + "x": 258.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C93": { + "x": 267.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C94": { + "x": 276.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C95": { + "x": 285.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "C96": { + "x": 294.934375, + "y": 159.6064453125, + "z": 63.79443359375 + }, + "A1": { + "x": 49.9951171875, + "y": 0.0, + "z": 51.0 + }, + "A2": { + "x": 85.232421875, + "y": 0.109375, + "z": 51.79443359375 + }, + "A3": { + "x": 120.232421875, + "y": 0.109375, + "z": 51.79443359375 + }, + "A4": { + "x": 155.232421875, + "y": 0.109375, + "z": 51.79443359375 + }, + "A5": { + "x": 48.232421875, + "y": 35.109375, + "z": 51.79443359375 + }, + "A6": { + "x": 83.232421875, + "y": 35.109375, + "z": 51.79443359375 + }, + "A7": { + "x": 118.232421875, + "y": 35.109375, + "z": 51.79443359375 + }, + "A8": { + "x": 158.232421875, + "y": 35.109375, + "z": 51.79443359375 + }, + "B1": { + "x": 213.232421875, + "y": 10.109375, + "z": 111.79443359375 + }, + "B2": { + "x": 238.232421875, + "y": 10.109375, + "z": 111.79443359375 + }, + "B3": { + "x": 263.232421875, + "y": 10.109375, + "z": 111.79443359375 + }, + "B4": { + "x": 288.232421875, + "y": 10.109375, + "z": 111.79443359375 + }, + "B5": { + "x": 213.232421875, + "y": 40.109375, + "z": 111.79443359375 + }, + "B6": { + "x": 240.232421875, + "y": 40.109375, + "z": 111.79443359375 + }, + "B7": { + "x": 263.232421875, + "y": 40.109375, + "z": 111.79443359375 + }, + "B8": { + "x": 288.232421875, + "y": 40.109375, + "z": 111.79443359375 + }, + "D1": { + "x": 47.2294921875, + "y": 77.1123046875, + "z": 54.001 + }, + "D2": { + "x": 77.2294921875, + "y": 77.1123046875, + "z": 54.001 + }, + "D3": { + "x": 107.2294921875, + "y": 77.1123046875, + "z": 54.001 + }, + "D4": { + "x": 137.2294921875, + "y": 77.1123046875, + "z": 54.001 + }, + "D5": { + "x": 47.2294921875, + "y": 98.1123046875, + "z": 54.001 + }, + "D6": { + "x": 77.2294921875, + "y": 98.1123046875, + "z": 54.001 + }, + "D7": { + "x": 107.2294921875, + "y": 98.1123046875, + "z": 54.001 + }, + "D8": { + "x": 137.2294921875, + "y": 98.1123046875, + "z": 54.001 + }, + "D9": { + "x": 40.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D10": { + "x": 55.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D11": { + "x": 70.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D12": { + "x": 85.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D13": { + "x": 100.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D14": { + "x": 115.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D15": { + "x": 130.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D16": { + "x": 145.5302734375, + "y": 174.2431640625, + "z": 0.0 + }, + "D17": { + "x": 39.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D18": { + "x": 54.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D19": { + "x": 69.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D20": { + "x": 84.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D21": { + "x": 99.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D22": { + "x": 114.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D23": { + "x": 129.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D24": { + "x": 144.5341796875, + "y": 185.9412, + "z": 3.0191650390625 + }, + "D25": { + "x": 93.0009765625, + "y": 86.89453125, + "z": 3.99981689453125 + } +} \ No newline at end of file diff --git a/unilabos/registry/devices/synthonx.yaml b/unilabos/registry/devices/synthonx.yaml new file mode 100644 index 00000000..eff7193c --- /dev/null +++ b/unilabos/registry/devices/synthonx.yaml @@ -0,0 +1,638 @@ +SynthonXFlowV2: + category: + - xyzmove + - synthonx + class: + action_value_mappings: + auto-stir_connect: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stir_connect参数 + type: object + type: UniLabJsonCommand + auto-stir_for: + feedback: {} + goal: {} + goal_default: + seconds: null + wait_response: true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + seconds: + type: number + wait_response: + default: true + type: boolean + required: + - seconds + type: object + result: {} + required: + - goal + title: stir_for参数 + type: object + type: UniLabJsonCommand + auto-stir_off: + feedback: {} + goal: {} + goal_default: + wait_response: true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + wait_response: + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: stir_off参数 + type: object + type: UniLabJsonCommand + auto-stir_on: + feedback: {} + goal: {} + goal_default: + wait_response: true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + wait_response: + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: stir_on参数 + type: object + type: UniLabJsonCommand + auto-stir_safe_shutdown: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stir_safe_shutdown参数 + type: object + type: UniLabJsonCommand + auto-stir_toggle: + feedback: {} + goal: {} + goal_default: + wait_response: true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + wait_response: + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: stir_toggle参数 + type: object + type: UniLabJsonCommand + auto-stirring: + feedback: {} + goal: {} + goal_default: + wait_response: true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + wait_response: + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: stirring参数 + type: object + type: UniLabJsonCommand + drop_tip: + goal_default: + down_mm: 60.0 + handles: {} + placeholder_keys: {} + schema: + description: 移动到计算出的废弃枪头位置,下探,弹出枪头,然后回升。 + properties: + feedback: {} + goal: + properties: + down_mm: + default: 60.0 + description: 下探距离 (mm) + type: number + tip_point: + description: 原始枪头点位名称 (例如 'C1',将自动映射到废弃位) 或直接指定废弃位 (例如 'D49') + type: string + required: + - tip_point + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: drop_tip参数 + type: UniLabJsonCommand + filtering: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 执行过滤操作,使用推杆将液体通过过滤器压出。 + properties: + feedback: {} + goal: + properties: + down_filter_mm: + description: 执行过滤时的下压距离 (mm) + type: number + down_pick_mm: + description: 拾取推杆时的下探距离 (mm) + type: number + filter_name: + description: 过滤器点位名称 (例如 'D1'-'D8') + type: string + pusher_name: + description: 推杆点位名称 (例如 'D9'-'D16') + type: string + required: + - pusher_name + - filter_name + - down_pick_mm + - down_filter_mm + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: filtering参数 + type: UniLabJsonCommand + load_for_nmr: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 执行装载核磁样品的操作,从D区源孔位转移液体到D区目标孔位(核磁管)。 + properties: + feedback: {} + goal: + properties: + dst_d_name: + description: D区目标孔位(核磁管)名称 (例如 'D17') + type: string + src_d_name: + description: D区源孔位名称 (例如 'D1') + type: string + tip_c_name: + description: 用于此次转移的C区枪头点位名称 (例如 'C4') + type: string + total_ul: + description: 总吸液体积 (ul) + type: number + required: + - src_d_name + - dst_d_name + - tip_c_name + - total_ul + - down_src_mm + - down_dst_mm + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: load_for_nmr参数 + type: UniLabJsonCommand + pick_tip: + goal_default: + down_mm: 120.0 + handles: {} + placeholder_keys: {} + schema: + description: 移动到指定枪头位置,下探并拾取枪头,然后回升。 + properties: + feedback: {} + goal: + properties: + down_mm: + default: 120.0 + description: 下探距离 (mm) + type: number + tip_point: + description: 枪头点位名称 (例如 'C1') + type: string + required: + - tip_point + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pick_tip参数 + type: UniLabJsonCommand + pipette_asp_then_dsp: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 在当前位置连续执行吸液后排液,不移动XYZ,可用于测试活塞。 + properties: + feedback: {} + goal: + properties: + asp_ul: + description: 吸液体积 (ul) + type: number + dsp_ul: + description: 排液体积 (ul) + type: number + required: + - asp_ul + - dsp_ul + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pipette_asp_then_dsp参数 + type: UniLabJsonCommand + pipette_aspirate: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 在当前位置吸取指定体积的液体 (ul),不进行XYZ移动。 + properties: + feedback: {} + goal: + properties: + volume_ul: + description: 要吸取的液体体积 (ul) + type: number + required: + - volume_ul + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pipette_aspirate参数 + type: UniLabJsonCommand + pipette_dispense: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 在当前位置分配指定体积的液体 (ul),不进行XYZ移动。 + properties: + feedback: {} + goal: + properties: + volume_ul: + description: 要分配的液体体积 (ul) + type: number + required: + - volume_ul + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pipette_dispense参数 + type: UniLabJsonCommand + pipette_init: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 初始化移液枪。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pipette_init参数 + type: UniLabJsonCommand + pushing: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 在D25点位执行推动操作。 + properties: + feedback: {} + goal: + properties: + tip_c_name: + description: 用于推动的C区枪头点位名称 (例如 'C3') + type: string + y_forward_mm: + description: 向Y轴正方向推动的距离 (mm) + type: number + z_down_mm: + description: 下压距离 (mm) + type: number + required: + - tip_c_name + - z_down_mm + - y_forward_mm + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: pushing参数 + type: UniLabJsonCommand + system_init: + goal_default: {} + handles: {} + placeholder_keys: {} + schema: + description: 初始化系统,包括全轴回零并将当前位置设为工作原点。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: system_init参数 + type: UniLabJsonCommand + transfer_A_to_B: + goal_default: + split_volumes: null + handles: {} + placeholder_keys: {} + schema: + description: 从A区源孔位转移液体到一个或多个B区目标孔位。 + properties: + feedback: {} + goal: + properties: + a_name: + description: A区源孔位名称 (例如 'A1') + type: string + b_names: + description: 一个或多个B区目标孔位名称 (例如 'B1' 或 ['B1', 'B2']) + oneOf: + - type: string + - items: + type: string + type: array + split_volumes: + description: (可选) 当有多个目标时,指定分配给每个目标的体积列表,需与目标数量一致。如果为null,则均分总体积。 + items: + type: number + nullable: true + type: array + tip_c_name: + description: 用于此次转移的C区枪头点位名称 (例如 'C1') + type: string + total_ul: + description: 总吸液体积 (ul) + type: number + required: + - a_name + - b_names + - tip_c_name + - total_ul + - down_a_mm + - down_b_mm + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: transfer_A_to_B参数 + type: UniLabJsonCommand + transfer_B_to_D: + goal_default: + split_volumes: null + handles: {} + placeholder_keys: {} + schema: + description: 从B区源孔位转移液体到一个或多个D区目标孔位。 + properties: + feedback: {} + goal: + properties: + b_name: + description: B区源孔位名称 (例如 'B1') + type: string + d_names: + description: 一个或多个D区目标孔位名称 (例如 'D1' 或 ['D1', 'D2']) + oneOf: + - type: string + - items: + type: string + type: array + split_volumes: + description: (可选) 当有多个目标时,指定分配给每个目标的体积列表,需与目标数量一致。如果为null,则均分总体积。 + items: + type: number + nullable: true + type: array + tip_c_name: + description: 用于此次转移的C区枪头点位名称 (例如 'C2') + type: string + total_ul: + description: 总吸液体积 (ul) + type: number + required: + - b_name + - d_names + - tip_c_name + - total_ul + - down_b_mm + - down_d_mm + type: object + result: + properties: + success: + type: boolean + required: + - success + type: object + required: + - goal + title: transfer_B_to_D参数 + type: UniLabJsonCommand + module: unilabos.devices.synthonX.SynthonX_flow_v3:SynthonXFlowV2 + status_types: {} + type: python + config_info: [] + description: SynthonX V2 流程控制器,封装了取放枪头、液体转移、过滤、推动和装载核磁等高级操作。 + handles: [] + icon: '' + init_param_schema: + config: + properties: + approach_lift: + default: 0.0 + type: number + baudrate: + default: 115200 + type: integer + delay_after_aspirate: + default: 0.35 + type: number + delay_after_dispense: + default: 0.35 + type: number + points_file: + default: points_gui.json + type: string + port: + default: COM5 + type: string + relay_baudrate: + default: 9600 + type: integer + relay_port: + default: COM7 + type: string + relay_timeout: + default: 1.0 + type: number + settle_s: + default: 5 + type: integer + required: [] + type: object + data: + properties: {} + required: [] + type: object + version: 1.0.0