diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index bcce1b26..9fbf213f 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -783,12 +783,16 @@ class PRCXI9300Handler(LiquidHandlerAbstract): y_increase = -0.003636, x_offset = -1.8, y_offset = -37.48, - deck_z = 309.5, + deck_z = 235.5, deck_y = 400, rail_width=27.5, xy_coupling = -0.0045, + calibration_points: Optional[Dict[str, List[List[float]]]] = None, + calibration_labware_type: Optional[str] = "PRCXI_300ul_Tips", ): + self._rail_width = rail_width + self._rail_interval = rail_interval self.deck_x = (start_rail + rail_nums*5 + (rail_nums-1)*rail_interval) * rail_width self.deck_y = deck_y self.deck_z = deck_z @@ -797,8 +801,14 @@ class PRCXI9300Handler(LiquidHandlerAbstract): self.x_offset = x_offset self.y_offset = y_offset self.xy_coupling = xy_coupling - self.left_2_claw = Coordinate(-130.2, 34, -74) - self.right_2_left = Coordinate(22,-1, 11) + self._slot_prcxi_positions: Dict[int, Tuple[float, float]] = {} + self.calibration_labware_type = calibration_labware_type + + if calibration_points is not None: + self.calibrate_from_points(calibration_points, labware_type=self.calibration_labware_type) + + self.left_2_claw = Coordinate(130.2, -34, 74) + self.right_2_left = Coordinate(22,-1, 12) self.tip_height = 0 tablets_info = [] @@ -876,9 +886,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract): # 根据 resource 类型推断 materialEnum # MaterialEnum: Other=0, Tips=1, DeepWellPlate=2, PCRPlate=3, ELISAPlate=4, Reservoir=5, WasteBox=6 expected_enum = None - if isinstance(resource, PRCXI9300TipRack) or isinstance(resource, TipRack): + if isinstance(resource, TipRack): expected_enum = 1 # Tips - elif isinstance(resource, PRCXI9300Trash) or isinstance(resource, Trash): + elif isinstance(resource, Trash): expected_enum = 6 # WasteBox elif isinstance(resource, (PRCXI9300Plate, Plate)): expected_enum = None # Plate 可能是 DeepWellPlate/PCRPlate/ELISAPlate,不限定 @@ -940,21 +950,27 @@ class PRCXI9300Handler(LiquidHandlerAbstract): # 重新计算所有槽位的位置(初始化时 deck 可能为空,此时才有资源) pipetting_positions = [] - plate_positions = [] + claw_positions = [] for child in self.deck.children: number = self._get_slot_number(child) if number is None: continue - pos = self.plr_pos_to_prcxi(child) - plate_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z}) + pos = self.plr_pos_to_prcxi(child, self.left_2_claw) + slot_pos = self._slot_prcxi_positions[number] + pos.x = slot_pos[0] - child.get_size_x() / 2 + self.left_2_claw.x + pos.y = slot_pos[1] - child.get_size_y() / 2 + self.left_2_claw.y + claw_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z}) if child.children: - pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw) + pip_pos = self.plr_pos_to_prcxi(child.children[0]) else: - pip_pos = self.plr_pos_to_prcxi(child, Coordinate(-100, self.left_2_claw.y, self.left_2_claw.z)) - half_x = child.get_size_x() / 2 * abs(1 + self.x_increase) + pip_pos = self.plr_pos_to_prcxi(child) + pip_pos.x = slot_pos[0] - 40 + pip_pos.y = slot_pos[1] - child.get_size_y() / 2 + pip_pos.z = pip_pos.z - 40 + half_x = child.get_size_x() / 2 z_wall = child.get_size_z() pipetting_positions.append({ @@ -975,18 +991,69 @@ class PRCXI9300Handler(LiquidHandlerAbstract): if pipetting_positions: api.update_pipetting_position(matrix_id, pipetting_positions) - # 更新 backend 中的 plate_positions - backend.plate_positions = plate_positions + # 更新 backend 中的 claw_positions + backend.claw_positions = claw_positions - if plate_positions: - api.update_clamp_jaw_position(matrix_id, plate_positions) + if claw_positions: + api.update_clamp_jaw_position(matrix_id, claw_positions) print(f"Auto-matched materials and created matrix: {matrix_id}") else: raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}") - def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)): + def calibrate_from_points( + self, + calibration_points: Dict[str, List[List[float]]], + labware_type: Optional[str] = "PRCXI_300ul_Tips", + ): + """从实测 PRCXI 机器坐标直接计算每个 slot 的 PRCXI 原点坐标。 + + 校准点是将参考物料放在各 slot 后,机器移至其 A1 位置所读取的 + PRCXI 坐标。通过 ``labware_type`` 创建临时实例,取 ``children[0]`` + (即 A1)的 location 作为偏移量,逆运算得 slot 原点。 + line_1~line_N 依次对应 T1~T4, T5~T8, ... + + Args: + calibration_points: ``{"line_1": [[px, py], ...], ...}``。 + ``[0, 0]`` 表示该点无效,不计入。 + labware_type: prcxi_labware 中的工厂函数名(如 ``"PRCXI_300ul_Tips"``)。 + 为 ``None`` 时 dx=dy=0,即校准点直接作为 slot 原点。 + """ + dx, dy = 0.0, 0.0 + if labware_type is not None: + from . import prcxi_labware + factory = getattr(prcxi_labware, labware_type) + temp = factory("_calibration_ref") + a1 = temp.children[0] + dx, dy = a1.location.x + a1.get_size_x() / 2, a1.location.y + a1.get_size_y() / 2 + + + sorted_keys = sorted( + calibration_points.keys(), + key=lambda k: int("".join(c for c in k if c.isdigit()) or "0"), + ) + + slot_number = 0 + for key in sorted_keys: + for pt in calibration_points[key]: + slot_number += 1 + if isinstance(pt, (list, tuple)) and len(pt) >= 2 and not (pt[0] == 0 and pt[1] == 0): + self._slot_prcxi_positions[slot_number] = ( + float(pt[0]) + dx, + float(pt[1]) + dy, + ) + + def _find_slot_for_resource(self, resource: Resource) -> Optional[int]: + """沿 parent 链向上找到 Deck 的直接子节点,返回其槽位号。""" + current = resource + while current is not None: + if isinstance(current.parent, (PRCXI9300Deck, LiquidHandlerAbstract)): + return self._get_slot_number(current) + current = getattr(current, "parent", None) + return self._get_slot_number(resource) + + def plr_pos_to_prcxi(self, resource: Resource, resource_offset: Coordinate = Coordinate(0, 0, 0), offset: Coordinate = Coordinate(0, 0, 0)): z_pos = 'c' tip_height = self.tip_height if isinstance(resource, TipSpot): @@ -996,7 +1063,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract): x = resource_pos.x y = resource_pos.y z = resource_pos.z + tip_height - # 如果z等于0,则递归resource.parent的高度并向z加,使用get_size_z方法 parent = resource.parent res_z = resource.location.z @@ -1004,14 +1070,21 @@ class PRCXI9300Handler(LiquidHandlerAbstract): z += parent.get_size_z() res_z = parent.location.z parent = getattr(parent, "parent", None) - - prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y) - prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset + + slot_number = self._find_slot_for_resource(resource) if self._slot_prcxi_positions else None + if slot_number is not None and slot_number in self._slot_prcxi_positions and self.calibration_labware_type is not None: + slot_prcxi_x, slot_prcxi_y = self._slot_prcxi_positions[slot_number] + prcxi_x = slot_prcxi_x - resource.location.x - resource.get_size_x() / 2 + prcxi_y = slot_prcxi_y - resource.location.y - resource.get_size_y() / 2 + else: + prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y) + prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset + prcxi_z = self.deck_z - z - prcxi_x = min(max(0, prcxi_x+offset.x),self.deck_x) - prcxi_y = min(max(0, prcxi_y+offset.y),self.deck_y) - prcxi_z = min(max(0, prcxi_z+offset.z),self.deck_z) + prcxi_x = min(max(0, prcxi_x+resource_offset.x),self.deck_x) + prcxi_y = min(max(0, prcxi_y+resource_offset.y),self.deck_y) + prcxi_z = min(max(0, prcxi_z+resource_offset.z),self.deck_z) return Coordinate(prcxi_x, prcxi_y, prcxi_z) @@ -1158,7 +1231,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): _dis_list = dis_vols if isinstance(dis_vols, list) else [dis_vols] if all(v <= 10.0 for v in _asp_list) and all(v <= 10.0 for v in _dis_list): use_channels = [1] - mix_vol = max(min(mix_vol,10),0) + mix_vol = max(min(mix_vol,10),0) if mix_vol is not None else None sources = await self._resolve_to_plr_resources(sources) targets = await self._resolve_to_plr_resources(targets) tip_racks = list(await self._resolve_to_plr_resources(tip_racks)) @@ -1179,7 +1252,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): number = self._get_slot_number(slot) - pip_pos = self.plr_pos_to_prcxi(slot.children[0], self.left_2_claw) + pip_pos = self.plr_pos_to_prcxi(slot.children[0]) half_x = slot.children[0].get_size_x() / 2 * abs(1 + self.x_increase) z_wall = slot.children[0].get_size_z() @@ -1215,7 +1288,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): touch_tip=touch_tip, liquid_height=liquid_height, blow_out_air_volume=blow_out_air_volume, - blow_out_air_volume_before=blow_out_air_volume_before, + blow_out_air_volume_before=None, spread=spread, is_96_well=is_96_well, mix_stage=mix_stage, @@ -1446,8 +1519,9 @@ class PRCXI9300Backend(LiquidHandlerBackend): offset = pickup.offset pickup_distance_from_top = pickup.pickup_distance_from_top direction = pickup.direction - - plate_number = int(resource.parent.name.replace("T", "")) + plate = resource.parent + deck = plate.parent + plate_number = self._deck_plate_slot_no(plate, deck) is_whole_plate = True balance_height = 0 step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height) @@ -1460,7 +1534,9 @@ class PRCXI9300Backend(LiquidHandlerBackend): plate_number = None target_plate_number = backend_kwargs.get("target_plate_number", None) if target_plate_number is not None: - plate_number = int(target_plate_number.name.replace("T", "")) + plate = target_plate_number + deck = plate.parent + plate_number = self._deck_plate_slot_no(plate, deck) is_whole_plate = True balance_height = 0 @@ -1558,7 +1634,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): await asyncio.sleep(1) print("PRCXI9300 reset successfully.") - # self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions) + # self.api_client.update_clamp_jaw_position(self.matrix_id, self.claw_positions) except ConnectionRefusedError as e: raise RuntimeError( @@ -1767,8 +1843,6 @@ class PRCXI9300Backend(LiquidHandlerBackend): async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None): """Aspirate liquid from the specified resources.""" - if ops[0].blow_out_air_volume and ops[0].volume == 0: - return if hasattr(use_channels, "tolist"): _use_channels = use_channels.tolist() else: @@ -1810,8 +1884,11 @@ class PRCXI9300Backend(LiquidHandlerBackend): PlateNo = plate_slots[0] hole_col = tip_columns[0] + 1 hole_row = 1 + assist_fun1 = "" if self.num_channels != 8: hole_row = tipspot_index % ny + 1 + if ops[0].blow_out_air_volume is not None: + assist_fun1 = f"反向吸液({float(min(max(ops[0].blow_out_air_volume,0),10))}ul)" step = self.api_client.Imbibing( axis=axis, @@ -1821,9 +1898,10 @@ class PRCXI9300Backend(LiquidHandlerBackend): hole_row=hole_row, hole_col=hole_col, blending_times=0, - balance_height=0, + balance_height=int(min(max(ops[0].liquid_height,0),10)), plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", hole_numbers="1,2,3,4,5,6,7,8", + assist_fun1=assist_fun1, ) self.steps_todo_list.append(step) @@ -1874,6 +1952,10 @@ class PRCXI9300Backend(LiquidHandlerBackend): if self.num_channels != 8: hole_row = tipspot_index % ny + 1 + assist_fun1 = "" + if ops[0].blow_out_air_volume is not None: + assist_fun1 = f"吹样({float(min(max(ops[0].blow_out_air_volume,0),10))}ul)" + step = self.api_client.Tapping( axis=axis, dosage=int(volumes[0]), @@ -1882,9 +1964,10 @@ class PRCXI9300Backend(LiquidHandlerBackend): hole_row=hole_row, hole_col=hole_col, blending_times=0, - balance_height=0, + balance_height=int(min(max(ops[0].liquid_height,0),10)), plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", hole_numbers="1,2,3,4,5,6,7,8", + assist_fun1=assist_fun1, ) self.steps_todo_list.append(step) @@ -2079,10 +2162,10 @@ class PRCXI9300Api: """GetWorkTabletMatrixById""" return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id]) - def update_clamp_jaw_position(self, target_matrix_id: str, plate_positions: List[Dict[str, Any]]): + def update_clamp_jaw_position(self, target_matrix_id: str, claw_positions: List[Dict[str, Any]]): position_params = { "MatrixId": target_matrix_id, - "WorkTablets": plate_positions + "WorkTablets": claw_positions } return self.call("IMatrix", "UpdateClampJawPosition", [position_params]) diff --git a/unilabos/test/experiments/prcxi_9320_slim.json b/unilabos/test/experiments/prcxi_9320_slim.json index 54aa9f4e..d04df6c7 100644 --- a/unilabos/test/experiments/prcxi_9320_slim.json +++ b/unilabos/test/experiments/prcxi_9320_slim.json @@ -28,7 +28,13 @@ "matrix_id": "", "simulator": false, "channel_num": 2, - "step_mode": true + "step_mode": true, + "calibration_points": { + "line_1": [[452.07,21.19], [313.88,21.19], [176.87,21.19], [39.08,21.19]], + "line_2": [[415.67,116.68], [313.28,116.88], [176.58,116.69], [38.58,117.18]], + "line_3": [[450.87,212.18], [312.98,212.38], [176.08,212.68], [38.08,213.18]], + "line_4": [[450.08,307.68], [312.18,307.89], [175.18,308.18], [37.58,309.18]] + } }, "data": { "reset_ok": true diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 33d38e4b..421613c8 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -217,6 +217,68 @@ def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]: return None +def _flatten_transfer_vols(value: Any) -> List[float]: + """将 asp_vols/dis_vols 标量或列表展平为 float 列表,无法转换的项跳过。""" + if value is None: + return [] + if isinstance(value, (list, tuple)): + out: List[float] = [] + for v in value: + try: + out.append(float(v)) + except (TypeError, ValueError): + continue + return out + try: + return [float(value)] + except (TypeError, ValueError): + return [] + + +def _tip_prcxi_class_for_max_ul(max_ul: float) -> str: + """按移液最大体积分档推介 PRCXI tip 类名:≤10 µL → 10µL;<300 → 300µL;否则 1000µL。""" + if max_ul <= 10: + return "PRCXI_10uL_Tips" + if max_ul < 300: + return "PRCXI_300ul_Tips" + return "PRCXI_1000uL_Tips" + + +def _apply_tip_rack_class_from_transfer_volumes( + labware_info: Dict[str, Dict[str, Any]], + protocol_steps_refactored: List[Dict[str, Any]], +) -> None: + """根据各 ``transfer_liquid`` 的 asp_vols/dis_vols 为对应 ``tip_racks`` 写入 ``prcxi_class_name``。""" + tip_to_max_ul: Dict[str, float] = {} + + for step in protocol_steps_refactored: + if step.get("template_name") != "transfer_liquid": + continue + p = step.get("param") or {} + tip_key_raw = p.get("tip_racks") + if tip_key_raw is None or str(tip_key_raw).strip() == "": + continue + tip_key = str(tip_key_raw).strip() + if tip_key not in labware_info: + continue + + nums = _flatten_transfer_vols(p.get("asp_vols", p.get("asp_vol"))) + _flatten_transfer_vols( + p.get("dis_vols", p.get("dis_vol")) + ) + if not nums: + continue + step_max = max(nums) + tip_to_max_ul[tip_key] = max(tip_to_max_ul.get(tip_key, 0.0), step_max) + + for tip_key, max_ul in tip_to_max_ul.items(): + item = labware_info.get(tip_key) + if item is None: + continue + if _infer_reagent_kind(tip_key, item) != "tip_rack": + continue + item["prcxi_class_name"] = _tip_prcxi_class_for_max_ul(max_ul) + + def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool: """有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与(trash 除外)。""" if kind == "trash": @@ -636,11 +698,18 @@ def build_protocol_graph( labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配 preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定); **其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack)都走模板匹配。 + + 会先 ``refactor_data`` 规范化步骤,再根据 ``transfer_liquid`` 的 ``asp_vols``/``dis_vols`` 为对应 + ``tip_racks`` 写入 ``prcxi_class_name``(最大体积 ``≤10`` → ``PRCXI_10uL_Tips``,``<300`` → ``PRCXI_300ul_Tips``, + 否则 ``PRCXI_1000uL_Tips``);无有效体积的步骤不覆盖。 """ G = WorkflowGraph() resource_last_writer = {} # reagent_name -> "node_id:port" slot_to_create_resource = {} # slot -> create_resource node_id + protocol_steps = refactor_data(protocol_steps, action_resource_mapping) + _apply_tip_rack_class_from_transfer_volumes(labware_info, protocol_steps) + _apply_prcxi_labware_auto_match( labware_info, labware_defs, @@ -651,8 +720,6 @@ def build_protocol_graph( preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class, ) - protocol_steps = refactor_data(protocol_steps, action_resource_mapping) - # ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== # 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住 by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}