Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/opentrons/hardware_control/modules/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class FlexStackerData(TypedDict):

class VacuumModuleData(TypedDict):
errorDetails: str | None
pumpEngaged: bool | None


ModuleData = Union[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ def is_simulated(self) -> bool:

@property
def live_data(self) -> LiveData:
# TODO: FIX THIS
data: VacuumModuleData = {
"errorDetails": self._reader.error,
"pumpEngaged": self._reader.pump_state.pump_running,
}
return {"status": self.status.value, "data": data}

Expand Down
6 changes: 6 additions & 0 deletions api/src/opentrons/protocol_engine/commands/command_unions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
temperature_module,
thermocycler,
unsafe,
vacuum_module,
)
from .air_gap_in_place import (
AirGapInPlace,
Expand Down Expand Up @@ -500,6 +501,7 @@
flex_stacker.SetStoredLabware,
flex_stacker.Fill,
flex_stacker.Empty,
vacuum_module.StopVacuum,
calibration.CalibrateGripper,
calibration.CalibratePipette,
calibration.CalibrateModule,
Expand Down Expand Up @@ -611,6 +613,7 @@
flex_stacker.SetStoredLabwareParams,
flex_stacker.FillParams,
flex_stacker.EmptyParams,
vacuum_module.StopVacuumParams,
calibration.CalibrateGripperParams,
calibration.CalibratePipetteParams,
calibration.CalibrateModuleParams,
Expand Down Expand Up @@ -720,6 +723,7 @@
flex_stacker.SetStoredLabwareCommandType,
flex_stacker.FillCommandType,
flex_stacker.EmptyCommandType,
vacuum_module.StopVacuumCommandType,
calibration.CalibrateGripperCommandType,
calibration.CalibratePipetteCommandType,
calibration.CalibrateModuleCommandType,
Expand Down Expand Up @@ -830,6 +834,7 @@
flex_stacker.SetStoredLabwareCreate,
flex_stacker.FillCreate,
flex_stacker.EmptyCreate,
vacuum_module.StopVacuumCreate,
calibration.CalibrateGripperCreate,
calibration.CalibratePipetteCreate,
calibration.CalibrateModuleCreate,
Expand Down Expand Up @@ -948,6 +953,7 @@
flex_stacker.SetStoredLabwareResult,
flex_stacker.FillResult,
flex_stacker.EmptyResult,
vacuum_module.StopVacuumResult,
calibration.CalibrateGripperResult,
calibration.CalibratePipetteResult,
calibration.CalibrateModuleResult,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Command models for Vacuum Module commands."""

from .stop_vacuum import (
StopVacuum,
StopVacuumCommandType,
StopVacuumCreate,
StopVacuumParams,
StopVacuumResult,
)

__all__ = [
# Stop vacuum command models
"StopVacuum",
"StopVacuumCommandType",
"StopVacuumCreate",
"StopVacuumParams",
"StopVacuumResult",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Command models to stop the vacuum pump."""

from __future__ import annotations

from typing import TYPE_CHECKING, Optional

from pydantic import BaseModel, Field
from typing_extensions import Literal, Type

from ...errors.error_occurrence import ErrorOccurrence
from ...state import update_types
from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData

if TYPE_CHECKING:
from opentrons.protocol_engine.execution import EquipmentHandler, MovementHandler
from opentrons.protocol_engine.state.state import StateView

StopVacuumCommandType = Literal["vacuum_module/stopVacuum"]


class StopVacuumParams(BaseModel):
"""Input parameters to stop the vacuum pump."""

moduleId: str = Field(..., description="Unique ID of the vacuum module.")


class StopVacuumResult(BaseModel):
"""Result data from stopping the vacuum pump."""


class StopVacuumImpl(
AbstractCommandImpl[StopVacuumParams, SuccessData[StopVacuumResult]]
):
"""Execution implementation of a stop vacuum pump command."""

def __init__(
self,
state_view: StateView,
equipment: EquipmentHandler,
movement: MovementHandler,
**unused_dependencies: object,
) -> None:
self._state_view = state_view
self._equipment = equipment
self._movement = movement

async def execute(self, params: StopVacuumParams) -> SuccessData[StopVacuumResult]:
"""Stop the vacuum pump."""
state_update = update_types.StateUpdate()
vm_state = self._state_view.modules.get_vacuum_module_substate(params.moduleId)

vm_hardware = self._equipment.get_module_hardware_api(vm_state.module_id)
if vm_hardware is not None:
await vm_hardware.set_vacuum_state(enable_vacuum=False)

return SuccessData(public=StopVacuumResult(), state_update=state_update)


class StopVacuum(BaseCommand[StopVacuumParams, StopVacuumResult, ErrorOccurrence]):
"""A command to stop the vacuum pump."""

commandType: StopVacuumCommandType = "vacuum_module/stopVacuum"
params: StopVacuumParams
result: Optional[StopVacuumResult] = None

_ImplementationCls: Type[StopVacuumImpl] = StopVacuumImpl


class StopVacuumCreate(BaseCommandCreate[StopVacuumParams]):
"""A request to stop the vacuum pump."""

commandType: StopVacuumCommandType = "vacuum_module/stopVacuum"
params: StopVacuumParams

_CommandCls: Type[StopVacuum] = StopVacuum
8 changes: 8 additions & 0 deletions api/src/opentrons/protocol_engine/execution/equipment.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
MagDeck,
TempDeck,
Thermocycler,
VacuumModule,
)
from opentrons.hardware_control.nozzle_manager import NozzleMap
from opentrons.protocol_engine.state.module_substates import (
Expand All @@ -48,6 +49,7 @@
MagneticModuleId,
TemperatureModuleId,
ThermocyclerModuleId,
VacuumModuleId,
)
from opentrons.types import MountType

Expand Down Expand Up @@ -758,6 +760,12 @@ def get_module_hardware_api(
module_id: FlexStackerId,
) -> Optional[FlexStacker]: ...

@overload
def get_module_hardware_api(
self,
module_id: VacuumModuleId,
) -> Optional[VacuumModule]: ...

def get_module_hardware_api(self, module_id: str) -> Optional[AbstractModule]:
"""Get the hardware API for a given module."""
use_virtual_modules = self._state_store.config.use_virtual_modules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ class VacuumModuleSubState:
"""

module_id: VacuumModuleId
pump_engaged: bool

def new_from_state_change(
self, update: VacuumModuleStateUpdate
) -> "VacuumModuleSubState":
"""Return a new state with the given update applied."""
new_pump_engaged = self.pump_engaged
if isinstance(update.pump_engaged, bool):
new_pump_engaged = self.pump_engaged
return VacuumModuleSubState(
module_id=self.module_id,
module_id=self.module_id, pump_engaged=new_pump_engaged
)
132 changes: 76 additions & 56 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import math
from dataclasses import dataclass
from typing import (
Any,
Dict,
List,
NamedTuple,
Expand Down Expand Up @@ -68,11 +69,14 @@
TemperatureModuleSubState,
ThermocyclerModuleId,
ThermocyclerModuleSubState,
VacuumModuleId,
VacuumModuleSubState,
)
from .update_types import (
AbsorbanceReaderStateUpdate,
FlexStackerStateUpdate,
LoadModuleUpdate,
VacuumModuleStateUpdate,
)
from opentrons.hardware_control.modules.magdeck import (
OFFSET_TO_LABWARE_BOTTOM as MAGNETIC_MODULE_OFFSET_TO_LABWARE_BOTTOM,
Expand All @@ -91,9 +95,6 @@
from opentrons.protocol_engine.state.module_substates.absorbance_reader_substate import (
AbsorbanceReaderMeasureMode,
)
from opentrons.protocol_engine.state.module_substates.vacuum_module_substate import (
VacuumModuleSubState,
)
from opentrons.types import DeckSlotName, MountType, Point, StagingSlotName

ModuleSubStateT = TypeVar("ModuleSubStateT", bound=ModuleSubStateType)
Expand Down Expand Up @@ -309,6 +310,54 @@ def _handle_state_update(self, state_update: update_types.StateUpdate) -> None:
if state_update.flex_stacker_state_update != update_types.NO_CHANGE:
self._handle_flex_stacker_commands(state_update.flex_stacker_state_update)

def _module_model_map(
self, module_id: str, actual_model: Any, module_live_data: Any
) -> Any:
live_data = module_live_data["data"] if module_live_data else None
return {
ModuleModel.is_magnetic_module_model: MagneticModuleSubState(
module_id=MagneticModuleId(module_id),
model=actual_model,
),
ModuleModel.is_heater_shaker_module_model: HeaterShakerModuleSubState.from_live_data(
module_id=HeaterShakerModuleId(module_id),
data=live_data,
),
ModuleModel.is_temperature_module_model: TemperatureModuleSubState.from_live_data(
module_id=TemperatureModuleId(module_id),
data=live_data,
),
ModuleModel.is_thermocycler_module_model: ThermocyclerModuleSubState.from_live_data(
module_id=ThermocyclerModuleId(module_id), data=live_data
),
ModuleModel.is_magnetic_block: MagneticBlockSubState(
module_id=MagneticBlockId(module_id)
),
ModuleModel.is_absorbance_reader: AbsorbanceReaderSubState(
module_id=AbsorbanceReaderId(module_id),
configured=False,
measured=False,
is_lid_on=True,
data=None,
measure_mode=None,
configured_wavelengths=None,
reference_wavelength=None,
),
ModuleModel.is_flex_stacker: FlexStackerSubState(
module_id=FlexStackerId(module_id),
pool_primary_definition=None,
pool_adapter_definition=None,
pool_lid_definition=None,
contained_labware_bottom_first=[],
max_pool_count=0,
pool_overlap=0,
pool_height=0,
),
ModuleModel.is_vacuum_module: VacuumModuleSubState(
module_id=VacuumModuleId(module_id), pump_engaged=False
),
}

def _add_module_substate(
self,
module_id: str,
Expand All @@ -328,68 +377,25 @@ def _add_module_substate(
load_location = slot_name

actual_model = definition.model
live_data = module_live_data["data"] if module_live_data else None
self._state.requested_model_by_id[module_id] = requested_model
self._state.load_location_by_module_id[module_id] = load_location
self._state.hardware_by_module_id[module_id] = HardwareModule(
serial_number=serial_number,
definition=definition,
)

if ModuleModel.is_magnetic_module_model(actual_model):
self._state.substate_by_module_id[module_id] = MagneticModuleSubState(
module_id=MagneticModuleId(module_id),
model=actual_model,
)
elif ModuleModel.is_heater_shaker_module_model(actual_model):
self._state.substate_by_module_id[module_id] = (
HeaterShakerModuleSubState.from_live_data(
module_id=HeaterShakerModuleId(module_id),
data=live_data,
)
)
elif ModuleModel.is_temperature_module_model(actual_model):
self._state.substate_by_module_id[module_id] = (
TemperatureModuleSubState.from_live_data(
module_id=TemperatureModuleId(module_id),
data=live_data,
)
)
elif ModuleModel.is_thermocycler_module_model(actual_model):
self._state.substate_by_module_id[module_id] = (
ThermocyclerModuleSubState.from_live_data(
module_id=ThermocyclerModuleId(module_id), data=live_data
)
)
module_model_map = self._module_model_map(
module_id=module_id,
actual_model=actual_model,
module_live_data=module_live_data,
)
for is_module_type in module_model_map:
if is_module_type(actual_model):
substate = module_model_map[is_module_type]
self._state.substate_by_module_id[module_id] = substate
if ModuleModel.is_thermocycler_module_model(actual_model):
self._update_additional_slots_occupied_by_thermocycler(
module_id=module_id, slot_name=slot_name
)
elif ModuleModel.is_magnetic_block(actual_model):
self._state.substate_by_module_id[module_id] = MagneticBlockSubState(
module_id=MagneticBlockId(module_id)
)
elif ModuleModel.is_absorbance_reader(actual_model):
self._state.substate_by_module_id[module_id] = AbsorbanceReaderSubState(
module_id=AbsorbanceReaderId(module_id),
configured=False,
measured=False,
is_lid_on=True,
data=None,
measure_mode=None,
configured_wavelengths=None,
reference_wavelength=None,
)
elif ModuleModel.is_flex_stacker(actual_model):
self._state.substate_by_module_id[module_id] = FlexStackerSubState(
module_id=FlexStackerId(module_id),
pool_primary_definition=None,
pool_adapter_definition=None,
pool_lid_definition=None,
contained_labware_bottom_first=[],
max_pool_count=0,
pool_overlap=0,
pool_height=0,
)

def _update_additional_slots_occupied_by_thermocycler(
self,
Expand Down Expand Up @@ -659,6 +665,20 @@ def _handle_flex_stacker_commands(
prev_substate.new_from_state_change(state_update)
)

def _handle_vacuum_module_commands(
self, state_update: VacuumModuleStateUpdate
) -> None:
"""Handle Vacuum Module state updates."""
module_id = state_update.module_id
prev_substate = self._state.substate_by_module_id[module_id]
assert isinstance(prev_substate, VacuumModuleSubState), (
f"{module_id} is not a Vacuum Module."
)

self._state.substate_by_module_id[module_id] = (
prev_substate.new_from_state_change(state_update)
)


class ModuleView:
"""Read-only view of computed module state."""
Expand Down
Loading
Loading