Compare commits

...

2 Commits

Author SHA1 Message Date
q434343
6b3f9756a0 修改真机运动方式, 2026-03-31 14:33:50 +08:00
q434343
afddc6e40c 修改上传工作流部分代码 2026-03-31 14:32:48 +08:00
6 changed files with 626 additions and 118 deletions

View File

@@ -215,17 +215,38 @@ class LiquidHandlerMiddleware(LiquidHandler):
if spread == "":
spread = "custom"
for res in resources:
for i, res in enumerate(resources):
tracker = getattr(res, "tracker", None)
if tracker is None or getattr(tracker, "is_disabled", False):
continue
history = getattr(tracker, "liquid_history", None)
if tracker.get_used_volume() <= 0 and isinstance(history, list) and len(history) == 0:
fill_vol = tracker.max_volume if tracker.max_volume > 0 else 50000
need = float(vols[i]) if i < len(vols) else 0.0
if blow_out_air_volume and i < len(blow_out_air_volume) and blow_out_air_volume[i] is not None:
need += float(blow_out_air_volume[i] or 0.0)
if need <= 0:
continue
try:
used = float(tracker.get_used_volume())
except Exception:
used = 0.0
if used >= need:
continue
mv = float(getattr(tracker, "max_volume", 0) or 0)
if used <= 0:
# 与旧逻辑一致:空孔优先加满(或极大默认),避免仅有 history 记录但 used=0 时不补液
fill_vol = mv if mv > 0 else max(need, 50000.0)
else:
fill_vol = need - used
if mv > 0:
fill_vol = min(fill_vol, max(0.0, mv - used))
try:
tracker.add_liquid(fill_vol)
except Exception:
try:
tracker.add_liquid(fill_vol)
tracker.add_liquid(max(need - used, 1.0))
except Exception:
tracker.liquid_history.append(("auto_init", fill_vol))
history = getattr(tracker, "liquid_history", None)
if isinstance(history, list):
history.append(("auto_init", max(fill_vol, need, 1.0)))
if self._simulator:
try:
@@ -277,6 +298,37 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
except (TooLittleLiquidError, TooLittleVolumeError) as e:
tracker_info = []
for r in resources:
t = getattr(r, "tracker", None)
if t is None:
tracker_info.append(f"{r.name}(no_tracker)")
else:
try:
tracker_info.append(
f"{r.name}(used={t.get_used_volume():.1f}, "
f"free={t.get_free_volume():.1f}, max={getattr(r, 'max_volume', '?')})"
)
except Exception:
tracker_info.append(f"{r.name}(tracker_err)")
if hasattr(self, "_ros_node") and self._ros_node is not None:
self._ros_node.lab_logger().warning(
f"[aspirate] hardware tracker shortfall, retry without volume tracking. "
f"error={e}, vols={vols}, trackers={tracker_info}"
)
with no_volume_tracking():
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().aspirate(
@@ -1620,25 +1672,25 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
use_channels=use_channels,
)
if blow_out_air_volume_before_vol > 0:
source_tracker = getattr(sources[0], "tracker", None)
source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False))
try:
if source_tracker is not None and hasattr(source_tracker, "disable"):
source_tracker.disable()
await self.aspirate(
resources=[sources[0]],
vols=[blow_out_air_volume_before_vol],
use_channels=use_channels,
flow_rates=None,
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
liquid_height=None,
blow_out_air_volume=None,
spread="custom",
)
finally:
if source_tracker is not None:
source_tracker.enable()
# if blow_out_air_volume_before_vol > 0:
# source_tracker = getattr(sources[0], "tracker", None)
# source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False))
# try:
# if source_tracker is not None and hasattr(source_tracker, "disable"):
# source_tracker.disable()
# await self.aspirate(
# resources=[sources[0]],
# vols=[blow_out_air_volume_before_vol],
# use_channels=use_channels,
# flow_rates=None,
# offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
# liquid_height=None,
# blow_out_air_volume=None,
# spread="custom",
# )
# finally:
# if source_tracker is not None:
# source_tracker.enable()
await self.aspirate(
resources=[sources[0]],

View File

@@ -86,6 +86,23 @@ class MatrixInfo(TypedDict):
WorkTablets: list[WorkTablets]
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {}
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5)
row = round(3 - (loc.y - 13) / 96)
idx = row * 4 + col
if 0 <= idx < 16:
return idx + 1
return None
class PRCXI9300Deck(Deck):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。
@@ -837,22 +854,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
@staticmethod
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {}
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
# 使用 resource.location.x 和 resource.location.y 反算槽位号
# 参考 _DEFAULT_SITE_POSITIONS: x = (i%4)*137.5+5, y = (int(i/4))*96+13
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5) # 0-3
row = round(3-(loc.y - 13) / 96) # 0-3
idx = row * 4 + col # 0-15
if 0 <= idx < 16:
return idx + 1 # 槽位号从 1 开始
return None
return _get_slot_number(resource)
def _match_and_create_matrix(self):
"""首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。"""
@@ -972,7 +974,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw)
else:
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(50, self.left_2_claw.y, self.left_2_claw.z))
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(-100, self.left_2_claw.y, self.left_2_claw.z))
half_x = child.get_size_x() / 2 * abs(1 + self.x_increase)
z_wall = child.get_size_z()
@@ -1006,7 +1008,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}")
def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)):
resource_pos = resource.get_absolute_location(x="c",y="c",z="t")
z_pos = 'c'
if isinstance(resource, Tip):
z_pos = 'b'
resource_pos = resource.get_absolute_location(x="c",y="c",z=z_pos)
x = resource_pos.x
y = resource_pos.y
z = resource_pos.z
@@ -1437,6 +1442,24 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.deck_z = deck_z
self.tip_length = 0
@staticmethod
def _deck_plate_slot_no(plate, deck) -> int:
"""台面板位槽号116与 PRCXI9300Handler._get_slot_number 一致;无法解析时退回 deck 子项顺序 +1。"""
sn = PRCXI9300Handler._get_slot_number(plate)
if sn is not None:
return sn
return deck.children.index(plate) + 1
@staticmethod
def _resource_num_items_y(resource) -> int:
"""板/TipRack 等在 Y 向孔位数;无 ``num_items_y`` 或非正数时返回 1。"""
ny = getattr(resource, "num_items_y", None)
try:
n = int(ny) if ny is not None else 1
except (TypeError, ValueError):
n = 1
return n if n >= 1 else 1
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action(
time=time,
@@ -1610,33 +1633,33 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
plate_slots = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
plate_slots.append(self._deck_plate_slot_no(plate, deck))
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
if len(set(plate_slots)) != 1:
raise ValueError("All pickups must be from the same plate (slot). Found different slots: " + str(plate_slots))
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = []
for op in ops:
tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All pickups must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1:
raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
)
PlateNo = plate_indexes[0] + 1
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
hole_row = tipspot_index % ny + 1
step = self.api_client.Load(
axis=axis,
@@ -1647,8 +1670,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers=f"{(hole_col - 1) * 8 + hole_row}" if self._num_channels != 8 else "1,2,3,4,5",
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers=f"{(hole_col - 1) * ny + hole_row}" if self._num_channels != 8 else "1,2,3,4,5",
)
self.steps_todo_list.append(step)
@@ -1666,8 +1689,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
raise ValueError("Invalid use channels: " + str(_use_channels))
# 检查trash #
if ops[0].resource.name == "trash":
PlateNo = ops[0].resource.parent.parent.children.index(ops[0].resource.parent) + 1
_plate = ops[0].resource
_deck = _plate.parent
PlateNo = self._deck_plate_slot_no(_plate, _deck)
step = self.api_client.UnLoad(
axis=axis,
@@ -1685,32 +1709,35 @@ class PRCXI9300Backend(LiquidHandlerBackend):
return
# print(ops[0].resource.parent.children.index(ops[0].resource))
plate_indexes = []
plate_slots = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
plate_slots.append(self._deck_plate_slot_no(plate, deck))
if len(set(plate_slots)) != 1:
raise ValueError(
"All drop_tips must be from the same plate. Found different plates: " + str(plate_indexes)
"All drop_tips must be from the same plate (slot). Found different slots: " + str(plate_slots)
)
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = []
for op in ops:
tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All drop_tips must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1:
raise ValueError(
"All drop_tips must be from the same tip column. Found different columns: " + str(tip_columns)
)
PlateNo = plate_indexes[0] + 1
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
hole_row = tipspot_index % ny + 1
step = self.api_client.UnLoad(
axis=axis,
@@ -1721,7 +1748,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
@@ -1744,31 +1771,34 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(use_channels))
plate_indexes = []
plate_slots = []
for op in targets:
deck = op.parent.parent.parent
plate = op.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
plate_slots.append(self._deck_plate_slot_no(plate, deck))
if len(set(plate_indexes)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
if len(set(plate_slots)) != 1:
raise ValueError("All mix targets must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = targets[0].parent
ny = self._resource_num_items_y(_plate0)
tip_columns = []
for op in targets:
if self._resource_num_items_y(op.parent) != ny:
raise ValueError("All mix targets must be on plates with the same num_items_y")
tipspot_index = op.parent.children.index(op)
tip_columns.append(tipspot_index // 8)
tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1:
raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
"All mix targets must be in the same column group. Found different columns: " + str(tip_columns)
)
PlateNo = plate_indexes[0] + 1
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
hole_row = tipspot_index % ny + 1
assert mix_time > 0
step = self.api_client.Blending(
@@ -1779,7 +1809,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col,
blending_times=mix_time,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
@@ -1796,36 +1826,39 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
plate_slots = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
plate_slots.append(self._deck_plate_slot_no(plate, deck))
if len(set(plate_indexes)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
if len(set(plate_slots)) != 1:
raise ValueError("All aspirate must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = []
for op in ops:
tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All aspirate wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1:
raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
"All aspirate must be from the same tip column. Found different columns: " + str(tip_columns)
)
volumes = [op.volume for op in ops]
if len(set(volumes)) != 1:
raise ValueError("All aspirate volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_indexes[0] + 1
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
hole_row = tipspot_index % ny + 1
step = self.api_client.Imbibing(
axis=axis,
@@ -1836,7 +1869,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
@@ -1853,21 +1886,24 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = []
plate_slots = []
for op in ops:
plate = op.resource.parent
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
plate_slots.append(self._deck_plate_slot_no(plate, deck))
if len(set(plate_indexes)) != 1:
raise ValueError("All dispense must be from the same plate. Found different plates: " + str(plate_indexes))
if len(set(plate_slots)) != 1:
raise ValueError("All dispense must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = []
for op in ops:
tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All dispense wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1:
raise ValueError(
@@ -1878,12 +1914,12 @@ class PRCXI9300Backend(LiquidHandlerBackend):
if len(set(volumes)) != 1:
raise ValueError("All dispense volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_indexes[0] + 1
PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1
hole_row = tipspot_index % ny + 1
step = self.api_client.Tapping(
axis=axis,
@@ -1894,7 +1930,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col,
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Callable, Dict, List, Optional, Tuple
from pylabrobot.resources import Tube, Coordinate
from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType
from pylabrobot.resources.tip import Tip, TipCreator
@@ -838,4 +838,102 @@ def PRCXI_30mm_Adapter(name: str) -> PRCXI9300PlateAdapter:
"Name": "30mm适配器",
"SupplyType": 2
}
)
)
# ---------------------------------------------------------------------------
# 协议上传 / workflow 用:与设备端耗材字典字段对齐的模板描述(供 common 自动匹配)
# ---------------------------------------------------------------------------
_PRCXI_TEMPLATE_SPECS_CACHE: Optional[List[Dict[str, Any]]] = None
def _probe_prcxi_resource(factory: Callable[..., Any]) -> Any:
probe = "__unilab_template_probe__"
if factory.__name__ == "PRCXI_trash":
return factory()
return factory(probe)
def _first_child_capacity_for_match(resource: Any) -> float:
"""Well max_volume 或 Tip 的 maximal_volume用于与设备端 Volume 类似的打分。"""
ch = getattr(resource, "children", None) or []
if not ch:
return 0.0
c0 = ch[0]
mv = getattr(c0, "max_volume", None)
if mv is not None:
return float(mv)
tip = getattr(c0, "tip", None)
if tip is not None:
mv2 = getattr(tip, "maximal_volume", None)
if mv2 is not None:
return float(mv2)
return 0.0
# (factory, kind) — 不含各类 Adapter避免与真实板子误匹配
PRCXI_TEMPLATE_FACTORY_KINDS: List[Tuple[Callable[..., Any], str]] = [
(PRCXI_BioER_96_wellplate, "plate"),
(PRCXI_nest_1_troughplate, "plate"),
(PRCXI_BioRad_384_wellplate, "plate"),
(PRCXI_AGenBio_4_troughplate, "plate"),
(PRCXI_nest_12_troughplate, "plate"),
(PRCXI_CellTreat_96_wellplate, "plate"),
(PRCXI_10ul_eTips, "tip_rack"),
(PRCXI_300ul_Tips, "tip_rack"),
(PRCXI_PCR_Plate_200uL_nonskirted, "plate"),
(PRCXI_PCR_Plate_200uL_semiskirted, "plate"),
(PRCXI_PCR_Plate_200uL_skirted, "plate"),
(PRCXI_trash, "trash"),
(PRCXI_96_DeepWell, "plate"),
(PRCXI_EP_Adapter, "tube_rack"),
(PRCXI_1250uL_Tips, "tip_rack"),
(PRCXI_10uL_Tips, "tip_rack"),
(PRCXI_1000uL_Tips, "tip_rack"),
(PRCXI_200uL_Tips, "tip_rack"),
(PRCXI_48_DeepWell, "plate"),
]
def get_prcxi_labware_template_specs() -> List[Dict[str, Any]]:
"""返回与 ``prcxi._match_and_create_matrix`` 中耗材字段兼容的模板列表,用于按孔数+容量打分。"""
global _PRCXI_TEMPLATE_SPECS_CACHE
if _PRCXI_TEMPLATE_SPECS_CACHE is not None:
return _PRCXI_TEMPLATE_SPECS_CACHE
out: List[Dict[str, Any]] = []
for factory, kind in PRCXI_TEMPLATE_FACTORY_KINDS:
try:
r = _probe_prcxi_resource(factory)
except Exception:
continue
nx = int(getattr(r, "num_items_x", None) or 0)
ny = int(getattr(r, "num_items_y", None) or 0)
nchild = len(getattr(r, "children", []) or [])
hole_count = nx * ny if nx > 0 and ny > 0 else nchild
hole_row = ny if nx > 0 and ny > 0 else 0
hole_col = nx if nx > 0 and ny > 0 else 0
mi = getattr(r, "material_info", None) or {}
vol = _first_child_capacity_for_match(r)
menum = mi.get("materialEnum")
if menum is None and kind == "tip_rack":
menum = 1
elif menum is None and kind == "trash":
menum = 6
out.append(
{
"class_name": factory.__name__,
"kind": kind,
"materialEnum": menum,
"HoleRow": hole_row,
"HoleColum": hole_col,
"Volume": vol,
"hole_count": hole_count,
"material_uuid": mi.get("uuid"),
"material_code": mi.get("Code"),
}
)
_PRCXI_TEMPLATE_SPECS_CACHE = out
return out

View File

@@ -152,6 +152,253 @@ def _map_deck_slot(raw_slot: str, object_type: str = "") -> str:
return {"4": "13", "8": "14"}.get(s, s)
def _labware_def_index(labware_defs: Optional[List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
m: Dict[str, Dict[str, Any]] = {}
for d in labware_defs or []:
for k in ("id", "name", "reagent_id", "reagent"):
key = d.get(k)
if key is not None and str(key):
m[str(key)] = d
return m
def _labware_hint_text(labware_id: str, item: Dict[str, Any]) -> str:
"""合并 id 与协议里的 labware 描述OpenTrons 全名常在 labware 字段)。"""
parts = [str(labware_id), str(item.get("labware", "") or "")]
return " ".join(parts).lower()
def _infer_reagent_kind(labware_id: str, item: Dict[str, Any]) -> str:
ot = (item.get("object") or "").strip().lower()
if ot == "trash":
return "trash"
if ot == "tiprack":
return "tip_rack"
lid = _labware_hint_text(labware_id, item)
if "trash" in lid:
return "trash"
# tiprack / tip + rack顺序在 tuberack 之前)
if "tiprack" in lid or ("tip" in lid and "rack" in lid):
return "tip_rack"
# 离心管架 / OpenTrons tuberack勿与 96 tiprack 混淆)
if "tuberack" in lid or "tube_rack" in lid:
return "tube_rack"
if "eppendorf" in lid and "rack" in lid:
return "tube_rack"
if "safelock" in lid and "rack" in lid:
return "tube_rack"
if "rack" in lid and "tip" not in lid:
return "tube_rack"
return "plate"
def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int:
"""从 ``24_tuberack`` 等命名中解析孔位数;解析不到则默认 24与 PRCXI_EP_Adapter 4×6 一致)。"""
hint = _labware_hint_text(labware_id, item)
m = re.search(r"(\d+)_tuberack", hint)
if m:
return int(m.group(1))
m = re.search(r"tuberack[_\s]*(\d+)", hint)
if m:
return int(m.group(1))
m = re.search(r"(\d+)\s*[-_]?\s*pos(?:ition)?s?", hint)
if m:
return int(m.group(1))
return 96
def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]:
s = _labware_hint_text(labware_id, item)
for v in (1250, 1000, 300, 200, 10):
if f"{v}ul" in s or f"{v}μl" in s or f"{v}u" in s:
return float(v)
if f" {v} " in f" {s} ":
return float(v)
return None
def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool:
"""有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与trash 除外)。"""
if kind == "trash":
return True
if req is None or req <= 0:
return True
mv = float(template.get("Volume") or 0)
if mv <= 0:
return False
return mv >= req
def _direct_labware_class_name(item: Dict[str, Any]) -> str:
"""仅用于 tip_rack 且 ``preserve_tip_rack_incoming_class=True````class_name``/``class`` 原样;否则 ``labware`` → ``lab_*``。"""
explicit = item.get("class_name") or item.get("class")
if explicit is not None and str(explicit).strip() != "":
return str(explicit).strip()
lw = str(item.get("labware", "") or "").strip()
if lw:
return f"lab_{lw.lower().replace('.', 'point').replace(' ', '_')}"
return ""
def _match_score_prcxi_template(
template: Dict[str, Any],
num_children: int,
child_max_volume: Optional[float],
) -> float:
"""孔数差主导;有需求体积且模板已满足 >= 时,余量比例 (模板-需求)/需求 越小越好(优先选刚好够的)。"""
hole_count = int(template.get("hole_count") or 0)
hole_diff = abs(num_children - hole_count)
material_volume = float(template.get("Volume") or 0)
req = child_max_volume
if req is not None and req > 0 and material_volume > 0:
vol_diff = (material_volume - req) / max(req, 1e-9)
elif material_volume > 0 and req is not None:
vol_diff = abs(float(req) - material_volume) / material_volume
else:
vol_diff = 0.0
return hole_diff * 1000 + vol_diff
def _apply_prcxi_labware_auto_match(
labware_info: Dict[str, Dict[str, Any]],
labware_defs: Optional[List[Dict[str, Any]]] = None,
*,
preserve_tip_rack_incoming_class: bool = True,
) -> None:
"""上传构建图前:按孔数+容量将 reagent 条目匹配到 ``prcxi_labware`` 注册类名,写入 ``prcxi_class_name``。
若给出需求体积,仅选用模板标称 Volume >= 该值的物料,并在满足条件的模板中选余量最小者。
``preserve_tip_rack_incoming_class=True``(默认)时:**仅 tip_rack** 不做模板匹配,类名由 ``class_name``/``class`` 或
``labware````lab_*``)直接给出;**plate / tube_rack / trash 等**仍按注册模板匹配。
``False`` 时 **全部**(含 tip_rack走模板匹配。"""
if not labware_info:
return
default_prcxi_tip_class = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
try:
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import get_prcxi_labware_template_specs
except Exception:
return
templates = get_prcxi_labware_template_specs()
if not templates:
return
def_map = _labware_def_index(labware_defs)
for labware_id, item in labware_info.items():
if item.get("prcxi_class_name"):
continue
kind = _infer_reagent_kind(labware_id, item)
if preserve_tip_rack_incoming_class and kind == "tip_rack":
inc_s = _direct_labware_class_name(item)
if inc_s == default_prcxi_tip_class:
inc_s = ""
if inc_s:
item["prcxi_class_name"] = inc_s
continue
explicit = item.get("class_name") or item.get("class")
if explicit and str(explicit).startswith("PRCXI_"):
item["prcxi_class_name"] = str(explicit)
continue
extra = def_map.get(str(labware_id), {})
wells = item.get("well") or []
well_n = len(wells) if isinstance(wells, list) else 0
num_from_def = int(extra.get("num_wells") or extra.get("well_count") or item.get("num_wells") or 0)
if kind == "trash":
num_children = 0
elif kind == "tip_rack":
num_children = num_from_def if num_from_def > 0 else 96
elif kind == "tube_rack":
if num_from_def > 0:
num_children = num_from_def
elif well_n > 0:
num_children = well_n
else:
num_children = _infer_tube_rack_num_positions(labware_id, item)
else:
num_children = num_from_def if num_from_def > 0 else 96
child_max_volume = item.get("max_volume")
if child_max_volume is None:
child_max_volume = extra.get("max_volume")
try:
child_max_volume_f = float(child_max_volume) if child_max_volume is not None else None
except (TypeError, ValueError):
child_max_volume_f = None
if kind == "tip_rack" and child_max_volume_f is None:
child_max_volume_f = _tip_volume_hint(item, labware_id) or 300.0
candidates = [t for t in templates if t["kind"] == kind]
if not candidates:
continue
best = None
best_score = float("inf")
for t in candidates:
if kind != "trash" and int(t.get("hole_count") or 0) <= 0:
continue
if not _volume_template_covers_requirement(t, child_max_volume_f, kind):
continue
sc = _match_score_prcxi_template(t, num_children, child_max_volume_f)
if sc < best_score:
best_score = sc
best = t
if best:
item["prcxi_class_name"] = best["class_name"]
def _reconcile_slot_carrier_prcxi_class(
labware_info: Dict[str, Dict[str, Any]],
*,
preserve_tip_rack_incoming_class: bool = False,
) -> None:
"""同一 deck 槽位上多条 reagent 时,按载体类型优先级统一 ``prcxi_class_name``,避免先遍历到 96 板后槽位被错误绑定。
``preserve_tip_rack_incoming_class=True`` 时tip_rack 条目不参与同槽类名合并(不被覆盖、也不把 tip 类名扩散到同槽其它条目)。"""
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for lid, item in labware_info.items():
ot = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), ot)
if not slot:
continue
by_slot.setdefault(str(slot), []).append((lid, item))
priority = {"trash": 0, "tube_rack": 1, "tip_rack": 2, "plate": 3}
for _slot, pairs in by_slot.items():
if len(pairs) < 2:
continue
def _rank(p: Tuple[str, Dict[str, Any]]) -> int:
return priority.get(_infer_reagent_kind(p[0], p[1]), 9)
pairs_sorted = sorted(pairs, key=_rank)
best_cls = None
for lid, it in pairs_sorted:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
c = it.get("prcxi_class_name")
if c:
best_cls = c
break
if not best_cls:
continue
for lid, it in pairs:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
it["prcxi_class_name"] = best_cls
# ---------------- Graph ----------------
@@ -377,6 +624,7 @@ def build_protocol_graph(
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
@@ -385,28 +633,67 @@ def build_protocol_graph(
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配
preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定);
**其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack都走模板匹配。
"""
G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port"
slot_to_create_resource = {} # slot -> create_resource node_id
_apply_prcxi_labware_auto_match(
labware_info,
labware_defs,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
_reconcile_slot_carrier_prcxi_class(
labware_info,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
# 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for labware_id, item in labware_info.items():
object_type = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), object_type)
labware = item.get("labware", "")
if slot and slot not in slots_info:
res_id = f"{labware}_slot_{slot}"
slots_info[slot] = {
"labware": labware,
"res_id": res_id,
"labware_id": labware_id,
"object": object_type,
}
if not slot:
continue
by_slot.setdefault(slot, []).append((labware_id, item))
slots_info: Dict[str, Dict[str, Any]] = {}
for slot, pairs in by_slot.items():
def _ot_tip(it: Dict[str, Any]) -> bool:
return str(it.get("object", "") or "").strip().lower() == "tiprack"
tip_pairs = [(lid, it) for lid, it in pairs if _ot_tip(it)]
chosen_lid = ""
chosen_item: Dict[str, Any] = {}
prcxi_val: Optional[str] = None
scan = tip_pairs if tip_pairs else pairs
for lid, it in scan:
c = it.get("prcxi_class_name")
if c:
chosen_lid, chosen_item, prcxi_val = lid, it, str(c)
break
if not chosen_lid and scan:
chosen_lid, chosen_item = scan[0]
pv = chosen_item.get("prcxi_class_name")
prcxi_val = str(pv) if pv else None
labware = str(chosen_item.get("labware", "") or "")
res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}"
res_id = res_id.replace(" ", "_")
slots_info[slot] = {
"labware": labware,
"res_id": res_id,
"labware_id": chosen_lid,
"object": chosen_item.get("object", "") or "",
"prcxi_class_name": prcxi_val,
}
# 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4())
@@ -429,11 +716,21 @@ def build_protocol_graph(
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
res_type_name = info["labware"].lower().replace(".", "point")
object_type = info.get("object", "")
res_type_name = f"lab_{res_type_name}"
if object_type == "trash":
object_type = info.get("object", "") or ""
ot_lo = str(object_type).strip().lower()
matched = info.get("prcxi_class_name")
if ot_lo == "trash":
res_type_name = "PRCXI_trash"
elif matched:
res_type_name = matched
elif ot_lo == "tiprack":
if preserve_tip_rack_incoming_class:
lid = str(info.get("labware_id") or "").strip() or "tip_rack"
res_type_name = f"lab_{lid.lower().replace('.', 'point').replace(' ', '_')}"
else:
res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
else:
res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}"
G.add_node(
node_id,
template_name="create_resource",
@@ -456,9 +753,9 @@ def build_protocol_graph(
},
)
slot_to_create_resource[slot] = node_id
if object_type == "tiprack":
if ot_lo == "tiprack":
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
if object_type == "trash":
if ot_lo == "trash":
trash_create_node_id = node_id
# create_resource 之间不需要 ready 连接

View File

@@ -210,6 +210,7 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
@@ -221,6 +222,8 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True默认时仅 tip_rack 不跑模板、按传入类名/labware其它载体仍自动匹配。
False 时全部走模板。JSON 根 ``preserve_tip_rack_incoming_class`` 可覆盖此参数。
Returns:
WorkflowGraph: 构建好的工作流图
@@ -263,6 +266,10 @@ def convert_from_json(
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
@@ -270,6 +277,7 @@ def convert_from_json(
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
preserve_tip_rack_incoming_class=preserve,
)
# 校验句柄配置
@@ -287,6 +295,7 @@ def convert_from_json(
def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> Dict[str, Any]:
"""
将 JSON 数据转换为 node-link 格式的字典
@@ -298,13 +307,18 @@ def convert_json_to_node_link(
Returns:
Dict: node-link 格式的工作流数据
"""
graph = convert_from_json(data, workstation_name)
graph = convert_from_json(
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_node_link_dict()
def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> List[Dict[str, Any]]:
"""
将 JSON 数据转换为工作流列表格式
@@ -316,5 +330,9 @@ def convert_json_to_workflow_list(
Returns:
List: 工作流节点列表
"""
graph = convert_from_json(data, workstation_name)
graph = convert_from_json(
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_dict()

View File

@@ -234,6 +234,7 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
@@ -246,6 +247,7 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True 时仅 tip 不跑模板False 时全部匹配JSON 根字段同名可覆盖
Returns:
WorkflowGraph: 构建好的工作流图
@@ -295,12 +297,17 @@ def convert_from_json(
"3. {'steps': [...], 'labware': [...]}"
)
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
preserve_tip_rack_incoming_class=preserve,
)
# 校验句柄配置