From 4d3a41ed0dec63f2be202b36fdaa85202ac836b4 Mon Sep 17 00:00:00 2001 From: q434343 <554662886@qq.com> Date: Thu, 9 Apr 2026 18:06:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86=E7=A7=BB?= =?UTF-8?q?=E6=B6=B2=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liquid_handling/test_transfer_liquid.py | 5 + tests/devices/liquid_handling/unit_test.py | 61 +++++++ .../liquid_handler_abstract.py | 158 +++++++++++++----- .../devices/liquid_handling/prcxi/prcxi.py | 1 + .../test/experiments/prcxi_9320_slim.json | 2 +- unilabos/workflow/common.py | 82 +++++++-- 6 files changed, 256 insertions(+), 53 deletions(-) diff --git a/tests/devices/liquid_handling/test_transfer_liquid.py b/tests/devices/liquid_handling/test_transfer_liquid.py index 9896aac5..a08acc01 100644 --- a/tests/devices/liquid_handling/test_transfer_liquid.py +++ b/tests/devices/liquid_handling/test_transfer_liquid.py @@ -39,6 +39,11 @@ class FakeLiquidHandler(LiquidHandlerAbstract): self.current_tip = iter(make_tip_iter()) self.calls: List[Tuple[str, Any]] = [] + def set_tiprack(self, tip_racks): + if not tip_racks: + return + super().set_tiprack(tip_racks) + async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) diff --git a/tests/devices/liquid_handling/unit_test.py b/tests/devices/liquid_handling/unit_test.py index b7a72cd6..defc93c6 100644 --- a/tests/devices/liquid_handling/unit_test.py +++ b/tests/devices/liquid_handling/unit_test.py @@ -39,6 +39,12 @@ class FakeLiquidHandler(LiquidHandlerAbstract): self.current_tip = iter(make_tip_iter()) self.calls: List[Tuple[str, Any]] = [] + def set_tiprack(self, tip_racks): + # transfer_liquid 总会调用 set_tiprack;测试用 Dummy 枪头时 tip_racks 为空,需保留自种子的 current_tip + if not tip_racks: + return + super().set_tiprack(tip_racks) + async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) @@ -545,3 +551,58 @@ def test_mix_multiple_targets_supports_per_target_offsets(): assert aspirates[1]["flow_rates"] == [rates[1]] +def test_set_tiprack_per_type_resumes_first_physical_rack(): + """同型号多次 set_tiprack 时接续第一盒剩余孔位,而非从新盒 A1 开始。""" + from pylabrobot.liquid_handling import LiquidHandlerChatterboxBackend + from pylabrobot.resources import Deck, Tip, TipRack, TipSpot, create_equally_spaced + + mk = lambda: Tip( + has_filter=False, total_tip_length=10.0, maximal_volume=300.0, fitting_depth=2.0 + ) + + class TipTypeAlpha(TipRack): + pass + + class TipTypeBeta(TipRack): + pass + + def make_rack(cls: type, name: str) -> TipRack: + items = create_equally_spaced( + TipSpot, + num_items_x=12, + num_items_y=2, + dx=0, + dy=0, + dz=0, + item_dx=9, + item_dy=9, + size_x=1, + size_y=1, + make_tip=mk, + ) + return cls(name, 120, 40, 10, items=items) + + rack1 = make_rack(TipTypeAlpha, "rack_phys_1") + rack2 = make_rack(TipTypeBeta, "rack_phys_2") + rack3 = make_rack(TipTypeAlpha, "rack_phys_3") + + lh = LiquidHandlerAbstract( + LiquidHandlerChatterboxBackend(1), Deck(), channel_num=1, simulator=False + ) + flat1 = lh._flatten_tips_from_one(rack1) + assert len(flat1) == 24 + + lh.set_tiprack([rack1]) + for i in range(12): + assert lh._get_next_tip() is flat1[i] + + lh.set_tiprack([rack2]) + spot_b = lh._get_next_tip() + assert "rack_phys_2" in spot_b.name + + lh.set_tiprack([rack3]) + spot_resume = lh._get_next_tip() + assert spot_resume is flat1[12], "第三次同型号应接续 rack1 第二行首孔,而非 rack3 首孔" + assert spot_resume is not lh._flatten_tips_from_one(rack3)[0] + + diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index fdb82806..fcbcd3c5 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -391,6 +391,7 @@ class LiquidHandlerMiddleware(LiquidHandler): except Exception: free_volume = None + if isinstance(free_volume, (int, float)): req = min(req, max(float(free_volume), 0.0)) safe.append(req) @@ -934,6 +935,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if (isinstance(local_history, list) and len(local_history) == 0 and isinstance(plr_history, list) and len(plr_history) > 0): local_tracker.liquid_history = list(plr_history) + elif (isinstance(local_history, list) and len(local_history) > 0 + and isinstance(plr_history, list) and len(plr_history) == 0): + # 远端认为容器为空,重置本地 tracker 以保持同步 + local_tracker.liquid_history = [] resolved.append(local) if len(resolved) != len(uuids): raise ValueError( @@ -947,13 +952,17 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if isinstance(orig_dict, dict) and hasattr(res, "tracker"): tracker = res.tracker local_history = getattr(tracker, "liquid_history", None) + data = orig_dict.get("data") or {} + dict_history = data.get("liquid_history") if isinstance(local_history, list) and len(local_history) == 0: - data = orig_dict.get("data") or {} - dict_history = data.get("liquid_history") if isinstance(dict_history, list) and len(dict_history) > 0: tracker.liquid_history = [ (name, float(vol)) for name, vol in dict_history ] + elif isinstance(local_history, list) and len(local_history) > 0: + if isinstance(dict_history, list) and len(dict_history) == 0: + # 调用方认为容器为空,重置本地 tracker + tracker.liquid_history = [] result[idx] = res return result @@ -1528,7 +1537,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}") if len(use_channels) != 8: - max_len = max(num_sources, num_targets) + max_len = max(num_sources, num_targets, len_asp_vols, len_dis_vols) prev_dropped = True # 循环开始前通道上无 tip for i in range(max_len): @@ -1663,7 +1672,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: await self.mix( - targets=[targets[0]], + targets=[sources[0]], mix_time=mix_times, mix_vol=mix_vol, offsets=offsets if offsets else None, @@ -1852,76 +1861,141 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]: """Yield tips from a list of TipRacks one-by-one until depleted.""" for rack in tip_racks: - if isinstance(rack, TipSpot): - yield rack - elif isinstance(rack, TipRack): - for item in rack: - if isinstance(item, list): - yield from item - else: - yield item + yield from self._iter_tips_single_rack_or_spot(rack) + + def _iter_tips_single_rack_or_spot(self, rack: Resource) -> Iterator[Resource]: + """单盒或单孔:与 ``iter_tips`` 中单项逻辑一致,供扁平池构建复用。""" + if isinstance(rack, TipSpot): + yield rack + elif isinstance(rack, TipRack): + for item in rack: + if isinstance(item, list): + yield from item + else: + yield item + + def _flatten_tips_from_one(self, rack: Resource) -> List[Resource]: + """将单个 TipRack/TipSpot 展开为孔位列表(顺序与 ``iter_tips`` 一致)。""" + return list(self._iter_tips_single_rack_or_spot(rack)) def _get_next_tip(self): - """从 current_tip 迭代器获取下一个 tip,耗尽时抛出明确错误而非 StopIteration""" + """从按型号分组的扁平枪头池取下一孔;耗尽时抛出明确错误而非 StopIteration。""" + key = getattr(self, "_active_tip_type_key", None) + flat_map = getattr(self, "_tip_flat_spots", None) + if key is not None and flat_map is not None: + flat = flat_map.get(key) + if flat is not None and len(flat) > 0: + idx = self._tip_next_index.get(key, 0) + if idx < len(flat): + self._tip_next_index[key] = idx + 1 + return flat[idx] + diag = ( + f"active_type_key={key}, next_index={idx}, pool_len={len(flat)}; " + f"_tip_racks_by_type[{key}] count={len(self._tip_racks_by_type.get(key, []))}" + ) + raise RuntimeError( + "Tip rack exhausted: no more tips available for this tip type. " + f"Diagnostics: {diag}" + ) + + if not hasattr(self, "current_tip"): + raise RuntimeError( + "No tip source: call set_tiprack with TipRack/TipSpot before picking tips." + ) try: return next(self.current_tip) except StopIteration as e: diag_parts = [] - tip_racks = getattr(self, 'tip_racks', None) + tip_racks = getattr(self, "tip_racks", None) if tip_racks is not None: for idx, rack in enumerate(tip_racks): - r_name = getattr(rack, 'name', '?') + r_name = getattr(rack, "name", "?") r_type = type(rack).__name__ is_tr = isinstance(rack, TipRack) is_ts = isinstance(rack, TipSpot) - n_children = len(getattr(rack, 'children', [])) + n_children = len(getattr(rack, "children", [])) diag_parts.append( f"rack[{idx}] name={r_name}, type={r_type}, " f"is_TipRack={is_tr}, is_TipSpot={is_ts}, children={n_children}" ) else: diag_parts.append("tip_racks=None") - by_type = getattr(self, '_tip_racks_by_type', {}) + by_type = getattr(self, "_tip_racks_by_type", {}) diag_parts.append(f"_tip_racks_by_type keys={list(by_type.keys())}") + active = getattr(self, "_active_tip_type_key", None) + diag_parts.append(f"_active_tip_type_key={active}") raise RuntimeError( - f"Tip rack exhausted: no more tips available for transfer. " + "Tip rack exhausted: no more tips available for transfer. " f"Diagnostics: {'; '.join(diag_parts)}" ) from e + @staticmethod + def _tip_type_key(rack: Resource) -> str: + """生成枪头盒的分组键:优先用 model(区分 10uL/300uL 等),否则回退到类名。""" + model = getattr(rack, "model", None) + if model and str(model).strip(): + return str(model).strip() + return type(rack).__name__ + + def _register_rack(self, rack: Resource) -> None: + """将单个 TipRack/TipSpot 注册到按型号分组的扁平池(去重、不重置已消耗下标)。""" + if not isinstance(rack, (TipRack, TipSpot)): + return + rack_name = rack.name if hasattr(rack, "name") else str(id(rack)) + if rack_name in self._seen_rack_names: + return + self._seen_rack_names.add(rack_name) + type_key = self._tip_type_key(rack) + self._tip_racks_by_type.setdefault(type_key, []).append(rack) + self._tip_flat_spots.setdefault(type_key, []).extend(self._flatten_tips_from_one(rack)) + self._tip_next_index.setdefault(type_key, 0) + + def _init_all_tip_pools(self) -> None: + """首次调用:从 deck 上一次性扫描所有 TipRack/TipSpot,构建完整的按型号扁平池。""" + self._tip_racks_by_type: Dict[str, List[TipRack]] = {} + self._seen_rack_names: Set[str] = set() + self._tip_flat_spots: Dict[str, List[Resource]] = {} + self._tip_next_index: Dict[str, int] = {} + self._tip_pools_initialized = True + + # 遍历 deck 直接子资源,收集所有 TipRack + deck = getattr(self, "deck", None) + if deck is not None: + for child in deck.children: + self._register_rack(child) + def set_tiprack(self, tip_racks: Sequence[TipRack]): - """Set the tip racks for the liquid handler. + """设置当前 transfer 使用的枪头类型。 - Groups tip racks by type name (``type(rack).__name__``). - - Only actual TipRack / TipSpot instances are registered. - - If a rack has already been registered (by ``name``), it is skipped. - - If a rack is new and its type already exists, it is appended to that type's list. - - If the type is new, a new key-value pair is created. + 首次调用时从 ``self.deck`` 一次性扫描所有 TipRack/TipSpot,按 + ``model``(或 ``type(rack).__name__``)分组构建扁平枪头池与消费下标。 + 后续调用仅切换 ``_active_tip_type_key``,不重建池。 - If the current ``tip_racks`` contain no valid TipRack/TipSpot (e.g. a - Plate was passed by mistake), the iterator falls back to all previously - registered racks. + 同型号多次 transfer 时,游标接续(如 A1-A12 用完后继续 B1-B12), + 而非从新盒 A1 重新开始。 """ - if not hasattr(self, '_tip_racks_by_type'): - self._tip_racks_by_type: Dict[str, List[TipRack]] = {} - self._seen_rack_names: Set[str] = set() + # —— 首次:全量初始化 —— + if not getattr(self, "_tip_pools_initialized", False): + self._init_all_tip_pools() + # 将本次传入但 deck 上不存在的新盒也注册进去(兜底) for rack in tip_racks: - if not isinstance(rack, (TipRack, TipSpot)): - continue - rack_name = rack.name if hasattr(rack, 'name') else str(id(rack)) - if rack_name in self._seen_rack_names: - continue - self._seen_rack_names.add(rack_name) - type_key = type(rack).__name__ - if type_key not in self._tip_racks_by_type: - self._tip_racks_by_type[type_key] = [] - self._tip_racks_by_type[type_key].append(rack) + self._register_rack(rack) + # —— 切换当前激活的枪头类型(按 model 区分 10uL/300uL 等)—— + first_valid = next( + (r for r in tip_racks if isinstance(r, (TipRack, TipSpot))), + None, + ) + self._active_tip_type_key = ( + self._tip_type_key(first_valid) if first_valid is not None else None + ) + + # 兼容旧路径(add_liquid / remove_liquid 等可能直接用 current_tip) + self.tip_racks = tip_racks valid_racks = [r for r in tip_racks if isinstance(r, (TipRack, TipSpot))] if not valid_racks: valid_racks = [r for racks in self._tip_racks_by_type.values() for r in racks] - - self.tip_racks = tip_racks self.current_tip = self.iter_tips(valid_racks) async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0): diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 9fbf213f..855668e3 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -881,6 +881,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): mat_uuid = resource._unilabos_state["Material"].get("uuid") if mat_uuid and mat_uuid in material_uuid_map: work_tablets.append({"Number": number, "Material": material_uuid_map[mat_uuid]}) + slot_none.remove(number) continue # 根据 resource 类型推断 materialEnum diff --git a/unilabos/test/experiments/prcxi_9320_slim.json b/unilabos/test/experiments/prcxi_9320_slim.json index d04df6c7..196debf0 100644 --- a/unilabos/test/experiments/prcxi_9320_slim.json +++ b/unilabos/test/experiments/prcxi_9320_slim.json @@ -31,7 +31,7 @@ "step_mode": true, "calibration_points": { "line_1": [[452.07,21.19], [313.88,21.19], [176.87,21.19], [39.08,21.19]], - "line_2": [[415.67,116.68], [313.28,116.88], [176.58,116.69], [38.58,117.18]], + "line_2": [[451.37,116.68], [313.28,116.88], [176.58,116.69], [38.58,117.18]], "line_3": [[450.87,212.18], [312.98,212.38], [176.08,212.68], [38.08,213.18]], "line_4": [[450.08,307.68], [312.18,307.89], [175.18,308.18], [37.58,309.18]] } diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 421613c8..24f7f3c5 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -23,9 +23,9 @@ - 遍历所有 reagent,按 slot 去重,为每个唯一的 slot 创建一个板子 - 所有 create_resource 节点的 parent_uuid 指向 Group 节点,minimized=true - 生成参数: - res_id: plate_slot_{slot} + res_id / 节点 name / display_name: {匹配后的 prcxi 类名}_slot_{槽位} device_id: /PRCXI - class_name: PRCXI_BioER_96_wellplate + class_name: 与 res_id 中类型一致(如 PRCXI 384/96 孔板注册类) parent: /PRCXI/PRCXI_Deck slot_on_deck: "{slot}" - 输出端口: labware(用于连接 set_liquid_from_plate) @@ -207,6 +207,62 @@ def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int return 96 +def _infer_plate_num_children_from_wells(wells: Any) -> Optional[int]: + """根据 well 名推断孔板总孔数档位:列>12 或 行>H(8) 视为 384,否则 96。""" + if not isinstance(wells, list) or not wells: + return None + max_row = 0 + max_col = 0 + for w in wells: + m = re.match(r"^([A-Za-z]+)(\d+)$", str(w).strip()) + if not m: + continue + row_s, col_s = m.group(1).upper(), m.group(2) + ri = 0 + for ch in row_s: + ri = ri * 26 + (ord(ch) - ord("A") + 1) + max_row = max(max_row, ri) + max_col = max(max_col, int(col_s)) + if max_col <= 0: + return None + if max_col > 12 or max_row > 8: + return 384 + return 96 + + +def _infer_plate_num_children_from_labware_hint(labware_id: str, item: Dict[str, Any]) -> Optional[int]: + """从 labware 命名(如 custom_384_wellplate、nest_96_wellplate)解析孔数,供模板匹配。""" + hint = _labware_hint_text(labware_id, item) + m = re.search( + r"\b(1536|384|96|48|24|12|6)(\s*[-_]?\s*well|wellplate|_well_)", + hint, + ) + if m: + return int(m.group(1)) + m = re.search(r"[_\s](1536|384|96|48|24|12|6)[_\s]", hint) + if m and ("well" in hint or "plate" in hint): + return int(m.group(1)) + return None + + +def _infer_plate_num_children( + labware_id: str, + item: Dict[str, Any], + wells: Any, + num_from_def: int, +) -> int: + """孔板用于 PRCXI 匹配的孔数:优先定义表,其次命名,再 well 地址,最后默认 96。""" + if num_from_def > 0: + return num_from_def + hinted = _infer_plate_num_children_from_labware_hint(labware_id, item) + if hinted is not None: + return hinted + from_wells = _infer_plate_num_children_from_wells(wells) + if from_wells is not None: + return from_wells + return 96 + + def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]: s = _labware_hint_text(labware_id, item) for v in (1250, 1000, 300, 200, 10): @@ -386,7 +442,8 @@ def _apply_prcxi_labware_auto_match( else: num_children = _infer_tube_rack_num_positions(labware_id, item) else: - num_children = num_from_def if num_from_def > 0 else 96 + # plate:勿在无 labware_defs 时默认 96,否则 384 板会被错配成 96 模板 + num_children = _infer_plate_num_children(labware_id, item, wells, num_from_def) child_max_volume = item.get("max_volume") if child_max_volume is None: @@ -752,11 +809,8 @@ def build_protocol_graph( prcxi_val = str(pv) if pv else None labware = str(chosen_item.get("labware", "") or "") - res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}" - res_id = res_id.replace(" ", "_") slots_info[slot] = { "labware": labware, - "res_id": res_id, "labware_id": chosen_lid, "object": chosen_item.get("object", "") or "", "prcxi_class_name": prcxi_val, @@ -782,7 +836,6 @@ def build_protocol_graph( # 为每个唯一的 slot 创建 create_resource 节点 for slot, info in slots_info.items(): node_id = str(uuid.uuid4()) - res_id = info["res_id"] object_type = info.get("object", "") or "" ot_lo = str(object_type).strip().lower() matched = info.get("prcxi_class_name") @@ -798,11 +851,14 @@ def build_protocol_graph( res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips") else: res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}" + # 上传物料:匹配后的类型名 + _slot_ + 槽位(name / display_name / res_id 一致) + res_id = f"{res_type_name}_slot_{slot}".replace(" ", "_") G.add_node( node_id, template_name="create_resource", resource_name="host_node", - name=f"{res_type_name}_slot{slot}", + name=res_id, + display_name=res_id, description=f"Create plate on slot {slot}", lab_node_type="Labware", footer="create_resource-host_node", @@ -857,19 +913,25 @@ def build_protocol_graph( if not wells or not slot: continue - # res_id 不能有空格 + # res_id 不能有空格(液体名仍用协议中的 reagent key) res_id = str(labware_id).replace(" ", "_") well_count = len(wells) liquid_volume = DEFAULT_LIQUID_VOLUME if object_type == "source" else 0 node_id = str(uuid.uuid4()) set_liquid_index += 1 + prcxi_mat = item.get("prcxi_class_name") + if prcxi_mat: + sl_node_title = f"{prcxi_mat}_slot_{slot}_{res_id}" + else: + sl_node_title = f"lab_{res_id.lower()}_slot_{slot}_{set_liquid_index}" G.add_node( node_id, template_name="set_liquid_from_plate", resource_name="liquid_handler.prcxi", - name=f"SetLiquid {set_liquid_index}", + name=sl_node_title, + display_name=sl_node_title, description=f"Set liquid: {labware_id}", lab_node_type="Reagent", footer="set_liquid_from_plate-liquid_handler.prcxi",