|
| 1 | +"""Command models to manually store a labware into a Flex Stacker in an unsafe situation.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | +from typing import Optional, Literal, TYPE_CHECKING, Type, Union, cast |
| 5 | + |
| 6 | +from pydantic import BaseModel, Field |
| 7 | +from pydantic.json_schema import SkipJsonSchema |
| 8 | + |
| 9 | +from opentrons_shared_data.labware.labware_definition import LabwareDefinition |
| 10 | + |
| 11 | +from ..command import ( |
| 12 | + AbstractCommandImpl, |
| 13 | + BaseCommand, |
| 14 | + BaseCommandCreate, |
| 15 | + SuccessData, |
| 16 | +) |
| 17 | +from ..flex_stacker.common import ( |
| 18 | + labware_locations_for_group, |
| 19 | + labware_location_base_sequence, |
| 20 | + primary_location_sequence, |
| 21 | + adapter_location_sequence, |
| 22 | + lid_location_sequence, |
| 23 | +) |
| 24 | +from ...errors import ( |
| 25 | + ErrorOccurrence, |
| 26 | + CannotPerformModuleAction, |
| 27 | + LabwareNotLoadedOnModuleError, |
| 28 | + FlexStackerLabwarePoolNotYetDefinedError, |
| 29 | +) |
| 30 | +from ...resources import ModelUtils |
| 31 | +from ...state import update_types |
| 32 | +from ...types import ( |
| 33 | + LabwareLocationSequence, |
| 34 | + InStackerHopperLocation, |
| 35 | + StackerStoredLabwareGroup, |
| 36 | + ModuleLocation, |
| 37 | +) |
| 38 | + |
| 39 | +from opentrons.hardware_control.modules.types import PlatformState |
| 40 | + |
| 41 | + |
| 42 | +if TYPE_CHECKING: |
| 43 | + from opentrons.protocol_engine.state.state import StateView |
| 44 | + from opentrons.protocol_engine.state.module_substates import FlexStackerSubState |
| 45 | + from opentrons.protocol_engine.execution import EquipmentHandler |
| 46 | + |
| 47 | + |
| 48 | +UnsafeFlexStackerManualStoreCommandType = Literal[ |
| 49 | + "unsafe/flexStacker/manualStore" |
| 50 | +] |
| 51 | + |
| 52 | + |
| 53 | +class UnsafeFlexStackerManualStoreParams(BaseModel): |
| 54 | + """Input parameters for a labware store command.""" |
| 55 | + |
| 56 | + moduleId: str = Field( |
| 57 | + ..., |
| 58 | + description="Unique ID of the flex stacker.", |
| 59 | + ) |
| 60 | + |
| 61 | + |
| 62 | +class UnsafeFlexStackerManualStoreResult(BaseModel): |
| 63 | + """Result data from a labware storage command.""" |
| 64 | + |
| 65 | + eventualDestinationLocationSequence: ( |
| 66 | + LabwareLocationSequence | SkipJsonSchema[None] |
| 67 | + ) = Field( |
| 68 | + None, |
| 69 | + description=( |
| 70 | + "The full location in which all labware moved by this command will eventually reside." |
| 71 | + ), |
| 72 | + ) |
| 73 | + primaryOriginLocationSequence: LabwareLocationSequence | SkipJsonSchema[ |
| 74 | + None |
| 75 | + ] = Field(None, description=("The origin location of the primary labware.")) |
| 76 | + primaryLabwareId: str | SkipJsonSchema[None] = Field( |
| 77 | + None, description="The primary labware in the stack that was stored." |
| 78 | + ) |
| 79 | + adapterOriginLocationSequence: LabwareLocationSequence | SkipJsonSchema[ |
| 80 | + None |
| 81 | + ] = Field(None, description=("The origin location of the adapter labware, if any.")) |
| 82 | + adapterLabwareId: str | SkipJsonSchema[None] = Field( |
| 83 | + None, description="The adapter in the stack that was stored, if any." |
| 84 | + ) |
| 85 | + lidOriginLocationSequence: LabwareLocationSequence | SkipJsonSchema[None] = Field( |
| 86 | + None, description=("The origin location of the lid labware, if any.") |
| 87 | + ) |
| 88 | + lidLabwareId: str | SkipJsonSchema[None] = Field( |
| 89 | + None, description="The lid in the stack that was stored, if any." |
| 90 | + ) |
| 91 | + primaryLabwareURI: str = Field( |
| 92 | + ..., |
| 93 | + description="The labware definition URI of the primary labware.", |
| 94 | + ) |
| 95 | + adapterLabwareURI: str | SkipJsonSchema[None] = Field( |
| 96 | + None, |
| 97 | + description="The labware definition URI of the adapter labware.", |
| 98 | + ) |
| 99 | + lidLabwareURI: str | SkipJsonSchema[None] = Field( |
| 100 | + None, |
| 101 | + description="The labware definition URI of the lid labware.", |
| 102 | + ) |
| 103 | + |
| 104 | + |
| 105 | +class UnsafeFlexStackerManualStoreImpl( |
| 106 | + AbstractCommandImpl[UnsafeFlexStackerManualStoreParams, SuccessData[UnsafeFlexStackerManualStoreResult]] |
| 107 | +): |
| 108 | + """Implementation of a labware store command.""" |
| 109 | + |
| 110 | + def __init__( |
| 111 | + self, |
| 112 | + state_view: StateView, |
| 113 | + equipment: EquipmentHandler, |
| 114 | + model_utils: ModelUtils, |
| 115 | + **kwargs: object, |
| 116 | + ) -> None: |
| 117 | + self._state_view = state_view |
| 118 | + self._equipment = equipment |
| 119 | + self._model_utils = model_utils |
| 120 | + |
| 121 | + def _verify_labware_to_store( |
| 122 | + self, params: UnsafeFlexStackerManualStoreParams, stacker_state: FlexStackerSubState |
| 123 | + ) -> tuple[str, str | None, str | None]: |
| 124 | + location = self._state_view.modules.get_location(params.moduleId) |
| 125 | + try: |
| 126 | + bottom_id = self._state_view.labware.get_id_by_module(params.moduleId) |
| 127 | + except LabwareNotLoadedOnModuleError: |
| 128 | + raise CannotPerformModuleAction( |
| 129 | + f"Flex Stacker in {location} cannot store labware because its carriage is empty" |
| 130 | + ) |
| 131 | + labware_ids = self._state_view.labware.get_labware_stack_from_parent(bottom_id) |
| 132 | + labware_defs = [ |
| 133 | + self._state_view.labware.get_definition(id) for id in labware_ids |
| 134 | + ] |
| 135 | + |
| 136 | + lid_id: str | None = None |
| 137 | + |
| 138 | + pool_list = stacker_state.get_pool_definition_ordered_list() |
| 139 | + assert pool_list is not None |
| 140 | + if len(labware_ids) != len(pool_list): |
| 141 | + raise CannotPerformModuleAction( |
| 142 | + f"Cannot store labware stack that does not correspond with the configuration of Flex Stacker in {location}" |
| 143 | + ) |
| 144 | + if stacker_state.pool_lid_definition is not None: |
| 145 | + if labware_defs[-1] != stacker_state.pool_lid_definition: |
| 146 | + raise CannotPerformModuleAction( |
| 147 | + f"Cannot store labware stack that does not correspond with the configuration of Flex Stacker in {location}" |
| 148 | + ) |
| 149 | + lid_id = labware_ids[-1] |
| 150 | + |
| 151 | + if stacker_state.pool_adapter_definition is not None: |
| 152 | + if ( |
| 153 | + labware_defs[0] != stacker_state.pool_adapter_definition |
| 154 | + or labware_defs[1] != stacker_state.pool_primary_definition |
| 155 | + ): |
| 156 | + raise CannotPerformModuleAction( |
| 157 | + f"Cannot store labware stack that does not correspond with the configuration of Flex Stacker in {location}" |
| 158 | + ) |
| 159 | + else: |
| 160 | + return labware_ids[1], labware_ids[0], lid_id |
| 161 | + else: |
| 162 | + if labware_defs[0] != stacker_state.pool_primary_definition: |
| 163 | + raise CannotPerformModuleAction( |
| 164 | + f"Cannot store labware stack that does not correspond with the configuration of Flex Stacker in {location}" |
| 165 | + ) |
| 166 | + return labware_ids[0], None, lid_id |
| 167 | + |
| 168 | + async def execute(self, params: UnsafeFlexStackerManualStoreParams) -> SuccessData[UnsafeFlexStackerManualStoreResult]: |
| 169 | + """Execute the labware storage command.""" |
| 170 | + stacker_state = self._state_view.modules.get_flex_stacker_substate( |
| 171 | + params.moduleId |
| 172 | + ) |
| 173 | + |
| 174 | + location = self._state_view.modules.get_location(params.moduleId) |
| 175 | + pool_definitions = stacker_state.get_pool_definition_ordered_list() |
| 176 | + if pool_definitions is None: |
| 177 | + raise FlexStackerLabwarePoolNotYetDefinedError( |
| 178 | + message=f"The Flex Stacker in {location} has not been configured yet and cannot be filled." |
| 179 | + ) |
| 180 | + |
| 181 | + if ( |
| 182 | + len(stacker_state.contained_labware_bottom_first) |
| 183 | + == stacker_state.max_pool_count |
| 184 | + ): |
| 185 | + raise CannotPerformModuleAction( |
| 186 | + f"Cannot store labware in Flex Stacker in {location} because it is full" |
| 187 | + ) |
| 188 | + |
| 189 | + primary_id, maybe_adapter_id, maybe_lid_id = self._verify_labware_to_store( |
| 190 | + params, stacker_state |
| 191 | + ) |
| 192 | + |
| 193 | + # Allow propagation of ModuleNotAttachedError. |
| 194 | + stacker_hw = self._equipment.get_module_hardware_api(stacker_state.module_id) |
| 195 | + |
| 196 | + state_update = update_types.StateUpdate() |
| 197 | + |
| 198 | + # Validate that the stacker is fully in the extended position. |
| 199 | + if stacker_hw: |
| 200 | + stacker_hw.set_stacker_identify(True) |
| 201 | + if stacker_hw.platform_state != PlatformState.EXTENDED: |
| 202 | + raise CannotPerformModuleAction( |
| 203 | + f"Cannot manually store a labware into Flex Stacker in {location} if the shuttle is not extended." |
| 204 | + ) |
| 205 | + |
| 206 | + id_list = [ |
| 207 | + id for id in (primary_id, maybe_adapter_id, maybe_lid_id) if id is not None |
| 208 | + ] |
| 209 | + |
| 210 | + group = StackerStoredLabwareGroup( |
| 211 | + primaryLabwareId=primary_id, |
| 212 | + adapterLabwareId=maybe_adapter_id, |
| 213 | + lidLabwareId=maybe_lid_id, |
| 214 | + ) |
| 215 | + |
| 216 | + state_update.set_batch_labware_location( |
| 217 | + new_locations_by_id={ |
| 218 | + id: InStackerHopperLocation(moduleId=params.moduleId) for id in id_list |
| 219 | + }, |
| 220 | + new_offset_ids_by_id={id: None for id in id_list}, |
| 221 | + ) |
| 222 | + |
| 223 | + state_update.update_flex_stacker_contained_labware( |
| 224 | + module_id=params.moduleId, |
| 225 | + contained_labware_bottom_first=( |
| 226 | + [group] + stacker_state.contained_labware_bottom_first |
| 227 | + ), |
| 228 | + ) |
| 229 | + |
| 230 | + original_location_sequences = labware_locations_for_group( |
| 231 | + group, |
| 232 | + labware_location_base_sequence( |
| 233 | + group, |
| 234 | + self._state_view, |
| 235 | + self._state_view.geometry.get_predicted_location_sequence( |
| 236 | + ModuleLocation(moduleId=params.moduleId) |
| 237 | + ), |
| 238 | + ), |
| 239 | + ) |
| 240 | + |
| 241 | + if stacker_hw is not None: |
| 242 | + stacker_hw.set_stacker_identify(False) |
| 243 | + |
| 244 | + return SuccessData( |
| 245 | + public=UnsafeFlexStackerManualStoreResult.model_construct( |
| 246 | + eventualDestinationLocationSequence=[ |
| 247 | + InStackerHopperLocation(moduleId=params.moduleId) |
| 248 | + ], |
| 249 | + primaryOriginLocationSequence=primary_location_sequence( |
| 250 | + original_location_sequences |
| 251 | + ), |
| 252 | + primaryLabwareId=primary_id, |
| 253 | + adapterOriginLocationSequence=adapter_location_sequence( |
| 254 | + original_location_sequences |
| 255 | + ), |
| 256 | + adapterLabwareId=maybe_adapter_id, |
| 257 | + lidOriginLocationSequence=lid_location_sequence( |
| 258 | + original_location_sequences |
| 259 | + ), |
| 260 | + lidLabwareId=maybe_lid_id, |
| 261 | + primaryLabwareURI=self._state_view.labware.get_uri_from_definition_unless_none( |
| 262 | + cast(LabwareDefinition, stacker_state.pool_primary_definition) |
| 263 | + ), |
| 264 | + adapterLabwareURI=self._state_view.labware.get_uri_from_definition_unless_none( |
| 265 | + stacker_state.pool_adapter_definition |
| 266 | + ), |
| 267 | + lidLabwareURI=self._state_view.labware.get_uri_from_definition_unless_none( |
| 268 | + stacker_state.pool_lid_definition |
| 269 | + ), |
| 270 | + ), |
| 271 | + state_update=state_update, |
| 272 | + ) |
| 273 | + |
| 274 | + |
| 275 | +class UnsafeFlexStackerManualStore( |
| 276 | + BaseCommand[ |
| 277 | + UnsafeFlexStackerManualStoreParams, |
| 278 | + UnsafeFlexStackerManualStoreResult, |
| 279 | + ErrorOccurrence, |
| 280 | + ] |
| 281 | +): |
| 282 | + """A command to manually store a labware into a Flex Stacker.""" |
| 283 | + |
| 284 | + commandType: UnsafeFlexStackerManualStoreCommandType = ( |
| 285 | + "unsafe/flexStacker/manualStore" |
| 286 | + ) |
| 287 | + params: UnsafeFlexStackerManualStoreParams |
| 288 | + result: UnsafeFlexStackerManualStoreResult | None = None |
| 289 | + |
| 290 | + _ImplementationCls: Type[ |
| 291 | + UnsafeFlexStackerManualStoreImpl |
| 292 | + ] = UnsafeFlexStackerManualStoreImpl |
| 293 | + |
| 294 | + |
| 295 | +class UnsafeFlexStackerManualStoreCreate( |
| 296 | + BaseCommandCreate[UnsafeFlexStackerManualStoreParams] |
| 297 | +): |
| 298 | + """A request to execute a Flex Stacker manual store command.""" |
| 299 | + |
| 300 | + commandType: UnsafeFlexStackerManualStoreCommandType = ( |
| 301 | + "unsafe/flexStacker/manualStore" |
| 302 | + ) |
| 303 | + params: UnsafeFlexStackerManualStoreParams |
| 304 | + |
| 305 | + _CommandCls: Type[UnsafeFlexStackerManualStore] = UnsafeFlexStackerManualStore |
0 commit comments