修改部分移液逻辑

This commit is contained in:
q434343
2026-04-09 18:06:12 +08:00
parent 56d25b88bd
commit 4d3a41ed0d
6 changed files with 256 additions and 53 deletions

View File

@@ -391,6 +391,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
except Exception:
free_volume = None
if isinstance(free_volume, (int, float)):
req = min(req, max(float(free_volume), 0.0))
safe.append(req)
@@ -934,6 +935,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if (isinstance(local_history, list) and len(local_history) == 0
and isinstance(plr_history, list) and len(plr_history) > 0):
local_tracker.liquid_history = list(plr_history)
elif (isinstance(local_history, list) and len(local_history) > 0
and isinstance(plr_history, list) and len(plr_history) == 0):
# 远端认为容器为空,重置本地 tracker 以保持同步
local_tracker.liquid_history = []
resolved.append(local)
if len(resolved) != len(uuids):
raise ValueError(
@@ -947,13 +952,17 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if isinstance(orig_dict, dict) and hasattr(res, "tracker"):
tracker = res.tracker
local_history = getattr(tracker, "liquid_history", None)
data = orig_dict.get("data") or {}
dict_history = data.get("liquid_history")
if isinstance(local_history, list) and len(local_history) == 0:
data = orig_dict.get("data") or {}
dict_history = data.get("liquid_history")
if isinstance(dict_history, list) and len(dict_history) > 0:
tracker.liquid_history = [
(name, float(vol)) for name, vol in dict_history
]
elif isinstance(local_history, list) and len(local_history) > 0:
if isinstance(dict_history, list) and len(dict_history) == 0:
# 调用方认为容器为空,重置本地 tracker
tracker.liquid_history = []
result[idx] = res
return result
@@ -1528,7 +1537,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
# raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}")
if len(use_channels) != 8:
max_len = max(num_sources, num_targets)
max_len = max(num_sources, num_targets, len_asp_vols, len_dis_vols)
prev_dropped = True # 循环开始前通道上无 tip
for i in range(max_len):
@@ -1663,7 +1672,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
targets=[sources[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
@@ -1852,76 +1861,141 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
"""Yield tips from a list of TipRacks one-by-one until depleted."""
for rack in tip_racks:
if isinstance(rack, TipSpot):
yield rack
elif isinstance(rack, TipRack):
for item in rack:
if isinstance(item, list):
yield from item
else:
yield item
yield from self._iter_tips_single_rack_or_spot(rack)
def _iter_tips_single_rack_or_spot(self, rack: Resource) -> Iterator[Resource]:
"""单盒或单孔:与 ``iter_tips`` 中单项逻辑一致,供扁平池构建复用。"""
if isinstance(rack, TipSpot):
yield rack
elif isinstance(rack, TipRack):
for item in rack:
if isinstance(item, list):
yield from item
else:
yield item
def _flatten_tips_from_one(self, rack: Resource) -> List[Resource]:
"""将单个 TipRack/TipSpot 展开为孔位列表(顺序与 ``iter_tips`` 一致)。"""
return list(self._iter_tips_single_rack_or_spot(rack))
def _get_next_tip(self):
""" current_tip 迭代器获取下一个 tip耗尽时抛出明确错误而非 StopIteration"""
"""按型号分组的扁平枪头池取下一孔;耗尽时抛出明确错误而非 StopIteration"""
key = getattr(self, "_active_tip_type_key", None)
flat_map = getattr(self, "_tip_flat_spots", None)
if key is not None and flat_map is not None:
flat = flat_map.get(key)
if flat is not None and len(flat) > 0:
idx = self._tip_next_index.get(key, 0)
if idx < len(flat):
self._tip_next_index[key] = idx + 1
return flat[idx]
diag = (
f"active_type_key={key}, next_index={idx}, pool_len={len(flat)}; "
f"_tip_racks_by_type[{key}] count={len(self._tip_racks_by_type.get(key, []))}"
)
raise RuntimeError(
"Tip rack exhausted: no more tips available for this tip type. "
f"Diagnostics: {diag}"
)
if not hasattr(self, "current_tip"):
raise RuntimeError(
"No tip source: call set_tiprack with TipRack/TipSpot before picking tips."
)
try:
return next(self.current_tip)
except StopIteration as e:
diag_parts = []
tip_racks = getattr(self, 'tip_racks', None)
tip_racks = getattr(self, "tip_racks", None)
if tip_racks is not None:
for idx, rack in enumerate(tip_racks):
r_name = getattr(rack, 'name', '?')
r_name = getattr(rack, "name", "?")
r_type = type(rack).__name__
is_tr = isinstance(rack, TipRack)
is_ts = isinstance(rack, TipSpot)
n_children = len(getattr(rack, 'children', []))
n_children = len(getattr(rack, "children", []))
diag_parts.append(
f"rack[{idx}] name={r_name}, type={r_type}, "
f"is_TipRack={is_tr}, is_TipSpot={is_ts}, children={n_children}"
)
else:
diag_parts.append("tip_racks=None")
by_type = getattr(self, '_tip_racks_by_type', {})
by_type = getattr(self, "_tip_racks_by_type", {})
diag_parts.append(f"_tip_racks_by_type keys={list(by_type.keys())}")
active = getattr(self, "_active_tip_type_key", None)
diag_parts.append(f"_active_tip_type_key={active}")
raise RuntimeError(
f"Tip rack exhausted: no more tips available for transfer. "
"Tip rack exhausted: no more tips available for transfer. "
f"Diagnostics: {'; '.join(diag_parts)}"
) from e
@staticmethod
def _tip_type_key(rack: Resource) -> str:
"""生成枪头盒的分组键:优先用 model区分 10uL/300uL 等),否则回退到类名。"""
model = getattr(rack, "model", None)
if model and str(model).strip():
return str(model).strip()
return type(rack).__name__
def _register_rack(self, rack: Resource) -> None:
"""将单个 TipRack/TipSpot 注册到按型号分组的扁平池(去重、不重置已消耗下标)。"""
if not isinstance(rack, (TipRack, TipSpot)):
return
rack_name = rack.name if hasattr(rack, "name") else str(id(rack))
if rack_name in self._seen_rack_names:
return
self._seen_rack_names.add(rack_name)
type_key = self._tip_type_key(rack)
self._tip_racks_by_type.setdefault(type_key, []).append(rack)
self._tip_flat_spots.setdefault(type_key, []).extend(self._flatten_tips_from_one(rack))
self._tip_next_index.setdefault(type_key, 0)
def _init_all_tip_pools(self) -> None:
"""首次调用:从 deck 上一次性扫描所有 TipRack/TipSpot构建完整的按型号扁平池。"""
self._tip_racks_by_type: Dict[str, List[TipRack]] = {}
self._seen_rack_names: Set[str] = set()
self._tip_flat_spots: Dict[str, List[Resource]] = {}
self._tip_next_index: Dict[str, int] = {}
self._tip_pools_initialized = True
# 遍历 deck 直接子资源,收集所有 TipRack
deck = getattr(self, "deck", None)
if deck is not None:
for child in deck.children:
self._register_rack(child)
def set_tiprack(self, tip_racks: Sequence[TipRack]):
"""Set the tip racks for the liquid handler.
"""设置当前 transfer 使用的枪头类型。
Groups tip racks by type name (``type(rack).__name__``).
- Only actual TipRack / TipSpot instances are registered.
- If a rack has already been registered (by ``name``), it is skipped.
- If a rack is new and its type already exists, it is appended to that type's list.
- If the type is new, a new key-value pair is created.
首次调用时从 ``self.deck`` 一次性扫描所有 TipRack/TipSpot
``model``(或 ``type(rack).__name__``)分组构建扁平枪头池与消费下标。
后续调用仅切换 ``_active_tip_type_key``,不重建池。
If the current ``tip_racks`` contain no valid TipRack/TipSpot (e.g. a
Plate was passed by mistake), the iterator falls back to all previously
registered racks.
同型号多次 transfer 时,游标接续(如 A1-A12 用完后继续 B1-B12
而非从新盒 A1 重新开始。
"""
if not hasattr(self, '_tip_racks_by_type'):
self._tip_racks_by_type: Dict[str, List[TipRack]] = {}
self._seen_rack_names: Set[str] = set()
# —— 首次:全量初始化 ——
if not getattr(self, "_tip_pools_initialized", False):
self._init_all_tip_pools()
# 将本次传入但 deck 上不存在的新盒也注册进去(兜底)
for rack in tip_racks:
if not isinstance(rack, (TipRack, TipSpot)):
continue
rack_name = rack.name if hasattr(rack, 'name') else str(id(rack))
if rack_name in self._seen_rack_names:
continue
self._seen_rack_names.add(rack_name)
type_key = type(rack).__name__
if type_key not in self._tip_racks_by_type:
self._tip_racks_by_type[type_key] = []
self._tip_racks_by_type[type_key].append(rack)
self._register_rack(rack)
# —— 切换当前激活的枪头类型(按 model 区分 10uL/300uL 等)——
first_valid = next(
(r for r in tip_racks if isinstance(r, (TipRack, TipSpot))),
None,
)
self._active_tip_type_key = (
self._tip_type_key(first_valid) if first_valid is not None else None
)
# 兼容旧路径add_liquid / remove_liquid 等可能直接用 current_tip
self.tip_racks = tip_racks
valid_racks = [r for r in tip_racks if isinstance(r, (TipRack, TipSpot))]
if not valid_racks:
valid_racks = [r for racks in self._tip_racks_by_type.values() for r in racks]
self.tip_racks = tip_racks
self.current_tip = self.iter_tips(valid_racks)
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):

View File

@@ -881,6 +881,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
mat_uuid = resource._unilabos_state["Material"].get("uuid")
if mat_uuid and mat_uuid in material_uuid_map:
work_tablets.append({"Number": number, "Material": material_uuid_map[mat_uuid]})
slot_none.remove(number)
continue
# 根据 resource 类型推断 materialEnum