|
| 1 | +from ot3_testing.tests.base_init import TestBase |
| 2 | +from ot3_testing.ot_type import Mount, Point |
| 3 | +from devices.amsamotion_sensor import LaserSensor |
| 4 | +from ot3_testing.test_config.gripper_leveling_config import CalibrateMethod, Gripper_Position, GripperChannel, \ |
| 5 | + GripperMiddlePosition |
| 6 | +import asyncio |
| 7 | +from typing import List |
| 8 | +from utils import Utils |
| 9 | +import os, datetime |
| 10 | + |
| 11 | +ApplyCompensationFlag = True |
| 12 | +CalibrateFlag = True |
| 13 | + |
| 14 | + |
| 15 | +class GripperLeveling(TestBase): |
| 16 | + def __init__(self, slot_location, robot_ip=None): |
| 17 | + super(GripperLeveling).__init__() |
| 18 | + self.robot_ip = robot_ip |
| 19 | + self.mount = Mount.LEFT |
| 20 | + self.laser_sensor = None |
| 21 | + self.slot_location = slot_location |
| 22 | + self.approaching = False |
| 23 | + |
| 24 | + def init_laser_sensor(self, send=False): |
| 25 | + """ |
| 26 | + init 96ch device |
| 27 | + :return: |
| 28 | + """ |
| 29 | + self.laser_sensor = LaserSensor(send=send) |
| 30 | + self.laser_sensor.init_device() |
| 31 | + |
| 32 | + async def move_to_test_point(self, p: Point): |
| 33 | + """ |
| 34 | + move to the test position |
| 35 | + :param p: |
| 36 | + :return: |
| 37 | + """ |
| 38 | + await self.api.move_to(self.mount, p, target="pipette", ) |
| 39 | + |
| 40 | + async def move_step_with_test_name(self, test_name: str, direction: str, step=0.1, |
| 41 | + method=CalibrateMethod.Dichotomy, gap: float = 0): |
| 42 | + """ |
| 43 | + move a step |
| 44 | + :param test_name: |
| 45 | + :param direction: |
| 46 | + :param step: |
| 47 | + :param method: |
| 48 | + :param gap: |
| 49 | + :return: |
| 50 | + """ |
| 51 | + |
| 52 | + if method == CalibrateMethod.Dichotomy: |
| 53 | + step = step |
| 54 | + elif method == CalibrateMethod.Approach and self.approaching: |
| 55 | + step = 0.06 |
| 56 | + elif method == CalibrateMethod.Approach and not self.approaching: |
| 57 | + step = gap * 2 |
| 58 | + self.approaching = True |
| 59 | + |
| 60 | + _point: Point = self.slot_location[self.mount][test_name]["point"] |
| 61 | + |
| 62 | + if "y" in test_name: |
| 63 | + if direction == "plus": # x+ |
| 64 | + _point = _point - Point(step, 0, 0) |
| 65 | + else: |
| 66 | + _point = _point + Point(step, 0, 0) |
| 67 | + elif "x" in test_name: |
| 68 | + if direction == "plus": # x+ |
| 69 | + _point = _point + Point(0, step, 0) |
| 70 | + else: |
| 71 | + _point = _point - Point(0, step, 0) |
| 72 | + elif "z" in test_name: |
| 73 | + if direction == "plus": # x+ |
| 74 | + _point = _point + Point(0, 0, step) |
| 75 | + else: |
| 76 | + _point = _point - Point(0, 0, step) |
| 77 | + |
| 78 | + print("move to: ", _point) |
| 79 | + await self.move_to_test_point(_point) |
| 80 | + self.slot_location[self.mount][test_name]["point"] = _point |
| 81 | + |
| 82 | + async def calibrate_to_zero(self, test_name: str, spec: float, read_definition: List[str], channel_definition, |
| 83 | + target_voltage=2.5, |
| 84 | + method=CalibrateMethod.Dichotomy): |
| 85 | + """ |
| 86 | + move fixture to zero (30mm) |
| 87 | + :param test_name: |
| 88 | + :param spec: |
| 89 | + :param read_definition: |
| 90 | + :param target_voltage: |
| 91 | + :param channel_definition: |
| 92 | + :param method: |
| 93 | + :return: |
| 94 | + """ |
| 95 | + init_step = 2 |
| 96 | + while True: |
| 97 | + # read voltage |
| 98 | + ret_dict = await self.read_definition_distance(read_definition, channel_definition, self.laser_sensor, |
| 99 | + self.mount, only_code=True) |
| 100 | + # min_voltage = min(distance_list) |
| 101 | + _min = list(ret_dict.values())[0] # judge the first channel |
| 102 | + min_voltage = await self.read_distance_mm_from_code_value(_min, get_voltage=True) |
| 103 | + print("current min voltage is: ", min_voltage) |
| 104 | + if spec >= (min_voltage - target_voltage) >= 0: |
| 105 | + break |
| 106 | + else: |
| 107 | + if min_voltage > target_voltage: |
| 108 | + await self.move_step_with_test_name(test_name, "plus", step=init_step, method=method, |
| 109 | + gap=(abs(min_voltage - target_voltage))) |
| 110 | + else: |
| 111 | + await self.move_step_with_test_name(test_name, "mimus", step=init_step, method=method, |
| 112 | + gap=(abs(min_voltage - target_voltage))) |
| 113 | + init_step = init_step / 1.2 |
| 114 | + if init_step <= 0.05: |
| 115 | + init_step = 0.05 |
| 116 | + self.approaching = False |
| 117 | + |
| 118 | + async def run_slot(self, p: Point, test_name, definition, keep_reading=False): |
| 119 | + """ |
| 120 | + run per slot, test z slot without calibrate to zero |
| 121 | + """ |
| 122 | + await self.move_to_test_point(p) |
| 123 | + # calibrate to zero |
| 124 | + if CalibrateFlag and "z" not in test_name: |
| 125 | + await self.calibrate_to_zero(test_name, 0.1, definition, GripperChannel, method=CalibrateMethod.Approach) |
| 126 | + while True: |
| 127 | + result = await self.read_definition_distance(definition, GripperChannel, self.laser_sensor, self.mount) |
| 128 | + print(result) |
| 129 | + if not keep_reading: |
| 130 | + return {test_name: result} |
| 131 | + |
| 132 | + def save_csv(self, file_path, title, content): |
| 133 | + """ |
| 134 | + save csv |
| 135 | + """ |
| 136 | + is_exist = Utils.is_file_exist(file_path) |
| 137 | + if is_exist: |
| 138 | + pass |
| 139 | + else: |
| 140 | + Utils.write_to_csv(file_path, title) |
| 141 | + Utils.write_to_csv(file_path, content) |
| 142 | + |
| 143 | + async def run_gripper_test(self, flex_name, project_path=None): |
| 144 | + """ |
| 145 | + run main |
| 146 | + """ |
| 147 | + if self.robot_ip is None: |
| 148 | + addr = self.get_address().strip() |
| 149 | + else: |
| 150 | + addr = self.robot_ip |
| 151 | + self.initial_api(addr, hc=True) |
| 152 | + await self.api.home() |
| 153 | + self.init_laser_sensor(send=False) |
| 154 | + |
| 155 | + _gripper_position = Gripper_Position[self.mount] |
| 156 | + result_dict = {} |
| 157 | + csv_title = [] |
| 158 | + csv_list = [] |
| 159 | + now = datetime.datetime.now() |
| 160 | + time_str = now.strftime("%Y-%m-%d %H:%M:%S ") |
| 161 | + csv_list.append(time_str + flex_name) |
| 162 | + csv_title.append(time_str + flex_name) |
| 163 | + |
| 164 | + for slot_key, slot_value in _gripper_position.items(): |
| 165 | + test_name = slot_key |
| 166 | + _point = slot_value["point"] |
| 167 | + _definition = slot_value["definition"] |
| 168 | + ret = await self.run_slot(_point, test_name, _definition) |
| 169 | + |
| 170 | + if "x" in slot_key: |
| 171 | + await self.api.home() |
| 172 | + result_dict.update(ret) |
| 173 | + print(result_dict) |
| 174 | + for key, value in result_dict.items(): |
| 175 | + difference = abs(list(value.values())[0] - list(value.values())[1]) |
| 176 | + for _key, _value in value.items(): |
| 177 | + csv_title.append(key + "_" + _key) |
| 178 | + csv_list.append(_value) |
| 179 | + csv_title.append(key + "_result") |
| 180 | + csv_list.append(difference) |
| 181 | + |
| 182 | + await self.api.home() |
| 183 | + await self.move_to_test_point(GripperMiddlePosition) |
| 184 | + self.laser_sensor.close() |
| 185 | + |
| 186 | + # save |
| 187 | + if project_path is not None: |
| 188 | + file_path = os.path.join(project_path, 'testing_data', 'gripper_leveling.csv') |
| 189 | + else: |
| 190 | + file_path = '../../testing_data/gripper_leveling.csv' |
| 191 | + self.save_csv(file_path, csv_title, csv_list) |
| 192 | + |
| 193 | + |
| 194 | +if __name__ == '__main__': |
| 195 | + _gripper = GripperLeveling(Gripper_Position, "192.168.6.33") |
| 196 | + asyncio.run(_gripper.run_gripper_test("xxx")) |
0 commit comments