Compare commits

...

7 Commits

Author SHA1 Message Date
q434343
9a6f744afd Merge branch 'sjs_middle_school' into feat/lab_resource 2026-04-01 11:51:54 +08:00
q434343
8164d990cc 适配前吸空气部分 2026-04-01 11:50:42 +08:00
q434343
5c9c8a4ee9 Merge branch 'prcix9320' into sjs_middle_school 2026-03-31 18:48:20 +08:00
q434343
a48985720c 添加run_protocol参数 2026-03-31 16:11:11 +08:00
q434343
ad66fc1841 其他修改, 2026-03-31 14:57:51 +08:00
q434343
6b3f9756a0 修改真机运动方式, 2026-03-31 14:33:50 +08:00
q434343
afddc6e40c 修改上传工作流部分代码 2026-03-31 14:32:48 +08:00
17 changed files with 2356 additions and 654 deletions

View File

@@ -1,4 +1,95 @@
# CLAUDE.md
Please follow the rules defined in:
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
@AGENTS.md
## Build & Development
```bash
# Install (requires mamba env with python 3.11)
mamba create -n unilab python=3.11.14
mamba activate unilab
mamba install uni-lab::unilabos-env -c robostack-staging -c conda-forge
pip install -e .
uv pip install -r unilabos/utils/requirements.txt
# Run with a device graph
unilab --graph <graph.json> --config <config.py> --backend ros
unilab --graph <graph.json> --config <config.py> --backend simple # no ROS2 needed
# Common CLI flags
unilab --app_bridges websocket fastapi # communication bridges
unilab --test_mode # simulate hardware, no real execution
unilab --check_mode # CI validation of registry imports (AST-based)
unilab --skip_env_check # skip auto-install of dependencies
unilab --visual rviz|web|disable # visualization mode
unilab --is_slave # run as slave node
unilab --restart_mode # auto-restart on config changes (supervisor/child process)
# Workflow upload subcommand
unilab workflow_upload -f <workflow.json> -n <name> --tags tag1 tag2
# Tests
pytest tests/ # all tests
pytest tests/resources/test_resourcetreeset.py # single test file
pytest tests/resources/test_resourcetreeset.py::TestClassName::test_method # single test
# CI check (matches .github/workflows/ci-check.yml)
python -m unilabos --check_mode --skip_env_check
```
## Architecture
### Startup Flow
`unilab` CLI (entry point in `setup.py`) → `unilabos/app/main.py:main()` → loads config → builds registry → reads device graph (JSON/GraphML) → starts backend thread (ROS2/simple) → starts FastAPI web server + WebSocket client.
### Core Layers
**Registry** (`unilabos/registry/`): Singleton `Registry` class discovers and catalogs all device types, resource types, and communication devices. Two registration mechanisms: YAML definitions in `registry/devices/*.yaml` and Python decorators (`@device`, `@action`, `@resource` in `registry/decorators.py`). AST scanning discovers decorated classes without importing them. Class paths resolved to Python classes via `utils/import_manager.py`.
**Resource Tracking** (`unilabos/resources/resource_tracker.py`): Pydantic-based `ResourceDict``ResourceDictInstance``ResourceTreeSet` hierarchy. `ResourceTreeSet` is the canonical in-memory representation of all devices and resources. Graph I/O in `resources/graphio.py` reads JSON/GraphML device topology files into `nx.Graph` + `ResourceTreeSet`.
**Device Drivers** (`unilabos/devices/`): 30+ hardware drivers organized by category (liquid_handling, hplc, balance, arm, etc.). Each driver class gets wrapped by `ros/device_node_wrapper.py:ros2_device_node()` into a `ROS2DeviceNode` (defined in `ros/nodes/base_device_node.py`) with publishers, subscribers, and action servers.
**ROS2 Layer** (`unilabos/ros/`): Preset node types in `ros/nodes/presets/``host_node` (main orchestrator, ~90KB), `controller_node`, `workstation`, `serial_node`, `camera`, `resource_mesh_manager`. Custom messages in `unilabos_msgs/` (80+ action types, pre-built via conda `ros-humble-unilabos-msgs`).
**Protocol Compilation** (`unilabos/compile/`): 20+ protocol compilers (add, centrifuge, dissolve, filter, heatchill, stir, pump, etc.) registered in `__init__.py:action_protocol_generators` dict. Utility parsers in `compile/utils/` (vessel, unit, logger).
**Workflow** (`unilabos/workflow/`): Converts workflow definitions from multiple formats — JSON (`convert_from_json.py`, `common.py`), Python scripts (`from_python_script.py`), XDL (`from_xdl.py`) — into executable `WorkflowGraph`. Legacy converters in `workflow/legacy/`.
**Communication** (`unilabos/device_comms/`): Hardware adapters — OPC-UA, Modbus PLC, RPC, universal driver. `app/communication.py` provides factory pattern for WebSocket connections.
**Web/API** (`unilabos/app/web/`): FastAPI server with REST API (`api.py`), Jinja2 templates (`pages.py`), HTTP client (`client.py`). Default port 8002.
### Configuration System
- **Config classes** in `unilabos/config/config.py`: `BasicConfig`, `WSConfig`, `HTTPConfig`, `ROSConfig` — class-level attributes, loaded from Python `.py` config files (see `config/example_config.py`)
- Environment variable overrides with prefix `UNILABOS_` (e.g., `UNILABOS_BASICCONFIG_PORT=9000`)
- Device topology defined in graph files (JSON node-link format or GraphML)
### Key Data Flow
1. Graph file → `graphio.read_node_link_json()``(nx.Graph, ResourceTreeSet, resource_links)`
2. `ResourceTreeSet` + `Registry``initialize_device.initialize_device_from_dict()``ROS2DeviceNode` instances
3. Device nodes communicate via ROS2 topics/actions or direct Python calls (simple backend)
4. Cloud sync via WebSocket (`app/ws_client.py`) and HTTP (`app/web/client.py`)
### Test Data
Example device graphs and experiment configs are in `unilabos/test/experiments/` (not `tests/`). Registry test fixtures in `unilabos/test/registry/`.
## Code Conventions
- Code comments and log messages in **simplified Chinese**
- Python 3.11+, type hints expected
- Pydantic models for data validation (`resource_tracker.py`)
- Singleton pattern via `@singleton` decorator (`utils/decorator.py`)
- Dynamic class loading via `utils/import_manager.py` — device classes resolved at runtime from registry YAML paths
- CLI argument dashes auto-converted to underscores for consistency
- No linter/formatter configuration enforced (no ruff, black, flake8, mypy configs)
- Documentation built with Sphinx (Chinese language, `sphinx_rtd_theme`, `myst_parser`)
## Licensing
- Framework code: GPL-3.0
- Device drivers (`unilabos/devices/`): DP Technology Proprietary License — do not redistribute

539
test/devices/test_prcxi.py Normal file
View File

@@ -0,0 +1,539 @@
import pytest
import json
import os
import asyncio
import collections
from typing import List, Dict, Any
from pylabrobot.resources import Coordinate
from pylabrobot.resources.opentrons.tip_racks import opentrons_96_tiprack_300ul, opentrons_96_tiprack_10ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat, nest_96_wellplate_2ml_deep
from unilabos.devices.liquid_handling.prcxi.prcxi import (
PRCXI9300Deck,
PRCXI9300Container,
PRCXI9300Trash,
PRCXI9300Handler,
PRCXI9300Backend,
DefaultLayout,
Material,
WorkTablets,
MatrixInfo
)
@pytest.fixture
def prcxi_materials() -> Dict[str, Any]:
"""加载 PRCXI 物料数据"""
print("加载 PRCXI 物料数据...")
material_path = os.path.join(os.path.dirname(__file__), "..", "..", "unilabos", "devices", "liquid_handling", "prcxi", "prcxi_material.json")
with open(material_path, "r", encoding="utf-8") as f:
data = json.load(f)
print(f"加载了 {len(data)} 条物料数据")
return data
@pytest.fixture
def prcxi_9300_deck() -> PRCXI9300Deck:
"""创建 PRCXI 9300 工作台"""
return PRCXI9300Deck(name="PRCXI_Deck_9300", size_x=100, size_y=100, size_z=100, model="9300")
@pytest.fixture
def prcxi_9320_deck() -> PRCXI9300Deck:
"""创建 PRCXI 9320 工作台"""
return PRCXI9300Deck(name="PRCXI_Deck_9320", size_x=100, size_y=100, size_z=100, model="9320")
@pytest.fixture
def prcxi_9300_handler(prcxi_9300_deck) -> PRCXI9300Handler:
"""创建 PRCXI 9300 处理器(模拟模式)"""
return PRCXI9300Handler(
deck=prcxi_9300_deck,
host="192.168.1.201",
port=9999,
timeout=10.0,
channel_num=8,
axis="Left",
setup=False,
debug=True,
simulator=True,
matrix_id="test-matrix-9300"
)
@pytest.fixture
def prcxi_9320_handler(prcxi_9320_deck) -> PRCXI9300Handler:
"""创建 PRCXI 9320 处理器(模拟模式)"""
return PRCXI9300Handler(
deck=prcxi_9320_deck,
host="192.168.1.201",
port=9999,
timeout=10.0,
channel_num=1,
axis="Right",
setup=False,
debug=True,
simulator=True,
matrix_id="test-matrix-9320",
is_9320=True
)
@pytest.fixture
def tip_rack_300ul(prcxi_materials) -> PRCXI9300Container:
"""创建 300μL 枪头盒"""
tip_rack = PRCXI9300Container(
name="tip_rack_300ul",
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict()
)
tip_rack.load_state({
"Material": {
"uuid": prcxi_materials["300μL Tip头"]["uuid"],
"Code": "ZX-001-300",
"Name": "300μL Tip头"
}
})
return tip_rack
@pytest.fixture
def tip_rack_10ul(prcxi_materials) -> PRCXI9300Container:
"""创建 10μL 枪头盒"""
tip_rack = PRCXI9300Container(
name="tip_rack_10ul",
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict()
)
tip_rack.load_state({
"Material": {
"uuid": prcxi_materials["10μL加长 Tip头"]["uuid"],
"Code": "ZX-001-10+",
"Name": "10μL加长 Tip头"
}
})
return tip_rack
@pytest.fixture
def well_plate_96(prcxi_materials) -> PRCXI9300Container:
"""创建 96 孔板"""
plate = PRCXI9300Container(
name="well_plate_96",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict()
)
plate.load_state({
"Material": {
"uuid": prcxi_materials["96深孔板"]["uuid"],
"Code": "ZX-019-2.2",
"Name": "96深孔板"
}
})
return plate
@pytest.fixture
def deep_well_plate(prcxi_materials) -> PRCXI9300Container:
"""创建深孔板"""
plate = PRCXI9300Container(
name="deep_well_plate",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict()
)
plate.load_state({
"Material": {
"uuid": prcxi_materials["96深孔板"]["uuid"],
"Code": "ZX-019-2.2",
"Name": "96深孔板"
}
})
return plate
@pytest.fixture
def trash_container(prcxi_materials) -> PRCXI9300Trash:
"""创建垃圾桶"""
trash = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash")
trash.load_state({
"Material": {
"uuid": prcxi_materials["废弃槽"]["uuid"]
}
})
return trash
@pytest.fixture
def default_layout_9300() -> DefaultLayout:
"""创建 PRCXI 9300 默认布局"""
return DefaultLayout("PRCXI9300")
@pytest.fixture
def default_layout_9320() -> DefaultLayout:
"""创建 PRCXI 9320 默认布局"""
return DefaultLayout("PRCXI9320")
class TestPRCXIDeckSetup:
"""测试 PRCXI 工作台设置功能"""
def test_prcxi_9300_deck_creation(self, prcxi_9300_deck):
"""测试 PRCXI 9300 工作台创建"""
assert prcxi_9300_deck.name == "PRCXI_Deck_9300"
assert len(prcxi_9300_deck.sites) == 6
assert prcxi_9300_deck._size_x == 100
assert prcxi_9300_deck._size_y == 100
assert prcxi_9300_deck._size_z == 100
def test_prcxi_9320_deck_creation(self, prcxi_9320_deck):
"""测试 PRCXI 9320 工作台创建"""
assert prcxi_9320_deck.name == "PRCXI_Deck_9320"
assert len(prcxi_9320_deck.sites) == 16
assert prcxi_9320_deck._size_x == 100
assert prcxi_9320_deck._size_y == 100
assert prcxi_9320_deck._size_z == 100
def test_container_assignment(self, prcxi_9300_deck, tip_rack_300ul, well_plate_96, trash_container):
"""测试容器分配到工作台"""
# 分配枪头盒
prcxi_9300_deck.assign_child_resource(tip_rack_300ul, location=Coordinate(0, 0, 0))
assert tip_rack_300ul in prcxi_9300_deck.children
# 分配孔板
prcxi_9300_deck.assign_child_resource(well_plate_96, location=Coordinate(0, 0, 0))
assert well_plate_96 in prcxi_9300_deck.children
# 分配垃圾桶
prcxi_9300_deck.assign_child_resource(trash_container, location=Coordinate(0, 0, 0))
assert trash_container in prcxi_9300_deck.children
def test_container_material_loading(self, tip_rack_300ul, well_plate_96, prcxi_materials):
"""测试容器物料信息加载"""
# 测试枪头盒物料信息
tip_material = tip_rack_300ul._unilabos_state["Material"]
assert tip_material["uuid"] == prcxi_materials["300μL Tip头"]["uuid"]
assert tip_material["Name"] == "300μL Tip头"
# 测试孔板物料信息
plate_material = well_plate_96._unilabos_state["Material"]
assert plate_material["uuid"] == prcxi_materials["96深孔板"]["uuid"]
assert plate_material["Name"] == "96深孔板"
class TestPRCXISingleStepOperations:
"""测试 PRCXI 单步操作功能"""
@pytest.mark.asyncio
async def test_pick_up_tips_single_channel(self, prcxi_9320_handler, prcxi_9320_deck, tip_rack_10ul):
"""测试单通道拾取枪头"""
# 将枪头盒添加到工作台
prcxi_9320_deck.assign_child_resource(tip_rack_10ul, location=Coordinate(0, 0, 0))
# 初始化处理器
await prcxi_9320_handler.setup()
# 设置枪头盒
prcxi_9320_handler.set_tiprack([tip_rack_10ul])
# 创建模拟的枪头位置
from pylabrobot.resources import TipSpot, Tip
tip = Tip(has_filter=False, total_tip_length=10, maximal_volume=10, fitting_depth=5)
tip_spot = TipSpot("A1", size_x=1, size_y=1, size_z=1, make_tip=lambda: tip)
tip_rack_10ul.assign_child_resource(tip_spot, location=Coordinate(0, 0, 0))
# 直接测试后端方法
from pylabrobot.liquid_handling import Pickup
pickup = Pickup(resource=tip_spot, offset=None, tip=tip)
await prcxi_9320_handler._unilabos_backend.pick_up_tips([pickup], [0])
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "Load"
@pytest.mark.asyncio
async def test_pick_up_tips_multi_channel(self, prcxi_9300_handler, tip_rack_300ul):
"""测试多通道拾取枪头"""
# 设置枪头盒
prcxi_9300_handler.set_tiprack([tip_rack_300ul])
# 拾取8个枪头
tip_spots = tip_rack_300ul.children[:8]
await prcxi_9300_handler.pick_up_tips(tip_spots, [0, 1, 2, 3, 4, 5, 6, 7])
# 验证步骤已添加到待办列表
assert len(prcxi_9300_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9300_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "Load"
@pytest.mark.asyncio
async def test_aspirate_single_channel(self, prcxi_9320_handler, well_plate_96):
"""测试单通道吸取液体"""
# 设置液体
well = well_plate_96.get_item("A1")
prcxi_9320_handler.set_liquid([well], ["water"], [50])
# 吸取液体
await prcxi_9320_handler.aspirate([well], [50], [0])
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "Imbibing"
assert step["DosageNum"] == 50
@pytest.mark.asyncio
async def test_dispense_single_channel(self, prcxi_9320_handler, well_plate_96):
"""测试单通道分配液体"""
# 分配液体
well = well_plate_96.get_item("A1")
await prcxi_9320_handler.dispense([well], [25], [0])
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "Tapping"
assert step["DosageNum"] == 25
@pytest.mark.asyncio
async def test_mix_single_channel(self, prcxi_9320_handler, well_plate_96):
"""测试单通道混合液体"""
# 混合液体
well = well_plate_96.get_item("A1")
await prcxi_9320_handler.mix([well], mix_time=3, mix_vol=50)
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "Blending"
assert step["BlendingTimes"] == 3
assert step["DosageNum"] == 50
@pytest.mark.asyncio
async def test_drop_tips_to_trash(self, prcxi_9320_handler, trash_container):
"""测试丢弃枪头到垃圾桶"""
# 丢弃枪头
await prcxi_9320_handler.drop_tips([trash_container], [0])
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "UnLoad"
@pytest.mark.asyncio
async def test_discard_tips(self, prcxi_9320_handler):
"""测试丢弃枪头"""
# 丢弃枪头
await prcxi_9320_handler.discard_tips([0])
# 验证步骤已添加到待办列表
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 1
step = prcxi_9320_handler._unilabos_backend.steps_todo_list[0]
assert step["Function"] == "UnLoad"
@pytest.mark.asyncio
async def test_liquid_transfer_workflow(self, prcxi_9320_handler, tip_rack_10ul, well_plate_96):
"""测试完整的液体转移工作流程"""
# 设置枪头盒和液体
prcxi_9320_handler.set_tiprack([tip_rack_10ul])
source_well = well_plate_96.get_item("A1")
target_well = well_plate_96.get_item("B1")
prcxi_9320_handler.set_liquid([source_well], ["water"], [100])
# 创建协议
await prcxi_9320_handler.create_protocol(protocol_name="Test Transfer Protocol")
# 执行转移流程
tip_spot = tip_rack_10ul.get_item("A1")
await prcxi_9320_handler.pick_up_tips([tip_spot], [0])
await prcxi_9320_handler.aspirate([source_well], [50], [0])
await prcxi_9320_handler.dispense([target_well], [50], [0])
await prcxi_9320_handler.discard_tips([0])
# 验证所有步骤都已添加
assert len(prcxi_9320_handler._unilabos_backend.steps_todo_list) == 4
functions = [step["Function"] for step in prcxi_9320_handler._unilabos_backend.steps_todo_list]
assert functions == ["Load", "Imbibing", "Tapping", "UnLoad"]
class TestPRCXILayoutRecommendation:
"""测试 PRCXI 板位推荐功能"""
def test_9300_layout_creation(self, default_layout_9300):
"""测试 PRCXI 9300 布局创建"""
layout_info = default_layout_9300.get_layout()
assert layout_info["rows"] == 2
assert layout_info["columns"] == 3
assert len(layout_info["layout"]) == 6
assert layout_info["trash_slot"] == 6
assert "waste_liquid_slot" not in layout_info
def test_9320_layout_creation(self, default_layout_9320):
"""测试 PRCXI 9320 布局创建"""
layout_info = default_layout_9320.get_layout()
assert layout_info["rows"] == 4
assert layout_info["columns"] == 4
assert len(layout_info["layout"]) == 16
assert layout_info["trash_slot"] == 16
assert layout_info["waste_liquid_slot"] == 12
def test_layout_recommendation_9320(self, default_layout_9320, prcxi_materials):
"""测试 PRCXI 9320 板位推荐功能"""
# 添加物料信息
default_layout_9320.add_lab_resource(prcxi_materials)
# 推荐布局
needs = [
("reagent_1", "96 细胞培养皿", 3),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 7),
("reagent_4", "10μL加长 Tip头", 1),
]
matrix_layout, layout_list = default_layout_9320.recommend_layout(needs)
# 验证返回结果
assert "MatrixId" in matrix_layout
assert "MatrixName" in matrix_layout
assert "MatrixCount" in matrix_layout
assert "WorkTablets" in matrix_layout
assert len(layout_list) == 12 # 3+1+7+1 = 12个位置
# 验证推荐的位置不包含预留位置
reserved_positions = {12, 16}
recommended_positions = [item["positions"] for item in layout_list]
for pos in recommended_positions:
assert pos not in reserved_positions
def test_layout_recommendation_insufficient_space(self, default_layout_9320, prcxi_materials):
"""测试板位推荐空间不足的情况"""
# 添加物料信息
default_layout_9320.add_lab_resource(prcxi_materials)
# 尝试推荐超过可用空间的布局
needs = [
("reagent_1", "96 细胞培养皿", 15), # 需要15个位置但只有14个可用
]
with pytest.raises(ValueError, match="需要 .* 个位置,但只有 .* 个可用位置"):
default_layout_9320.recommend_layout(needs)
def test_layout_recommendation_material_not_found(self, default_layout_9320, prcxi_materials):
"""测试板位推荐物料不存在的情况"""
# 添加物料信息
default_layout_9320.add_lab_resource(prcxi_materials)
# 尝试推荐不存在的物料
needs = [
("reagent_1", "不存在的物料", 1),
]
with pytest.raises(ValueError, match="Material .* not found in lab resources"):
default_layout_9320.recommend_layout(needs)
class TestPRCXIBackendOperations:
"""测试 PRCXI 后端操作功能"""
def test_backend_initialization(self, prcxi_9300_handler):
"""测试后端初始化"""
backend = prcxi_9300_handler._unilabos_backend
assert isinstance(backend, PRCXI9300Backend)
assert backend._num_channels == 8
assert backend.debug is True
def test_protocol_creation(self, prcxi_9300_handler):
"""测试协议创建"""
backend = prcxi_9300_handler._unilabos_backend
backend.create_protocol("Test Protocol")
assert backend.protocol_name == "Test Protocol"
assert len(backend.steps_todo_list) == 0
def test_channel_validation(self):
"""测试通道验证"""
# 测试正确的8通道配置
valid_channels = [0, 1, 2, 3, 4, 5, 6, 7]
result = PRCXI9300Backend.check_channels(valid_channels)
assert result == valid_channels
# 测试错误的通道配置
invalid_channels = [0, 1, 2, 3]
result = PRCXI9300Backend.check_channels(invalid_channels)
assert result == [0, 1, 2, 3, 4, 5, 6, 7]
def test_matrix_info_creation(self, prcxi_9300_handler):
"""测试矩阵信息创建"""
backend = prcxi_9300_handler._unilabos_backend
backend.create_protocol("Test Protocol")
# 模拟运行协议时的矩阵信息创建
run_time = 1234567890
matrix_info = MatrixInfo(
MatrixId=f"{int(run_time)}",
MatrixName=f"protocol_{run_time}",
MatrixCount=len(backend.tablets_info),
WorkTablets=backend.tablets_info,
)
assert matrix_info["MatrixId"] == str(int(run_time))
assert matrix_info["MatrixName"] == f"protocol_{run_time}"
assert "WorkTablets" in matrix_info
class TestPRCXIContainerOperations:
"""测试 PRCXI 容器操作功能"""
def test_container_serialization(self, tip_rack_300ul):
"""测试容器序列化"""
serialized = tip_rack_300ul.serialize_state()
assert "Material" in serialized
assert serialized["Material"]["Name"] == "300μL Tip头"
def test_container_deserialization(self, tip_rack_300ul):
"""测试容器反序列化"""
# 序列化
serialized = tip_rack_300ul.serialize_state()
# 创建新容器并反序列化
new_tip_rack = PRCXI9300Container(
name="new_tip_rack",
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict()
)
new_tip_rack.load_state(serialized)
assert new_tip_rack._unilabos_state["Material"]["Name"] == "300μL Tip头"
def test_trash_container_creation(self, prcxi_materials):
"""测试垃圾桶容器创建"""
trash = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash")
trash.load_state({
"Material": {
"uuid": prcxi_materials["废弃槽"]["uuid"]
}
})
assert trash.name == "trash"
assert trash._unilabos_state["Material"]["uuid"] == prcxi_materials["废弃槽"]["uuid"]
if __name__ == "__main__":
# 运行测试
pytest.main([__file__, "-v"])

View File

@@ -215,17 +215,38 @@ class LiquidHandlerMiddleware(LiquidHandler):
if spread == "":
spread = "custom"
for res in resources:
for i, res in enumerate(resources):
tracker = getattr(res, "tracker", None)
if tracker is None or getattr(tracker, "is_disabled", False):
continue
history = getattr(tracker, "liquid_history", None)
if tracker.get_used_volume() <= 0 and isinstance(history, list) and len(history) == 0:
fill_vol = tracker.max_volume if tracker.max_volume > 0 else 50000
need = float(vols[i]) if i < len(vols) else 0.0
if blow_out_air_volume and i < len(blow_out_air_volume) and blow_out_air_volume[i] is not None:
need += float(blow_out_air_volume[i] or 0.0)
if need <= 0:
continue
try:
used = float(tracker.get_used_volume())
except Exception:
used = 0.0
if used >= need:
continue
mv = float(getattr(tracker, "max_volume", 0) or 0)
if used <= 0:
# 与旧逻辑一致:空孔优先加满(或极大默认),避免仅有 history 记录但 used=0 时不补液
fill_vol = mv if mv > 0 else max(need, 50000.0)
else:
fill_vol = need - used
if mv > 0:
fill_vol = min(fill_vol, max(0.0, mv - used))
try:
tracker.add_liquid(fill_vol)
except Exception:
try:
tracker.add_liquid(fill_vol)
tracker.add_liquid(max(need - used, 1.0))
except Exception:
tracker.liquid_history.append(("auto_init", fill_vol))
history = getattr(tracker, "liquid_history", None)
if isinstance(history, list):
history.append(("auto_init", max(fill_vol, need, 1.0)))
if self._simulator:
try:
@@ -277,6 +298,37 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
except (TooLittleLiquidError, TooLittleVolumeError) as e:
tracker_info = []
for r in resources:
t = getattr(r, "tracker", None)
if t is None:
tracker_info.append(f"{r.name}(no_tracker)")
else:
try:
tracker_info.append(
f"{r.name}(used={t.get_used_volume():.1f}, "
f"free={t.get_free_volume():.1f}, max={getattr(r, 'max_volume', '?')})"
)
except Exception:
tracker_info.append(f"{r.name}(tracker_err)")
if hasattr(self, "_ros_node") and self._ros_node is not None:
self._ros_node.lab_logger().warning(
f"[aspirate] hardware tracker shortfall, retry without volume tracking. "
f"error={e}, vols={vols}, trackers={tracker_info}"
)
with no_volume_tracking():
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().aspirate(
@@ -1628,12 +1680,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
source_tracker.disable()
await self.aspirate(
resources=[sources[0]],
vols=[blow_out_air_volume_before_vol],
vols=[0],
use_channels=use_channels,
flow_rates=None,
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
liquid_height=None,
blow_out_air_volume=None,
blow_out_air_volume=[blow_out_air_volume_before_vol],
spread="custom",
)
finally:
@@ -1660,9 +1712,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
use_channels=use_channels,
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None
),
blow_out_air_volume=[blow_out_air_volume_vol+blow_out_air_volume_before_vol],
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
spread=spread,
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Callable, Dict, List, Optional, Tuple
from pylabrobot.resources import Tube, Coordinate
from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType
from pylabrobot.resources.tip import Tip, TipCreator
@@ -838,4 +838,102 @@ def PRCXI_30mm_Adapter(name: str) -> PRCXI9300PlateAdapter:
"Name": "30mm适配器",
"SupplyType": 2
}
)
)
# ---------------------------------------------------------------------------
# 协议上传 / workflow 用:与设备端耗材字典字段对齐的模板描述(供 common 自动匹配)
# ---------------------------------------------------------------------------
_PRCXI_TEMPLATE_SPECS_CACHE: Optional[List[Dict[str, Any]]] = None
def _probe_prcxi_resource(factory: Callable[..., Any]) -> Any:
probe = "__unilab_template_probe__"
if factory.__name__ == "PRCXI_trash":
return factory()
return factory(probe)
def _first_child_capacity_for_match(resource: Any) -> float:
"""Well max_volume 或 Tip 的 maximal_volume用于与设备端 Volume 类似的打分。"""
ch = getattr(resource, "children", None) or []
if not ch:
return 0.0
c0 = ch[0]
mv = getattr(c0, "max_volume", None)
if mv is not None:
return float(mv)
tip = getattr(c0, "tip", None)
if tip is not None:
mv2 = getattr(tip, "maximal_volume", None)
if mv2 is not None:
return float(mv2)
return 0.0
# (factory, kind) — 不含各类 Adapter避免与真实板子误匹配
PRCXI_TEMPLATE_FACTORY_KINDS: List[Tuple[Callable[..., Any], str]] = [
(PRCXI_BioER_96_wellplate, "plate"),
(PRCXI_nest_1_troughplate, "plate"),
(PRCXI_BioRad_384_wellplate, "plate"),
(PRCXI_AGenBio_4_troughplate, "plate"),
(PRCXI_nest_12_troughplate, "plate"),
(PRCXI_CellTreat_96_wellplate, "plate"),
(PRCXI_10ul_eTips, "tip_rack"),
(PRCXI_300ul_Tips, "tip_rack"),
(PRCXI_PCR_Plate_200uL_nonskirted, "plate"),
(PRCXI_PCR_Plate_200uL_semiskirted, "plate"),
(PRCXI_PCR_Plate_200uL_skirted, "plate"),
(PRCXI_trash, "trash"),
(PRCXI_96_DeepWell, "plate"),
(PRCXI_EP_Adapter, "tube_rack"),
(PRCXI_1250uL_Tips, "tip_rack"),
(PRCXI_10uL_Tips, "tip_rack"),
(PRCXI_1000uL_Tips, "tip_rack"),
(PRCXI_200uL_Tips, "tip_rack"),
(PRCXI_48_DeepWell, "plate"),
]
def get_prcxi_labware_template_specs() -> List[Dict[str, Any]]:
"""返回与 ``prcxi._match_and_create_matrix`` 中耗材字段兼容的模板列表,用于按孔数+容量打分。"""
global _PRCXI_TEMPLATE_SPECS_CACHE
if _PRCXI_TEMPLATE_SPECS_CACHE is not None:
return _PRCXI_TEMPLATE_SPECS_CACHE
out: List[Dict[str, Any]] = []
for factory, kind in PRCXI_TEMPLATE_FACTORY_KINDS:
try:
r = _probe_prcxi_resource(factory)
except Exception:
continue
nx = int(getattr(r, "num_items_x", None) or 0)
ny = int(getattr(r, "num_items_y", None) or 0)
nchild = len(getattr(r, "children", []) or [])
hole_count = nx * ny if nx > 0 and ny > 0 else nchild
hole_row = ny if nx > 0 and ny > 0 else 0
hole_col = nx if nx > 0 and ny > 0 else 0
mi = getattr(r, "material_info", None) or {}
vol = _first_child_capacity_for_match(r)
menum = mi.get("materialEnum")
if menum is None and kind == "tip_rack":
menum = 1
elif menum is None and kind == "trash":
menum = 6
out.append(
{
"class_name": factory.__name__,
"kind": kind,
"materialEnum": menum,
"HoleRow": hole_row,
"HoleColum": hole_col,
"Volume": vol,
"hole_count": hole_count,
"material_uuid": mi.get("uuid"),
"material_code": mi.get("Code"),
}
)
_PRCXI_TEMPLATE_SPECS_CACHE = out
return out

View File

@@ -0,0 +1,150 @@
from typing import Any, Dict, Optional
from .prcxi import PRCXI9300ModuleSite
class PRCXI9300FunctionalModule(PRCXI9300ModuleSite):
"""
PRCXI 9300 功能模块基类(加热/冷却/震荡/加热震荡/磁吸等)。
设计目标:
- 作为一个可以在工作台上拖拽摆放的实体资源(继承自 PRCXI9300ModuleSite -> ItemizedCarrier
- 顶面存在一个站点site可吸附标准板类资源plate / tip_rack / tube_rack 等)。
- 支持注入 `material_info` (UUID 等),并且在 serialize_state 时做安全过滤。
"""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
module_type: Optional[str] = None,
category: str = "module",
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs: Any,
):
super().__init__(
name=name,
size_x=size_x,
size_y=size_y,
size_z=size_z,
material_info=material_info,
model=model,
category=category,
**kwargs,
)
# 记录模块类型(加热 / 冷却 / 震荡 / 加热震荡 / 磁吸)
self.module_type = module_type or "generic"
# 与 PRCXI9300PlateAdapter 一致,使用 _unilabos_state 保存扩展信息
if not hasattr(self, "_unilabos_state") or self._unilabos_state is None:
self._unilabos_state = {}
# super().__init__ 已经在有 material_info 时写入 "Material",这里仅确保存在
if material_info is not None and "Material" not in self._unilabos_state:
self._unilabos_state["Material"] = material_info
# 额外标记 category 和模块类型,便于前端或上层逻辑区分
self._unilabos_state.setdefault("category", category)
self._unilabos_state["module_type"] = module_type
# ============================================================================
# 具体功能模块定义
# 这里的尺寸和 material_info 目前为占位参数,后续可根据实际测量/JSON 配置进行更新。
# 顶面站点尺寸与模块外形一致,保证可以吸附标准 96 板/储液槽等。
# ============================================================================
def PRCXI_Heating_Module(name: str) -> PRCXI9300FunctionalModule:
"""加热模块(顶面可吸附标准板)。"""
return PRCXI9300FunctionalModule(
name=name,
size_x=127.76,
size_y=85.48,
size_z=40.0,
module_type="heating",
model="PRCXI_Heating_Module",
material_info={
"uuid": "TODO-HEATING-MODULE-UUID",
"Code": "HEAT-MOD",
"Name": "PRCXI 加热模块",
"SupplyType": 3,
},
)
def PRCXI_MetalCooling_Module(name: str) -> PRCXI9300FunctionalModule:
"""金属冷却模块(顶面可吸附标准板)。"""
return PRCXI9300FunctionalModule(
name=name,
size_x=127.76,
size_y=85.48,
size_z=40.0,
module_type="metal_cooling",
model="PRCXI_MetalCooling_Module",
material_info={
"uuid": "TODO-METAL-COOLING-MODULE-UUID",
"Code": "METAL-COOL-MOD",
"Name": "PRCXI 金属冷却模块",
"SupplyType": 3,
},
)
def PRCXI_Shaking_Module(name: str) -> PRCXI9300FunctionalModule:
"""震荡模块(顶面可吸附标准板)。"""
return PRCXI9300FunctionalModule(
name=name,
size_x=127.76,
size_y=85.48,
size_z=50.0,
module_type="shaking",
model="PRCXI_Shaking_Module",
material_info={
"uuid": "TODO-SHAKING-MODULE-UUID",
"Code": "SHAKE-MOD",
"Name": "PRCXI 震荡模块",
"SupplyType": 3,
},
)
def PRCXI_Heating_Shaking_Module(name: str) -> PRCXI9300FunctionalModule:
"""加热震荡模块(顶面可吸附标准板)。"""
return PRCXI9300FunctionalModule(
name=name,
size_x=127.76,
size_y=85.48,
size_z=55.0,
module_type="heating_shaking",
model="PRCXI_Heating_Shaking_Module",
material_info={
"uuid": "TODO-HEATING-SHAKING-MODULE-UUID",
"Code": "HEAT-SHAKE-MOD",
"Name": "PRCXI 加热震荡模块",
"SupplyType": 3,
},
)
def PRCXI_Magnetic_Module(name: str) -> PRCXI9300FunctionalModule:
"""磁吸模块(顶面可吸附标准板)。"""
return PRCXI9300FunctionalModule(
name=name,
size_x=127.76,
size_y=85.48,
size_z=30.0,
module_type="magnetic",
model="PRCXI_Magnetic_Module",
material_info={
"uuid": "TODO-MAGNETIC-MODULE-UUID",
"Code": "MAG-MOD",
"Name": "PRCXI 磁吸模块",
"SupplyType": 3,
},
)

View File

@@ -7779,7 +7779,8 @@ liquid_handler.prcxi:
auto-run_protocol:
feedback: {}
goal: {}
goal_default: {}
goal_default:
protocol_id: null
handles: {}
placeholder_keys: {}
result: {}
@@ -7788,7 +7789,9 @@ liquid_handler.prcxi:
properties:
feedback: {}
goal:
properties: {}
properties:
protocol_id:
type: string
required: []
type: object
result: {}

View File

@@ -0,0 +1,70 @@
PRCXI_Heating_Module:
category:
- prcxi
- modules
class:
module: unilabos.devices.liquid_handling.prcxi.prcxi_modules:PRCXI_Heating_Module
type: pylabrobot
description: '加热模块 (Code: HEAT-MOD)'
handles: []
icon: ''
init_param_schema: {}
registry_type: resource
version: 1.0.0
PRCXI_MetalCooling_Module:
category:
- prcxi
- modules
class:
module: unilabos.devices.liquid_handling.prcxi.prcxi_modules:PRCXI_MetalCooling_Module
type: pylabrobot
description: '金属冷却模块 (Code: METAL-COOL-MOD)'
handles: []
icon: ''
init_param_schema: {}
registry_type: resource
version: 1.0.0
PRCXI_Shaking_Module:
category:
- prcxi
- modules
class:
module: unilabos.devices.liquid_handling.prcxi.prcxi_modules:PRCXI_Shaking_Module
type: pylabrobot
description: '震荡模块 (Code: SHAKE-MOD)'
handles: []
icon: ''
init_param_schema: {}
registry_type: resource
version: 1.0.0
PRCXI_Heating_Shaking_Module:
category:
- prcxi
- modules
class:
module: unilabos.devices.liquid_handling.prcxi.prcxi_modules:PRCXI_Heating_Shaking_Module
type: pylabrobot
description: '加热震荡模块 (Code: HEAT-SHAKE-MOD)'
handles: []
icon: ''
init_param_schema: {}
registry_type: resource
version: 1.0.0
PRCXI_Magnetic_Module:
category:
- prcxi
- modules
class:
module: unilabos.devices.liquid_handling.prcxi.prcxi_modules:PRCXI_Magnetic_Module
type: pylabrobot
description: '磁吸模块 (Code: MAG-MOD)'
handles: []
icon: ''
init_param_schema: {}
registry_type: resource
version: 1.0.0

View File

@@ -9,6 +9,9 @@ def register():
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300TipRack
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Trash
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300TubeRack
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300ModuleSite
# noinspection PyUnresolvedReferences
from unilabos.devices.liquid_handling.prcxi.prcxi_modules import PRCXI9300FunctionalModule
# noinspection PyUnresolvedReferences
from unilabos.devices.workstation.workstation_base import WorkStationContainer

View File

@@ -123,6 +123,24 @@ class ResourceDictType(TypedDict):
machine_name: str
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
class ResourceDict(BaseModel):
id: str = Field(description="Resource ID")
@@ -441,6 +459,8 @@ class ResourceTreeSet(object):
"reagent_bottle": "reagent_bottle",
"flask": "flask",
"beaker": "beaker",
"module": "module",
"carrier": "carrier",
}
if source in replace_info:
return replace_info[source]
@@ -553,10 +573,17 @@ class ResourceTreeSet(object):
trees.append(tree_instance)
return cls(trees)
def to_plr_resources(self, skip_devices=True) -> List["PLRResource"]:
def to_plr_resources(
self, skip_devices: bool = True, requested_uuids: Optional[List[str]] = None
) -> List["PLRResource"]:
"""
将 ResourceTreeSet 转换为 PLR 资源列表
Args:
skip_devices: 是否跳过 device 类型节点
requested_uuids: 若指定,则按此 UUID 顺序返回对应资源(用于批量查询时一一对应),
否则返回各树的根节点列表
Returns:
List[PLRResource]: PLR 资源实例列表
"""
@@ -571,6 +598,8 @@ class ResourceTreeSet(object):
"deck": "Deck",
"container": "RegularContainer",
"tip_spot": "TipSpot",
"module": "PRCXI9300ModuleSite",
"carrier": "ItemizedCarrier",
}
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict):
@@ -612,6 +641,71 @@ class ResourceTreeSet(object):
d["model"] = res.config.get("model", None)
return d
# deserialize 会单独处理的元数据 key不传给构造函数
_META_KEYS = {"type", "parent_name", "location", "children", "rotation", "barcode"}
# deserialize 自定义逻辑使用的 key如 TipSpot 用 prototype_tip 构建 make_tip需保留
_DESERIALIZE_PRESERVED_KEYS = {"prototype_tip"}
def remove_incompatible_params(plr_d: dict) -> None:
"""递归移除 PLR 类不接受的参数,避免 deserialize 报错。
- 移除构造函数不接受的参数(如 compute_height_from_volume、ordering、category
- 对 TubeRack将 ordering 转为 ordered_items
- 保留 deserialize 自定义逻辑需要的 key如 prototype_tip
"""
if "type" in plr_d:
sub_cls = find_subclass(plr_d["type"], PLRResource)
if sub_cls is not None:
spec = inspect.signature(sub_cls)
valid_params = set(spec.parameters.keys())
# TubeRack 特殊处理:先转换 ordering再参与后续过滤
if "ordering" not in valid_params and "ordering" in plr_d:
ordering = plr_d.pop("ordering", None)
if sub_cls.__name__ == "TubeRack":
plr_d["ordered_items"] = (
_ordering_to_ordered_items(plr_d, ordering)
if ordering
else {}
)
# 移除构造函数不接受的参数(保留 META 和 deserialize 自定义逻辑需要的 key
for key in list(plr_d.keys()):
if (
key not in _META_KEYS
and key not in _DESERIALIZE_PRESERVED_KEYS
and key not in valid_params
):
plr_d.pop(key, None)
for child in plr_d.get("children", []):
remove_incompatible_params(child)
def _ordering_to_ordered_items(plr_d: dict, ordering: dict) -> dict:
"""将 ordering 转为 ordered_items从 children 构建 Tube 对象"""
from pylabrobot.resources import Tube, Coordinate
from pylabrobot.serializer import deserialize as plr_deserialize
children = plr_d.get("children", [])
ordered_items = {}
for idx, (ident, child_name) in enumerate(ordering.items()):
child_data = children[idx] if idx < len(children) else None
if child_data is None:
continue
loc_data = child_data.get("location")
loc = (
plr_deserialize(loc_data)
if loc_data
else Coordinate(0, 0, 0)
)
tube = Tube(
name=child_data.get("name", child_name or ident),
size_x=child_data.get("size_x", 10),
size_y=child_data.get("size_y", 10),
size_z=child_data.get("size_z", 50),
max_volume=child_data.get("max_volume", 1000),
)
tube.location = loc
ordered_items[ident] = tube
plr_d["children"] = [] # 已并入 ordered_items避免重复反序列化
return ordered_items
plr_resources = []
tracker = DeviceNodeResourceTracker()
@@ -631,9 +725,7 @@ class ResourceTreeSet(object):
raise ValueError(
f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}"
)
spec = inspect.signature(sub_cls)
if "category" not in spec.parameters:
plr_dict.pop("category", None)
remove_incompatible_params(plr_dict)
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
from pylabrobot.resources import Coordinate
from pylabrobot.serializer import deserialize
@@ -653,6 +745,18 @@ class ResourceTreeSet(object):
logger.error(f"堆栈: {traceback.format_exc()}")
raise
if requested_uuids:
# 按请求的 UUID 顺序返回对应资源(从整棵树中按 uuid 提取)
result = []
for uid in requested_uuids:
if uid in tracker.uuid_to_resources:
result.append(tracker.uuid_to_resources[uid])
else:
raise ValueError(
f"请求的 UUID {uid} 在资源树中未找到。"
f"可用 UUID 数量: {len(tracker.uuid_to_resources)}"
)
return result
return plr_resources
@classmethod
@@ -741,16 +845,6 @@ class ResourceTreeSet(object):
"""
return [tree.root_node for tree in self.trees]
@property
def root_nodes_uuid(self) -> List[ResourceDictInstance]:
"""
获取所有树的根节点
Returns:
所有根节点的资源实例列表
"""
return [tree.root_node.res_content.uuid for tree in self.trees]
@property
def all_nodes(self) -> List[ResourceDictInstance]:
"""
@@ -868,6 +962,17 @@ class ResourceTreeSet(object):
f"从远端同步了 {added_count} 个物料子树"
)
else:
# 二级是物料
if remote_child_name not in local_children_map:
# 本地不存在该物料,直接引入
remote_child.res_content.parent = local_device.res_content
local_device.children.append(remote_child)
local_children_map[remote_child_name] = remote_child
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}': "
f"从远端同步了整个子树"
)
continue
# 二级物料已存在,比较三级子节点是否缺失
local_material = local_children_map[remote_child_name]
local_material_children_map = {child.res_content.name: child for child in

View File

@@ -22,6 +22,7 @@ from unilabos_msgs.srv import (
SerialCommand,
) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos_msgs.action import SendCmd
from unique_identifier_msgs.msg import UUID
from unilabos.registry.decorators import device, action, NodeType
@@ -313,9 +314,15 @@ class HostNode(BaseROS2DeviceNode):
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._add_resource_mesh_client = ActionClient(
self,
SendCmd,
"/devices/resource_mesh_manager/add_resource_mesh",
callback_group=self.callback_group,
)
self._action_value_mappings: Dict[str, Dict] = {
device_id: self._action_value_mappings
} # device_id -> action_value_mappings(本地+远程设备统一存储)
} # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
@@ -1131,6 +1138,27 @@ class HostNode(BaseROS2DeviceNode):
),
}
def _notify_resource_mesh_add(self, resource_tree_set: ResourceTreeSet):
"""通知 ResourceMeshManager 添加资源的 mesh 可视化"""
if not self._add_resource_mesh_client.server_is_ready():
self.lab_logger().debug("[Host Node] ResourceMeshManager 未就绪,跳过 mesh 添加通知")
return
resource_configs = []
for node in resource_tree_set.all_nodes:
res_dict = node.res_content.model_dump(by_alias=True)
if res_dict.get("type") == "device":
continue
resource_configs.append(res_dict)
if not resource_configs:
return
goal_msg = SendCmd.Goal()
goal_msg.command = json.dumps({"resources": resource_configs})
self._add_resource_mesh_client.send_goal_async(goal_msg)
self.lab_logger().info(f"[Host Node] 已发送 {len(resource_configs)} 个资源 mesh 添加请求")
async def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK
resource_tree_set = ResourceTreeSet.load(data["data"])
mount_uuid = data["mount_uuid"]
@@ -1171,6 +1199,12 @@ class HostNode(BaseROS2DeviceNode):
response.response = json.dumps(uuid_mapping) if success else "FAILED"
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
if success:
try:
self._notify_resource_mesh_add(resource_tree_set)
except Exception as e:
self.lab_logger().error(f"[Host Node] 通知 ResourceMeshManager 添加 mesh 失败: {e}")
async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK
uuid_list: List[str] = data["data"]
with_children: bool = data["with_children"]
@@ -1222,6 +1256,12 @@ class HostNode(BaseROS2DeviceNode):
response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree update completed, success: {success}")
if success:
try:
self._notify_resource_mesh_add(new_tree_set)
except Exception as e:
self.lab_logger().error(f"[Host Node] 通知 ResourceMeshManager 更新 mesh 失败: {e}")
async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response):
"""
子节点通知Host物料树更新

View File

@@ -0,0 +1,232 @@
{
"nodes": [
{
"id": "PRCXI",
"name": "PRCXI",
"type": "device",
"class": "liquid_handler.prcxi",
"parent": "",
"pose": {
"size": {
"width": 424,
"height": 202,
"depth": 0
}
},
"config": {
"axis": "Left",
"deck": {
"_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck",
"_resource_child_name": "PRCXI_Deck"
},
"host": "10.20.31.167",
"port": 9999,
"debug": false,
"setup": true,
"timeout": 10,
"simulator": false,
"channel_num": 8
},
"data": {
"reset_ok": true
},
"schema": {},
"description": "",
"model": null,
"position": {
"x": 0,
"y": 240,
"z": 0
}
},
{
"id": "PRCXI_Deck",
"name": "PRCXI_Deck",
"children": [],
"parent": "PRCXI",
"type": "deck",
"class": "",
"position": {
"x": 10,
"y": 10,
"z": 0
},
"config": {
"type": "PRCXI9300Deck",
"size_x": 404,
"size_y": 182,
"size_z": 0,
"model": "9300",
"rotation": {
"x": 0,
"y": 0,
"z": 0,
"type": "Rotation"
},
"category": "deck",
"barcode": null,
"preferred_pickup_location": null,
"sites": [
{
"label": "T1",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T2",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T3",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T4",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 0,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T5",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 0,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T6",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 0,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
}
]
},
"data": {}
}
],
"edges": []
}

View File

@@ -8,8 +8,8 @@
"parent": "",
"pose": {
"size": {
"width": 562,
"height": 394,
"width": 550,
"height": 400,
"depth": 0
}
},
@@ -55,9 +55,9 @@
},
"config": {
"type": "PRCXI9300Deck",
"size_x": 542,
"size_y": 374,
"size_z": 0,
"size_x": 550,
"size_y": 400,
"size_z": 17,
"rotation": {
"x": 0,
"y": 0,
@@ -74,7 +74,7 @@
"occupied_by": null,
"position": {
"x": 0,
"y": 0,
"y": 288,
"z": 0
},
"size": {
@@ -89,7 +89,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -98,7 +101,7 @@
"occupied_by": null,
"position": {
"x": 138,
"y": 0,
"y": 288,
"z": 0
},
"size": {
@@ -112,7 +115,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -121,7 +127,7 @@
"occupied_by": null,
"position": {
"x": 276,
"y": 0,
"y": 288,
"z": 0
},
"size": {
@@ -135,7 +141,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -144,6 +153,240 @@
"occupied_by": null,
"position": {
"x": 414,
"y": 288,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T5",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T6",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T7",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T8",
"visible": true,
"occupied_by": null,
"position": {
"x": 414,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T9",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T10",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T11",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T12",
"visible": true,
"occupied_by": null,
"position": {
"x": 414,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
"label": "T13",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 0,
"z": 0
},
@@ -158,214 +401,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T5",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T6",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T7",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T8",
"visible": true,
"occupied_by": null,
"position": {
"x": 414,
"y": 96,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T9",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T10",
"visible": true,
"occupied_by": null,
"position": {
"x": 138,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T11",
"visible": true,
"occupied_by": null,
"position": {
"x": 276,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T12",
"visible": true,
"occupied_by": null,
"position": {
"x": 414,
"y": 192,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
]
},
{
"label": "T13",
"visible": true,
"occupied_by": null,
"position": {
"x": 0,
"y": 288,
"z": 0
},
"size": {
"width": 128.0,
"height": 86,
"depth": 0
},
"content_type": [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -374,7 +413,7 @@
"occupied_by": null,
"position": {
"x": 138,
"y": 288,
"y": 0,
"z": 0
},
"size": {
@@ -388,7 +427,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -397,7 +439,7 @@
"occupied_by": null,
"position": {
"x": 276,
"y": 288,
"y": 0,
"z": 0
},
"size": {
@@ -411,7 +453,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
},
{
@@ -420,7 +465,7 @@
"occupied_by": null,
"position": {
"x": 414,
"y": 288,
"y": 0,
"z": 0
},
"size": {
@@ -434,7 +479,10 @@
"plates",
"tip_racks",
"tube_rack",
"adaptor"
"adaptor",
"plateadapter",
"module",
"trash"
]
}
]

View File

@@ -108,7 +108,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -153,7 +154,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -198,7 +200,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -243,7 +246,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -288,7 +292,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -333,7 +338,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -378,7 +384,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -423,7 +430,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -468,7 +476,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -513,7 +522,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -558,7 +568,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -603,7 +614,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -648,7 +660,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -693,7 +706,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -738,7 +752,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]
@@ -783,7 +798,8 @@
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
"tube_rack",
"plateadapter"
]
}
]

View File

@@ -152,6 +152,253 @@ def _map_deck_slot(raw_slot: str, object_type: str = "") -> str:
return {"4": "13", "8": "14"}.get(s, s)
def _labware_def_index(labware_defs: Optional[List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
m: Dict[str, Dict[str, Any]] = {}
for d in labware_defs or []:
for k in ("id", "name", "reagent_id", "reagent"):
key = d.get(k)
if key is not None and str(key):
m[str(key)] = d
return m
def _labware_hint_text(labware_id: str, item: Dict[str, Any]) -> str:
"""合并 id 与协议里的 labware 描述OpenTrons 全名常在 labware 字段)。"""
parts = [str(labware_id), str(item.get("labware", "") or "")]
return " ".join(parts).lower()
def _infer_reagent_kind(labware_id: str, item: Dict[str, Any]) -> str:
ot = (item.get("object") or "").strip().lower()
if ot == "trash":
return "trash"
if ot == "tiprack":
return "tip_rack"
lid = _labware_hint_text(labware_id, item)
if "trash" in lid:
return "trash"
# tiprack / tip + rack顺序在 tuberack 之前)
if "tiprack" in lid or ("tip" in lid and "rack" in lid):
return "tip_rack"
# 离心管架 / OpenTrons tuberack勿与 96 tiprack 混淆)
if "tuberack" in lid or "tube_rack" in lid:
return "tube_rack"
if "eppendorf" in lid and "rack" in lid:
return "tube_rack"
if "safelock" in lid and "rack" in lid:
return "tube_rack"
if "rack" in lid and "tip" not in lid:
return "tube_rack"
return "plate"
def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int:
"""从 ``24_tuberack`` 等命名中解析孔位数;解析不到则默认 24与 PRCXI_EP_Adapter 4×6 一致)。"""
hint = _labware_hint_text(labware_id, item)
m = re.search(r"(\d+)_tuberack", hint)
if m:
return int(m.group(1))
m = re.search(r"tuberack[_\s]*(\d+)", hint)
if m:
return int(m.group(1))
m = re.search(r"(\d+)\s*[-_]?\s*pos(?:ition)?s?", hint)
if m:
return int(m.group(1))
return 96
def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]:
s = _labware_hint_text(labware_id, item)
for v in (1250, 1000, 300, 200, 10):
if f"{v}ul" in s or f"{v}μl" in s or f"{v}u" in s:
return float(v)
if f" {v} " in f" {s} ":
return float(v)
return None
def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool:
"""有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与trash 除外)。"""
if kind == "trash":
return True
if req is None or req <= 0:
return True
mv = float(template.get("Volume") or 0)
if mv <= 0:
return False
return mv >= req
def _direct_labware_class_name(item: Dict[str, Any]) -> str:
"""仅用于 tip_rack 且 ``preserve_tip_rack_incoming_class=True````class_name``/``class`` 原样;否则 ``labware`` → ``lab_*``。"""
explicit = item.get("class_name") or item.get("class")
if explicit is not None and str(explicit).strip() != "":
return str(explicit).strip()
lw = str(item.get("labware", "") or "").strip()
if lw:
return f"lab_{lw.lower().replace('.', 'point').replace(' ', '_')}"
return ""
def _match_score_prcxi_template(
template: Dict[str, Any],
num_children: int,
child_max_volume: Optional[float],
) -> float:
"""孔数差主导;有需求体积且模板已满足 >= 时,余量比例 (模板-需求)/需求 越小越好(优先选刚好够的)。"""
hole_count = int(template.get("hole_count") or 0)
hole_diff = abs(num_children - hole_count)
material_volume = float(template.get("Volume") or 0)
req = child_max_volume
if req is not None and req > 0 and material_volume > 0:
vol_diff = (material_volume - req) / max(req, 1e-9)
elif material_volume > 0 and req is not None:
vol_diff = abs(float(req) - material_volume) / material_volume
else:
vol_diff = 0.0
return hole_diff * 1000 + vol_diff
def _apply_prcxi_labware_auto_match(
labware_info: Dict[str, Dict[str, Any]],
labware_defs: Optional[List[Dict[str, Any]]] = None,
*,
preserve_tip_rack_incoming_class: bool = True,
) -> None:
"""上传构建图前:按孔数+容量将 reagent 条目匹配到 ``prcxi_labware`` 注册类名,写入 ``prcxi_class_name``。
若给出需求体积,仅选用模板标称 Volume >= 该值的物料,并在满足条件的模板中选余量最小者。
``preserve_tip_rack_incoming_class=True``(默认)时:**仅 tip_rack** 不做模板匹配,类名由 ``class_name``/``class`` 或
``labware````lab_*``)直接给出;**plate / tube_rack / trash 等**仍按注册模板匹配。
``False`` 时 **全部**(含 tip_rack走模板匹配。"""
if not labware_info:
return
default_prcxi_tip_class = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
try:
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import get_prcxi_labware_template_specs
except Exception:
return
templates = get_prcxi_labware_template_specs()
if not templates:
return
def_map = _labware_def_index(labware_defs)
for labware_id, item in labware_info.items():
if item.get("prcxi_class_name"):
continue
kind = _infer_reagent_kind(labware_id, item)
if preserve_tip_rack_incoming_class and kind == "tip_rack":
inc_s = _direct_labware_class_name(item)
if inc_s == default_prcxi_tip_class:
inc_s = ""
if inc_s:
item["prcxi_class_name"] = inc_s
continue
explicit = item.get("class_name") or item.get("class")
if explicit and str(explicit).startswith("PRCXI_"):
item["prcxi_class_name"] = str(explicit)
continue
extra = def_map.get(str(labware_id), {})
wells = item.get("well") or []
well_n = len(wells) if isinstance(wells, list) else 0
num_from_def = int(extra.get("num_wells") or extra.get("well_count") or item.get("num_wells") or 0)
if kind == "trash":
num_children = 0
elif kind == "tip_rack":
num_children = num_from_def if num_from_def > 0 else 96
elif kind == "tube_rack":
if num_from_def > 0:
num_children = num_from_def
elif well_n > 0:
num_children = well_n
else:
num_children = _infer_tube_rack_num_positions(labware_id, item)
else:
num_children = num_from_def if num_from_def > 0 else 96
child_max_volume = item.get("max_volume")
if child_max_volume is None:
child_max_volume = extra.get("max_volume")
try:
child_max_volume_f = float(child_max_volume) if child_max_volume is not None else None
except (TypeError, ValueError):
child_max_volume_f = None
if kind == "tip_rack" and child_max_volume_f is None:
child_max_volume_f = _tip_volume_hint(item, labware_id) or 300.0
candidates = [t for t in templates if t["kind"] == kind]
if not candidates:
continue
best = None
best_score = float("inf")
for t in candidates:
if kind != "trash" and int(t.get("hole_count") or 0) <= 0:
continue
if not _volume_template_covers_requirement(t, child_max_volume_f, kind):
continue
sc = _match_score_prcxi_template(t, num_children, child_max_volume_f)
if sc < best_score:
best_score = sc
best = t
if best:
item["prcxi_class_name"] = best["class_name"]
def _reconcile_slot_carrier_prcxi_class(
labware_info: Dict[str, Dict[str, Any]],
*,
preserve_tip_rack_incoming_class: bool = False,
) -> None:
"""同一 deck 槽位上多条 reagent 时,按载体类型优先级统一 ``prcxi_class_name``,避免先遍历到 96 板后槽位被错误绑定。
``preserve_tip_rack_incoming_class=True`` 时tip_rack 条目不参与同槽类名合并(不被覆盖、也不把 tip 类名扩散到同槽其它条目)。"""
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for lid, item in labware_info.items():
ot = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), ot)
if not slot:
continue
by_slot.setdefault(str(slot), []).append((lid, item))
priority = {"trash": 0, "tube_rack": 1, "tip_rack": 2, "plate": 3}
for _slot, pairs in by_slot.items():
if len(pairs) < 2:
continue
def _rank(p: Tuple[str, Dict[str, Any]]) -> int:
return priority.get(_infer_reagent_kind(p[0], p[1]), 9)
pairs_sorted = sorted(pairs, key=_rank)
best_cls = None
for lid, it in pairs_sorted:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
c = it.get("prcxi_class_name")
if c:
best_cls = c
break
if not best_cls:
continue
for lid, it in pairs:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
it["prcxi_class_name"] = best_cls
# ---------------- Graph ----------------
@@ -377,6 +624,7 @@ def build_protocol_graph(
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
@@ -385,28 +633,67 @@ def build_protocol_graph(
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配
preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定);
**其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack都走模板匹配。
"""
G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port"
slot_to_create_resource = {} # slot -> create_resource node_id
_apply_prcxi_labware_auto_match(
labware_info,
labware_defs,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
_reconcile_slot_carrier_prcxi_class(
labware_info,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
# 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for labware_id, item in labware_info.items():
object_type = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), object_type)
labware = item.get("labware", "")
if slot and slot not in slots_info:
res_id = f"{labware}_slot_{slot}"
slots_info[slot] = {
"labware": labware,
"res_id": res_id,
"labware_id": labware_id,
"object": object_type,
}
if not slot:
continue
by_slot.setdefault(slot, []).append((labware_id, item))
slots_info: Dict[str, Dict[str, Any]] = {}
for slot, pairs in by_slot.items():
def _ot_tip(it: Dict[str, Any]) -> bool:
return str(it.get("object", "") or "").strip().lower() == "tiprack"
tip_pairs = [(lid, it) for lid, it in pairs if _ot_tip(it)]
chosen_lid = ""
chosen_item: Dict[str, Any] = {}
prcxi_val: Optional[str] = None
scan = tip_pairs if tip_pairs else pairs
for lid, it in scan:
c = it.get("prcxi_class_name")
if c:
chosen_lid, chosen_item, prcxi_val = lid, it, str(c)
break
if not chosen_lid and scan:
chosen_lid, chosen_item = scan[0]
pv = chosen_item.get("prcxi_class_name")
prcxi_val = str(pv) if pv else None
labware = str(chosen_item.get("labware", "") or "")
res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}"
res_id = res_id.replace(" ", "_")
slots_info[slot] = {
"labware": labware,
"res_id": res_id,
"labware_id": chosen_lid,
"object": chosen_item.get("object", "") or "",
"prcxi_class_name": prcxi_val,
}
# 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4())
@@ -429,11 +716,21 @@ def build_protocol_graph(
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
res_type_name = info["labware"].lower().replace(".", "point")
object_type = info.get("object", "")
res_type_name = f"lab_{res_type_name}"
if object_type == "trash":
object_type = info.get("object", "") or ""
ot_lo = str(object_type).strip().lower()
matched = info.get("prcxi_class_name")
if ot_lo == "trash":
res_type_name = "PRCXI_trash"
elif matched:
res_type_name = matched
elif ot_lo == "tiprack":
if preserve_tip_rack_incoming_class:
lid = str(info.get("labware_id") or "").strip() or "tip_rack"
res_type_name = f"lab_{lid.lower().replace('.', 'point').replace(' ', '_')}"
else:
res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
else:
res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}"
G.add_node(
node_id,
template_name="create_resource",
@@ -456,9 +753,9 @@ def build_protocol_graph(
},
)
slot_to_create_resource[slot] = node_id
if object_type == "tiprack":
if ot_lo == "tiprack":
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
if object_type == "trash":
if ot_lo == "trash":
trash_create_node_id = node_id
# create_resource 之间不需要 ready 连接

View File

@@ -210,6 +210,7 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
@@ -221,6 +222,8 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True默认时仅 tip_rack 不跑模板、按传入类名/labware其它载体仍自动匹配。
False 时全部走模板。JSON 根 ``preserve_tip_rack_incoming_class`` 可覆盖此参数。
Returns:
WorkflowGraph: 构建好的工作流图
@@ -263,6 +266,10 @@ def convert_from_json(
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
@@ -270,6 +277,7 @@ def convert_from_json(
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
preserve_tip_rack_incoming_class=preserve,
)
# 校验句柄配置
@@ -287,6 +295,7 @@ def convert_from_json(
def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> Dict[str, Any]:
"""
将 JSON 数据转换为 node-link 格式的字典
@@ -298,13 +307,18 @@ def convert_json_to_node_link(
Returns:
Dict: node-link 格式的工作流数据
"""
graph = convert_from_json(data, workstation_name)
graph = convert_from_json(
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_node_link_dict()
def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> List[Dict[str, Any]]:
"""
将 JSON 数据转换为工作流列表格式
@@ -316,5 +330,9 @@ def convert_json_to_workflow_list(
Returns:
List: 工作流节点列表
"""
graph = convert_from_json(data, workstation_name)
graph = convert_from_json(
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_dict()

View File

@@ -234,6 +234,7 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi",
validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph:
"""
从 JSON 数据或文件转换为 WorkflowGraph
@@ -246,6 +247,7 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True 时仅 tip 不跑模板False 时全部匹配JSON 根字段同名可覆盖
Returns:
WorkflowGraph: 构建好的工作流图
@@ -295,12 +297,17 @@ def convert_from_json(
"3. {'steps': [...], 'labware': [...]}"
)
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图
graph = build_protocol_graph(
labware_info=labware_info,
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
preserve_tip_rack_incoming_class=preserve,
)
# 校验句柄配置