使用16个孔与固定tip头类型,来定位slot位置

This commit is contained in:
q434343
2026-04-08 02:51:28 +08:00
parent 95f3e0b291
commit 56d25b88bd
3 changed files with 195 additions and 39 deletions

View File

@@ -783,12 +783,16 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
y_increase = -0.003636,
x_offset = -1.8,
y_offset = -37.48,
deck_z = 309.5,
deck_z = 235.5,
deck_y = 400,
rail_width=27.5,
xy_coupling = -0.0045,
calibration_points: Optional[Dict[str, List[List[float]]]] = None,
calibration_labware_type: Optional[str] = "PRCXI_300ul_Tips",
):
self._rail_width = rail_width
self._rail_interval = rail_interval
self.deck_x = (start_rail + rail_nums*5 + (rail_nums-1)*rail_interval) * rail_width
self.deck_y = deck_y
self.deck_z = deck_z
@@ -797,8 +801,14 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.x_offset = x_offset
self.y_offset = y_offset
self.xy_coupling = xy_coupling
self.left_2_claw = Coordinate(-130.2, 34, -74)
self.right_2_left = Coordinate(22,-1, 11)
self._slot_prcxi_positions: Dict[int, Tuple[float, float]] = {}
self.calibration_labware_type = calibration_labware_type
if calibration_points is not None:
self.calibrate_from_points(calibration_points, labware_type=self.calibration_labware_type)
self.left_2_claw = Coordinate(130.2, -34, 74)
self.right_2_left = Coordinate(22,-1, 12)
self.tip_height = 0
tablets_info = []
@@ -876,9 +886,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
# 根据 resource 类型推断 materialEnum
# MaterialEnum: Other=0, Tips=1, DeepWellPlate=2, PCRPlate=3, ELISAPlate=4, Reservoir=5, WasteBox=6
expected_enum = None
if isinstance(resource, PRCXI9300TipRack) or isinstance(resource, TipRack):
if isinstance(resource, TipRack):
expected_enum = 1 # Tips
elif isinstance(resource, PRCXI9300Trash) or isinstance(resource, Trash):
elif isinstance(resource, Trash):
expected_enum = 6 # WasteBox
elif isinstance(resource, (PRCXI9300Plate, Plate)):
expected_enum = None # Plate 可能是 DeepWellPlate/PCRPlate/ELISAPlate不限定
@@ -940,21 +950,27 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
# 重新计算所有槽位的位置(初始化时 deck 可能为空,此时才有资源)
pipetting_positions = []
plate_positions = []
claw_positions = []
for child in self.deck.children:
number = self._get_slot_number(child)
if number is None:
continue
pos = self.plr_pos_to_prcxi(child)
plate_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z})
pos = self.plr_pos_to_prcxi(child, self.left_2_claw)
slot_pos = self._slot_prcxi_positions[number]
pos.x = slot_pos[0] - child.get_size_x() / 2 + self.left_2_claw.x
pos.y = slot_pos[1] - child.get_size_y() / 2 + self.left_2_claw.y
claw_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z})
if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw)
pip_pos = self.plr_pos_to_prcxi(child.children[0])
else:
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(-100, self.left_2_claw.y, self.left_2_claw.z))
half_x = child.get_size_x() / 2 * abs(1 + self.x_increase)
pip_pos = self.plr_pos_to_prcxi(child)
pip_pos.x = slot_pos[0] - 40
pip_pos.y = slot_pos[1] - child.get_size_y() / 2
pip_pos.z = pip_pos.z - 40
half_x = child.get_size_x() / 2
z_wall = child.get_size_z()
pipetting_positions.append({
@@ -975,18 +991,69 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
if pipetting_positions:
api.update_pipetting_position(matrix_id, pipetting_positions)
# 更新 backend 中的 plate_positions
backend.plate_positions = plate_positions
# 更新 backend 中的 claw_positions
backend.claw_positions = claw_positions
if plate_positions:
api.update_clamp_jaw_position(matrix_id, plate_positions)
if claw_positions:
api.update_clamp_jaw_position(matrix_id, claw_positions)
print(f"Auto-matched materials and created matrix: {matrix_id}")
else:
raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}")
def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)):
def calibrate_from_points(
self,
calibration_points: Dict[str, List[List[float]]],
labware_type: Optional[str] = "PRCXI_300ul_Tips",
):
"""从实测 PRCXI 机器坐标直接计算每个 slot 的 PRCXI 原点坐标。
校准点是将参考物料放在各 slot 后,机器移至其 A1 位置所读取的
PRCXI 坐标。通过 ``labware_type`` 创建临时实例,取 ``children[0]``
(即 A1的 location 作为偏移量,逆运算得 slot 原点。
line_1~line_N 依次对应 T1~T4, T5~T8, ...
Args:
calibration_points: ``{"line_1": [[px, py], ...], ...}``。
``[0, 0]`` 表示该点无效,不计入。
labware_type: prcxi_labware 中的工厂函数名(如 ``"PRCXI_300ul_Tips"``)。
为 ``None`` 时 dx=dy=0即校准点直接作为 slot 原点。
"""
dx, dy = 0.0, 0.0
if labware_type is not None:
from . import prcxi_labware
factory = getattr(prcxi_labware, labware_type)
temp = factory("_calibration_ref")
a1 = temp.children[0]
dx, dy = a1.location.x + a1.get_size_x() / 2, a1.location.y + a1.get_size_y() / 2
sorted_keys = sorted(
calibration_points.keys(),
key=lambda k: int("".join(c for c in k if c.isdigit()) or "0"),
)
slot_number = 0
for key in sorted_keys:
for pt in calibration_points[key]:
slot_number += 1
if isinstance(pt, (list, tuple)) and len(pt) >= 2 and not (pt[0] == 0 and pt[1] == 0):
self._slot_prcxi_positions[slot_number] = (
float(pt[0]) + dx,
float(pt[1]) + dy,
)
def _find_slot_for_resource(self, resource: Resource) -> Optional[int]:
"""沿 parent 链向上找到 Deck 的直接子节点,返回其槽位号。"""
current = resource
while current is not None:
if isinstance(current.parent, (PRCXI9300Deck, LiquidHandlerAbstract)):
return self._get_slot_number(current)
current = getattr(current, "parent", None)
return self._get_slot_number(resource)
def plr_pos_to_prcxi(self, resource: Resource, resource_offset: Coordinate = Coordinate(0, 0, 0), offset: Coordinate = Coordinate(0, 0, 0)):
z_pos = 'c'
tip_height = self.tip_height
if isinstance(resource, TipSpot):
@@ -996,7 +1063,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
x = resource_pos.x
y = resource_pos.y
z = resource_pos.z + tip_height
# 如果z等于0则递归resource.parent的高度并向z加使用get_size_z方法
parent = resource.parent
res_z = resource.location.z
@@ -1004,14 +1070,21 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
z += parent.get_size_z()
res_z = parent.location.z
parent = getattr(parent, "parent", None)
prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y)
prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset
slot_number = self._find_slot_for_resource(resource) if self._slot_prcxi_positions else None
if slot_number is not None and slot_number in self._slot_prcxi_positions and self.calibration_labware_type is not None:
slot_prcxi_x, slot_prcxi_y = self._slot_prcxi_positions[slot_number]
prcxi_x = slot_prcxi_x - resource.location.x - resource.get_size_x() / 2
prcxi_y = slot_prcxi_y - resource.location.y - resource.get_size_y() / 2
else:
prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y)
prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset
prcxi_z = self.deck_z - z
prcxi_x = min(max(0, prcxi_x+offset.x),self.deck_x)
prcxi_y = min(max(0, prcxi_y+offset.y),self.deck_y)
prcxi_z = min(max(0, prcxi_z+offset.z),self.deck_z)
prcxi_x = min(max(0, prcxi_x+resource_offset.x),self.deck_x)
prcxi_y = min(max(0, prcxi_y+resource_offset.y),self.deck_y)
prcxi_z = min(max(0, prcxi_z+resource_offset.z),self.deck_z)
return Coordinate(prcxi_x, prcxi_y, prcxi_z)
@@ -1158,7 +1231,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
_dis_list = dis_vols if isinstance(dis_vols, list) else [dis_vols]
if all(v <= 10.0 for v in _asp_list) and all(v <= 10.0 for v in _dis_list):
use_channels = [1]
mix_vol = max(min(mix_vol,10),0)
mix_vol = max(min(mix_vol,10),0) if mix_vol is not None else None
sources = await self._resolve_to_plr_resources(sources)
targets = await self._resolve_to_plr_resources(targets)
tip_racks = list(await self._resolve_to_plr_resources(tip_racks))
@@ -1179,7 +1252,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
number = self._get_slot_number(slot)
pip_pos = self.plr_pos_to_prcxi(slot.children[0], self.left_2_claw)
pip_pos = self.plr_pos_to_prcxi(slot.children[0])
half_x = slot.children[0].get_size_x() / 2 * abs(1 + self.x_increase)
z_wall = slot.children[0].get_size_z()
@@ -1215,7 +1288,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
blow_out_air_volume_before=blow_out_air_volume_before,
blow_out_air_volume_before=None,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
@@ -1446,8 +1519,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
offset = pickup.offset
pickup_distance_from_top = pickup.pickup_distance_from_top
direction = pickup.direction
plate_number = int(resource.parent.name.replace("T", ""))
plate = resource.parent
deck = plate.parent
plate_number = self._deck_plate_slot_no(plate, deck)
is_whole_plate = True
balance_height = 0
step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height)
@@ -1460,7 +1534,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_number = None
target_plate_number = backend_kwargs.get("target_plate_number", None)
if target_plate_number is not None:
plate_number = int(target_plate_number.name.replace("T", ""))
plate = target_plate_number
deck = plate.parent
plate_number = self._deck_plate_slot_no(plate, deck)
is_whole_plate = True
balance_height = 0
@@ -1558,7 +1634,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
await asyncio.sleep(1)
print("PRCXI9300 reset successfully.")
# self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions)
# self.api_client.update_clamp_jaw_position(self.matrix_id, self.claw_positions)
except ConnectionRefusedError as e:
raise RuntimeError(
@@ -1767,8 +1843,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None):
"""Aspirate liquid from the specified resources."""
if ops[0].blow_out_air_volume and ops[0].volume == 0:
return
if hasattr(use_channels, "tolist"):
_use_channels = use_channels.tolist()
else:
@@ -1810,8 +1884,11 @@ class PRCXI9300Backend(LiquidHandlerBackend):
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
assist_fun1 = ""
if self.num_channels != 8:
hole_row = tipspot_index % ny + 1
if ops[0].blow_out_air_volume is not None:
assist_fun1 = f"反向吸液({float(min(max(ops[0].blow_out_air_volume,0),10))}ul)"
step = self.api_client.Imbibing(
axis=axis,
@@ -1821,9 +1898,10 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row=hole_row,
hole_col=hole_col,
blending_times=0,
balance_height=0,
balance_height=int(min(max(ops[0].liquid_height,0),10)),
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
assist_fun1=assist_fun1,
)
self.steps_todo_list.append(step)
@@ -1874,6 +1952,10 @@ class PRCXI9300Backend(LiquidHandlerBackend):
if self.num_channels != 8:
hole_row = tipspot_index % ny + 1
assist_fun1 = ""
if ops[0].blow_out_air_volume is not None:
assist_fun1 = f"吹样({float(min(max(ops[0].blow_out_air_volume,0),10))}ul)"
step = self.api_client.Tapping(
axis=axis,
dosage=int(volumes[0]),
@@ -1882,9 +1964,10 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_row=hole_row,
hole_col=hole_col,
blending_times=0,
balance_height=0,
balance_height=int(min(max(ops[0].liquid_height,0),10)),
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
assist_fun1=assist_fun1,
)
self.steps_todo_list.append(step)
@@ -2079,10 +2162,10 @@ class PRCXI9300Api:
"""GetWorkTabletMatrixById"""
return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])
def update_clamp_jaw_position(self, target_matrix_id: str, plate_positions: List[Dict[str, Any]]):
def update_clamp_jaw_position(self, target_matrix_id: str, claw_positions: List[Dict[str, Any]]):
position_params = {
"MatrixId": target_matrix_id,
"WorkTablets": plate_positions
"WorkTablets": claw_positions
}
return self.call("IMatrix", "UpdateClampJawPosition", [position_params])