diff --git a/.cursor/skills/add-protocol/SKILL.md b/.cursor/skills/add-protocol/SKILL.md deleted file mode 100644 index 2537051b..00000000 --- a/.cursor/skills/add-protocol/SKILL.md +++ /dev/null @@ -1,323 +0,0 @@ ---- -name: add-protocol -description: Guide for adding new experiment protocols to Uni-Lab-OS (添加新实验操作协议). Walks through ROS Action definition, Pydantic model creation, protocol generator implementation, and registration. Use when the user wants to add a new protocol, create a compile function, implement an experiment operation, or mentions 协议/protocol/编译/compile/实验操作. ---- - -# 添加新实验操作协议(Protocol) - -Protocol 是对实验有意义的完整动作(如泵转移、过滤、溶解),需要多设备协同。`compile/` 中的生成函数根据设备连接图将抽象操作"编译"为设备指令序列。 - -添加一个 Protocol 需修改 **6 个文件**,按以下流程执行。 - ---- - -## 第一步:确认协议信息 - -向用户确认: - -| 信息 | 示例 | -|------|------| -| 协议英文名 | `MyNewProtocol` | -| 操作描述 | 将固体样品研磨至目标粒径 | -| Goal 参数(必需 + 可选) | `vessel: dict`, `time: float = 300.0` | -| Result 字段 | `success: bool`, `message: str` | -| 需要哪些设备协同 | 研磨器、搅拌器 | - ---- - -## 第二步:创建 ROS Action 定义 - -路径:`unilabos_msgs/action/.action` - -三段式结构(Goal / Result / Feedback),用 `---` 分隔: - -``` -# Goal -Resource vessel -float64 time -string mode ---- -# Result -bool success -string return_info ---- -# Feedback -string status -string current_device -builtin_interfaces/Duration time_spent -builtin_interfaces/Duration time_remaining -``` - -**类型映射:** - -| Python 类型 | ROS 类型 | 说明 | -|------------|----------|------| -| `dict` | `Resource` | 容器/设备引用,自定义消息类型 | -| `float` | `float64` | | -| `int` | `int32` | | -| `str` | `string` | | -| `bool` | `bool` | | - -> `Resource` 是 `unilabos_msgs/msg/Resource.msg` 中定义的自定义消息类型。 - ---- - -## 第三步:注册 Action 到 CMakeLists - -在 `unilabos_msgs/CMakeLists.txt` 的 `set(action_files ...)` 块中添加: - -```cmake -"action/MyNewAction.action" -``` - -> 调试时需编译:`cd unilabos_msgs && colcon build && source ./install/local_setup.sh && cd ..` -> PR 合并后 CI/CD 自动发布,`mamba update ros-humble-unilabos-msgs` 即可。 - ---- - -## 第四步:创建 Pydantic 模型 - -在 `unilabos/messages/__init__.py` 中添加(位于 `# Start Protocols` 和 `# End Protocols` 之间): - -```python -class MyNewProtocol(BaseModel): - # === 必需参数 === - vessel: dict = Field(..., description="目标容器") - - # === 可选参数 === - time: float = Field(300.0, description="操作时间 (秒)") - mode: str = Field("default", description="操作模式") - - def model_post_init(self, __context): - """参数验证和修正""" - if self.time <= 0: - self.time = 300.0 -``` - -**规则:** -- 参数名必须与 `.action` 文件中 Goal 字段完全一致 -- `dict` 类型对应 `.action` 中的 `Resource` -- 将类名加入文件末尾的 `__all__` 列表 - ---- - -## 第五步:实现协议生成函数 - -路径:`unilabos/compile/_protocol.py` - -```python -import networkx as nx -from typing import List, Dict, Any - - -def generate_my_new_protocol( - G: nx.DiGraph, - vessel: dict, - time: float = 300.0, - mode: str = "default", - **kwargs, -) -> List[Dict[str, Any]]: - """将 MyNewProtocol 编译为设备动作序列。 - - Args: - G: 设备连接图(NetworkX),节点为设备/容器,边为物理连接 - vessel: 目标容器 {"id": "reactor_1"} - time: 操作时间(秒) - mode: 操作模式 - - Returns: - 动作列表,每个元素为: - - dict: 单步动作 - - list[dict]: 并行动作 - """ - from unilabos.compile.utils.vessel_parser import get_vessel - - vessel_id, vessel_data = get_vessel(vessel) - actions = [] - - # 查找相关设备(通过图的连接关系) - # 生成动作序列 - actions.append({ - "device_id": "target_device_id", - "action_name": "some_action", - "action_kwargs": {"param": "value"} - }) - - # 等待 - actions.append({ - "action_name": "wait", - "action_kwargs": {"time": time} - }) - - return actions -``` - -### 动作字典格式 - -```python -# 单步动作(发给子设备) -{"device_id": "pump_1", "action_name": "set_position", "action_kwargs": {"position": 10.0}} - -# 发给工作站自身 -{"device_id": "self", "action_name": "my_action", "action_kwargs": {...}} - -# 等待 -{"action_name": "wait", "action_kwargs": {"time": 5.0}} - -# 并行动作(列表嵌套) -[ - {"device_id": "pump_1", "action_name": "set_position", "action_kwargs": {"position": 10.0}}, - {"device_id": "stirrer_1", "action_name": "start_stir", "action_kwargs": {"stir_speed": 300}} -] -``` - -### 关于 `vessel` 参数类型 - -现有协议的 `vessel` 参数类型不统一: -- 新协议趋势:使用 `dict`(如 `{"id": "reactor_1"}`) -- 旧协议:使用 `str`(如 `"reactor_1"`) -- 兼容写法:`Union[str, dict]` - -**建议新协议统一使用 `dict` 类型**,通过 `get_vessel()` 兼容两种输入。 - -### 公共工具函数(`unilabos/compile/utils/`) - -| 函数 | 用途 | -|------|------| -| `get_vessel(vessel)` | 解析容器参数为 `(vessel_id, vessel_data)`,兼容 dict 和 str | -| `find_solvent_vessel(G, solvent)` | 根据溶剂名查找容器(精确→命名规则→模糊→液体类型) | -| `find_reagent_vessel(G, reagent)` | 根据试剂名查找容器(支持固体和液体) | -| `find_connected_stirrer(G, vessel)` | 查找与容器相连的搅拌器 | -| `find_solid_dispenser(G)` | 查找固体加样器 | - -### 协议内专属查找函数 - -许多协议在自己的文件内定义了专属的 `find_*` 函数(不在 `utils/` 中)。编写新协议时,优先复用 `utils/` 中的公共函数;如需特殊查找逻辑,在协议文件内部定义即可: - -```python -def find_my_special_device(G: nx.DiGraph, vessel: str) -> str: - """查找与容器相关的特殊设备""" - for node in G.nodes(): - if 'my_device_type' in G.nodes[node].get('class', '').lower(): - return node - raise ValueError("未找到特殊设备") -``` - -### 复用已有协议 - -复杂协议通常组合已有协议: - -```python -from unilabos.compile.pump_protocol import generate_pump_protocol_with_rinsing - -actions.extend(generate_pump_protocol_with_rinsing( - G, from_vessel=solvent_vessel, to_vessel=vessel, volume=volume -)) -``` - -### 图查询模式 - -```python -# 查找与容器相连的特定类型设备 -for neighbor in G.neighbors(vessel_id): - node_data = G.nodes[neighbor] - if "heater" in node_data.get("class", ""): - heater_id = neighbor - break - -# 查找最短路径(泵转移) -path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) -``` - ---- - -## 第六步:注册协议生成函数 - -在 `unilabos/compile/__init__.py` 中: - -1. 顶部添加导入: - -```python -from .my_new_protocol import generate_my_new_protocol -``` - -2. 在 `action_protocol_generators` 字典中添加映射: - -```python -action_protocol_generators = { - # ... 已有协议 - MyNewProtocol: generate_my_new_protocol, -} -``` - ---- - -## 第七步:配置图文件 - -在工作站的图文件中,将协议名加入 `protocol_type`: - -```json -{ - "id": "my_station", - "class": "workstation", - "config": { - "protocol_type": ["PumpTransferProtocol", "MyNewProtocol"] - } -} -``` - ---- - -## 第八步:验证 - -```bash -# 1. 模块可导入 -python -c "from unilabos.messages import MyNewProtocol; print(MyNewProtocol.model_fields)" - -# 2. 生成函数可导入 -python -c "from unilabos.compile import action_protocol_generators; print(list(action_protocol_generators.keys()))" - -# 3. 启动测试(可选) -unilab -g .json --complete_registry -``` - ---- - -## 工作流清单 - -``` -协议接入进度: -- [ ] 1. 确认协议名、参数、涉及设备 -- [ ] 2. 创建 .action 文件 (unilabos_msgs/action/.action) -- [ ] 3. 注册到 CMakeLists.txt -- [ ] 4. 创建 Pydantic 模型 (unilabos/messages/__init__.py) + 更新 __all__ -- [ ] 5. 实现生成函数 (unilabos/compile/_protocol.py) -- [ ] 6. 注册到 compile/__init__.py -- [ ] 7. 配置图文件 protocol_type -- [ ] 8. 验证 -``` - ---- - -## 高级模式 - -实现复杂协议时,详见 [reference.md](reference.md):协议运行时数据流、mock graph 测试模式、单位解析工具(`unit_parser.py`)、复杂协议组合模式(以 dissolve 为例)。 - ---- - -## 现有协议速查 - -| 协议 | Pydantic 类 | 生成函数 | 核心参数 | -|------|-------------|---------|---------| -| 泵转移 | `PumpTransferProtocol` | `generate_pump_protocol_with_rinsing` | `from_vessel, to_vessel, volume` | -| 简单转移 | `TransferProtocol` | `generate_pump_protocol` | `from_vessel, to_vessel, volume` | -| 加样 | `AddProtocol` | `generate_add_protocol` | `vessel, reagent, volume` | -| 过滤 | `FilterProtocol` | `generate_filter_protocol` | `vessel, filtrate_vessel` | -| 溶解 | `DissolveProtocol` | `generate_dissolve_protocol` | `vessel, solvent, volume` | -| 加热/冷却 | `HeatChillProtocol` | `generate_heat_chill_protocol` | `vessel, temp, time` | -| 搅拌 | `StirProtocol` | `generate_stir_protocol` | `vessel, time` | -| 分离 | `SeparateProtocol` | `generate_separate_protocol` | `from_vessel, separation_vessel, solvent` | -| 蒸发 | `EvaporateProtocol` | `generate_evaporate_protocol` | `vessel, pressure, temp, time` | -| 清洗 | `CleanProtocol` | `generate_clean_protocol` | `vessel, solvent, volume` | -| 离心 | `CentrifugeProtocol` | `generate_centrifuge_protocol` | `vessel, speed, time` | -| 抽气充气 | `EvacuateAndRefillProtocol` | `generate_evacuateandrefill_protocol` | `vessel, gas` | diff --git a/.cursor/skills/add-protocol/reference.md b/.cursor/skills/add-protocol/reference.md deleted file mode 100644 index a212ced9..00000000 --- a/.cursor/skills/add-protocol/reference.md +++ /dev/null @@ -1,207 +0,0 @@ -# 协议高级参考 - -本文件是 SKILL.md 的补充,包含协议运行时数据流、测试模式、单位解析工具和复杂协议组合模式。Agent 在需要实现这些功能时按需阅读。 - ---- - -## 1. 协议运行时数据流 - -从图文件到协议执行的完整链路: - -``` -实验图 JSON - ↓ graphio.read_node_link_json() -physical_setup_graph (NetworkX DiGraph) - ↓ ROS2WorkstationNode._setup_protocol_names(protocol_type) -为每个 protocol_name 创建 ActionServer - ↓ 收到 Action Goal -_create_protocol_execute_callback() - ↓ convert_from_ros_msg_with_mapping(goal, mapping) -protocol_kwargs (Python dict) - ↓ 向 Host 查询 Resource 类型参数的当前状态 -protocol_kwargs 更新(vessel 带上 children、data 等) - ↓ protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs) -List[Dict] 动作序列 - ↓ 逐步 execute_single_action / 并行 create_task -子设备 ActionClient 执行 -``` - -### `_setup_protocol_names` 核心逻辑 - -```python -def _setup_protocol_names(self, protocol_type): - if isinstance(protocol_type, str): - self.protocol_names = [p.strip() for p in protocol_type.split(",")] - else: - self.protocol_names = protocol_type - self.protocol_action_mappings = {} - for protocol_name in self.protocol_names: - protocol_type = globals()[protocol_name] # 从 messages 模块取 Pydantic 类 - self.protocol_action_mappings[protocol_name] = get_action_type(protocol_type) -``` - -### `_create_protocol_execute_callback` 关键步骤 - -1. `convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"])` — ROS Goal → Python dict -2. 对 `Resource` 类型字段,通过 `resource_get` Service 查询 Host 的最新资源状态 -3. `protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs)` — 调用编译函数 -4. 遍历 steps:`dict` 串行执行,`list` 并行执行 -5. `execute_single_action` 通过 `_action_clients[device_id]` 向子设备发送 Action Goal -6. 执行完毕后通过 `resource_update` Service 更新资源状态 - ---- - -## 2. 测试模式 - -### 2.1 协议文件内测试函数 - -许多协议文件末尾有 `test_*` 函数,主要测试参数解析工具: - -```python -def test_dissolve_protocol(): - """测试溶解协议的各种参数解析""" - volumes = ["10 mL", "?", 10.0, "1 L", "500 μL"] - for vol in volumes: - result = parse_volume_input(vol) - print(f"体积解析: {vol} → {result}mL") - - masses = ["2.9 g", "?", 2.5, "500 mg"] - for mass in masses: - result = parse_mass_input(mass) - print(f"质量解析: {mass} → {result}g") -``` - -### 2.2 使用 mock graph 测试协议生成器 - -推荐的端到端测试模式: - -```python -import pytest -import networkx as nx -from unilabos.compile.stir_protocol import generate_stir_protocol - - -@pytest.fixture -def topology_graph(): - """创建测试拓扑图""" - G = nx.DiGraph() - G.add_node("flask_1", **{"class": "flask", "type": "container"}) - G.add_node("stirrer_1", **{"class": "virtual_stirrer", "type": "device"}) - G.add_edge("stirrer_1", "flask_1") - return G - - -def test_generate_stir_protocol(topology_graph): - """测试搅拌协议生成""" - actions = generate_stir_protocol( - G=topology_graph, - vessel="flask_1", - time="5 min", - stir_speed=300.0 - ) - assert len(actions) >= 1 - assert actions[0]["device_id"] == "stirrer_1" -``` - -**要点:** -- 用 `nx.DiGraph()` 构建最小拓扑 -- `add_node(id, **attrs)` 设置 `class`、`type`、`data` 等 -- `add_edge(src, dst)` 建立物理连接 -- 协议内的 `find_*` 函数依赖这些节点和边 - ---- - -## 3. 单位解析工具 - -路径:`unilabos/compile/utils/unit_parser.py` - -| 函数 | 输入 | 返回 | 默认值 | -|------|------|------|--------| -| `parse_volume_input(input, default_unit)` | `"100 mL"`, `"2.5 L"`, `"500 μL"`, `10.0`, `"?"` | mL (float) | 50.0 | -| `parse_mass_input(input)` | `"19.3 g"`, `"500 mg"`, `2.5`, `"?"` | g (float) | 1.0 | -| `parse_time_input(input)` | `"30 min"`, `"1 h"`, `"300"`, `60.0`, `"?"` | 秒 (float) | 60.0 | - -支持的单位: - -- **体积**: mL, L, μL/uL, milliliter, liter, microliter -- **质量**: g, mg, kg, gram, milligram, kilogram -- **时间**: s/sec/second, min/minute, h/hr/hour, d/day - -特殊值 `"?"`、`"unknown"`、`"tbd"` 返回默认值。 - ---- - -## 4. 复杂协议组合模式 - -以 `dissolve_protocol` 为例,展示如何组合多个子操作: - -### 整体流程 - -``` -1. 解析参数 (parse_volume_input, parse_mass_input, parse_time_input) -2. 设备发现 (find_connected_heatchill, find_connected_stirrer, find_solid_dispenser) -3. 判断溶解类型 (液体 vs 固体) -4. 组合动作序列: - a. heat_chill_start / start_stir (启动加热/搅拌) - b. wait (等待温度稳定) - c. pump_protocol_with_rinsing (液体转移, 通过 extend 拼接) - 或 add_solid (固体加样) - d. heat_chill / stir / wait (溶解等待) - e. heat_chill_stop (停止加热) -``` - -### 关键代码模式 - -**设备发现 → 条件组合:** - -```python -heatchill_id = find_connected_heatchill(G, vessel_id) -stirrer_id = find_connected_stirrer(G, vessel_id) -solid_dispenser_id = find_solid_dispenser(G) - -actions = [] - -# 启动阶段 -if heatchill_id and temp > 25.0: - actions.append({ - "device_id": heatchill_id, - "action_name": "heat_chill_start", - "action_kwargs": {"vessel": {"id": vessel_id}, "temp": temp} - }) - actions.append({"action_name": "wait", "action_kwargs": {"time": 30}}) -elif stirrer_id: - actions.append({ - "device_id": stirrer_id, - "action_name": "start_stir", - "action_kwargs": {"vessel": {"id": vessel_id}, "stir_speed": stir_speed} - }) - -# 转移阶段(复用已有协议) -pump_actions = generate_pump_protocol_with_rinsing( - G=G, from_vessel=solvent_vessel, to_vessel=vessel_id, volume=volume -) -actions.extend(pump_actions) - -# 等待阶段 -if heatchill_id: - actions.append({ - "device_id": heatchill_id, - "action_name": "heat_chill", - "action_kwargs": {"vessel": {"id": vessel_id}, "temp": temp, "time": time} - }) -else: - actions.append({"action_name": "wait", "action_kwargs": {"time": time}}) -``` - ---- - -## 5. 关键路径 - -| 内容 | 路径 | -|------|------| -| 协议执行回调 | `unilabos/ros/nodes/presets/workstation.py` | -| ROS 消息映射 | `unilabos/ros/msgs/message_converter.py` | -| 物理拓扑图 | `unilabos/resources/graphio.py` (`physical_setup_graph`) | -| 单位解析 | `unilabos/compile/utils/unit_parser.py` | -| 容器解析 | `unilabos/compile/utils/vessel_parser.py` | -| 溶解协议(组合示例) | `unilabos/compile/dissolve_protocol.py` | diff --git a/.cursor/skills/add-workstation/templates.md b/.cursor/skills/add-workstation/templates.md deleted file mode 100644 index 3c477835..00000000 --- a/.cursor/skills/add-workstation/templates.md +++ /dev/null @@ -1,454 +0,0 @@ -# 工作站代码模板 - -本文件包含 SKILL.md 引用的所有代码模板。Agent 根据需要按需阅读。 - ---- - -## Template A:外部系统工作站 - -```python -import logging -from typing import Dict, Any, Optional, List -from pylabrobot.resources import Deck - -from unilabos.devices.workstation.workstation_base import WorkstationBase - -try: - from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode -except ImportError: - ROS2WorkstationNode = None - - -class MyWorkstation(WorkstationBase): - _ros_node: "ROS2WorkstationNode" - - def __init__( - self, - config: dict = None, - deck: Optional[Deck] = None, - protocol_type: list = None, - **kwargs, - ): - super().__init__(deck=deck, **kwargs) - self.config = config or {} - self.logger = logging.getLogger(f"MyWorkstation") - self.api_host = self.config.get("api_host", "") - self.api_key = self.config.get("api_key", "") - self._status = "Idle" - - def post_init(self, ros_node: "ROS2WorkstationNode") -> None: - super().post_init(ros_node) - - def _get_child_device(self, device_id: str): - return self._children.get(device_id) - - async def scheduler_start(self, **kwargs) -> Dict[str, Any]: - return {"success": True} - - async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]: - return {"success": True} - - @property - def workflow_sequence(self) -> str: - return "[]" - - @property - def material_info(self) -> str: - return "{}" -``` - ---- - -## Template B:PLC/Modbus 硬件控制工作站 - -```python -import os -import logging -from typing import Dict, Any, Optional -from pylabrobot.resources import Deck - -from unilabos.devices.workstation.workstation_base import WorkstationBase -from unilabos.device_comms.modbus_plc.client import ( - TCPClient, BaseClient, ModbusWorkflow, WorkflowAction, -) -from unilabos.device_comms.modbus_plc.modbus import ( - Base as ModbusNodeBase, DataType, WorderOrder, -) - -try: - from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode -except ImportError: - ROS2WorkstationNode = None - - -class MyHardwareWorkstation(WorkstationBase): - _ros_node: "ROS2WorkstationNode" - - def __init__( - self, - config: dict = None, - deck: Optional[Deck] = None, - address: str = "192.168.1.100", - port: str = "502", - debug_mode: bool = False, - *args, - **kwargs, - ): - super().__init__(deck=deck, *args, **kwargs) - self.config = config or {} - self.address = address - self.port = int(port) - self.debug_mode = debug_mode - self.logger = logging.getLogger("MyHardwareWorkstation") - - if not debug_mode: - modbus_client = TCPClient(addr=self.address, port=self.port) - modbus_client.client.connect() - csv_path = os.path.join(os.path.dirname(__file__), 'register_map.csv') - self.nodes = BaseClient.load_csv(csv_path) - self.client = modbus_client.register_node_list(self.nodes) - else: - self.client = None - - def _read_coil(self, name: str) -> bool: - if self.debug_mode: - return False - result, err = self.client.use_node(name).read(1) - return result[0] if not err else False - - def _write_coil(self, name: str, value: bool): - if not self.debug_mode: - self.client.use_node(name).write(value) - - def _read_register(self, name: str, data_type: DataType = DataType.INT16): - if self.debug_mode: - return 0 - result, err = self.client.use_node(name).read( - 2 if data_type == DataType.FLOAT32 else 1 - ) - return result if not err else 0 - - def _write_register(self, name: str, value, data_type: DataType = DataType.FLOAT32): - if not self.debug_mode: - self.client.use_node(name).write( - value, data_type=data_type, word_order=WorderOrder.LITTLE - ) - - async def start_process(self, **kwargs) -> Dict[str, Any]: - self._write_coil('COIL_SYS_START_CMD', True) - return {"success": True} - - async def stop_process(self, **kwargs) -> Dict[str, Any]: - self._write_coil('COIL_SYS_STOP_CMD', True) - return {"success": True} - - @property - def sys_status(self) -> str: - return str(self._read_coil("COIL_SYS_START_STATUS")) -``` - -### PLC 寄存器映射 CSV 格式 - -```csv -Name,DataType,InitValue,Comment,Attribute,DeviceType,Address, -COIL_SYS_START_CMD,BOOL,,系统启动命令,,coil,8010, -COIL_SYS_STOP_CMD,BOOL,,系统停止命令,,coil,8020, -REG_MSG_ELECTROLYTE_VOLUME,INT16,,电解液体积,,hold_register,11004, -REG_DATA_OPEN_CIRCUIT_VOLTAGE,FLOAT32,,开路电压,,hold_register,10002, -``` - -命名约定:`COIL_` 线圈 / `REG_MSG_` 命令寄存器 / `REG_DATA_` 数据寄存器 / `_CMD` 写入 / `_STATUS` 读取 - ---- - -## Template C:Protocol 工作站 - -```python -from typing import List, Optional -from pylabrobot.resources import Resource as PLRResource -from unilabos.devices.workstation.workstation_base import ProtocolNode - - -class MyProtocolStation(ProtocolNode): - def __init__( - self, - protocol_type: List[str], - deck: Optional[PLRResource] = None, - *args, - **kwargs, - ): - super().__init__(protocol_type=protocol_type, deck=deck, *args, **kwargs) -``` - -> 通常不需要自定义类,直接在注册表和图文件中配置 `ProtocolNode` + `protocol_type` 即可。 - ---- - -## 子设备模板 - -### 驱动类 - -```python -class MyReactor: - def __init__(self, **kwargs): - self.data = { - "temperature": 0.0, - "status": "Idle", - } - - async def update_metrics(self, metrics_json: str, **kwargs): - import json - metrics = json.loads(metrics_json) - self.data["temperature"] = metrics.get("temperature", 0.0) - self.data["status"] = metrics.get("status", "Idle") - return {"success": True} -``` - -### 注册表 - -```yaml -reaction_station.reactor: - category: - - reactor - - my_workstation - class: - module: unilabos.devices.workstation.my_station.my_station:MyReactor - type: python - status_types: - temperature: Float64 - status: String - action_value_mappings: - auto-update_metrics: - type: UniLabJsonCommandAsync - goal: - metrics_json: json_str - result: - success: success -``` - -> `auto-` 前缀动作不创建 ActionClient,仅供工作站驱动内部调用。 - -### 图文件节点 - -```json -{ - "id": "reactor_1", - "name": "reactor_1", - "children": [], - "parent": "my_station", - "type": "device", - "class": "reaction_station.reactor", - "position": {"x": 1150, "y": 300, "z": 0}, - "config": {}, - "data": {} -} -``` - -### 代码中访问子设备 - -```python -child_node = self._children.get("reactor_1") -child_node.driver_instance.update_metrics(data) - -child = self._ros_node.sub_devices.get("reactor_1") -child.driver_instance.data["temperature"] -``` - -### 硬件代理配置 - -通信设备节点(ID 以 `serial_` 或 `io_` 开头): - -```json -{ - "id": "serial_port_1", - "parent": "my_station", - "type": "device", - "class": "serial_device", - "config": { - "hardware_interface": { - "name": "hardware_interface_name", - "read": "read_method", - "write": "write_method" - } - } -} -``` - -子设备注册表中声明代理: - -```yaml -my_sub_device: - class: - hardware_interface: - name: hardware_interface_name # 属性名,值为通信设备 ID - read: read_from_device # 注入的读方法 - write: write_to_device # 注入的写方法 -``` - -ROS 节点初始化时自动检测 `name` 属性值是否为另一子设备 ID,若是则注入通信设备的 read/write 方法。 - ---- - -## 注册表完整配置 - -```yaml -my_workstation: - description: "我的工作站" - version: "1.0.0" - category: - - workstation - - my_category - class: - module: unilabos.devices.workstation.my_station.my_station:MyWorkstation - type: python - status_types: - workflow_sequence: String - material_info: String - action_value_mappings: - scheduler_start: - type: UniLabJsonCommandAsync - goal: {} - result: - success: success - create_order: - type: UniLabJsonCommandAsync - goal: - json_str: json_str - result: - success: success - init_param_schema: - config: - type: object - deck: - type: object - protocol_type: - type: array -``` - ---- - -## 物料资源模板 - -### Bottle 工厂函数 - -```python -from unilabos.resources.itemized_carrier import Bottle - - -def My_Reagent_Bottle( - name: str, - diameter: float = 70.0, - height: float = 120.0, - max_volume: float = 500000.0, # 单位 μL(500mL = 500000) - barcode: str = None, -) -> Bottle: - return Bottle( - name=name, diameter=diameter, height=height, - max_volume=max_volume, barcode=barcode, - model="My_Reagent_Bottle", - ) -``` - -### BottleCarrier 工厂函数 - -```python -from pylabrobot.resources import ResourceHolder -from pylabrobot.resources.carrier import create_ordered_items_2d -from unilabos.resources.itemized_carrier import BottleCarrier - - -def My_6SlotCarrier(name: str) -> BottleCarrier: - sites = create_ordered_items_2d( - klass=ResourceHolder, - num_items_x=3, num_items_y=2, - dx=10.0, dy=10.0, dz=5.0, - item_dx=42.0, item_dy=35.0, - size_x=20.0, size_y=20.0, size_z=50.0, - ) - carrier = BottleCarrier( - name=name, size_x=146.0, size_y=80.0, size_z=55.0, - sites=sites, model="My_6SlotCarrier", - ) - carrier.num_items_x = 3 - carrier.num_items_y = 2 - carrier.num_items_z = 1 - return carrier -``` - -### WareHouse 工厂函数 - -```python -from unilabos.resources.warehouse import warehouse_factory - - -def my_warehouse_4x4(name: str) -> "WareHouse": - return warehouse_factory( - name=name, - num_items_x=4, num_items_y=4, num_items_z=1, - dx=137.0, dy=96.0, dz=120.0, - item_dx=137.0, item_dy=125.0, item_dz=10.0, - resource_size_x=127.0, resource_size_y=85.0, resource_size_z=100.0, - model="my_warehouse_4x4", - ) -``` - -### Deck 类 - -```python -from pylabrobot.resources import Deck, Coordinate - - -class MyStation_Deck(Deck): - def __init__( - self, - name: str = "MyStation_Deck", - size_x: float = 2700.0, - size_y: float = 1080.0, - size_z: float = 1500.0, - category: str = "deck", - setup: bool = False, - ) -> None: - super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z) - if setup: - self.setup() - - def setup(self) -> None: - self.warehouses = { - "堆栈A": my_warehouse_4x4("堆栈A"), - "堆栈B": my_warehouse_4x4("堆栈B"), - } - self.warehouse_locations = { - "堆栈A": Coordinate(-200.0, 400.0, 0.0), - "堆栈B": Coordinate(2350.0, 400.0, 0.0), - } - for wh_name, wh in self.warehouses.items(): - self.assign_child_resource(wh, location=self.warehouse_locations[wh_name]) -``` - -### 资源注册表 - -```yaml -# unilabos/registry/resources//bottles.yaml -My_Reagent_Bottle: - category: [bottles] - class: - module: unilabos.resources.my_project.bottles:My_Reagent_Bottle - type: pylabrobot - -# unilabos/registry/resources//decks.yaml -MyStation_Deck: - category: [deck] - class: - module: unilabos.resources.my_project.decks:MyStation_Deck - type: pylabrobot - registry_type: resource -``` - -### PLR 扩展注册 - -新 Deck 类需要在 `unilabos/resources/plr_additional_res_reg.py` 中导入: - -```python -def register(): - from unilabos.resources.my_project.decks import MyStation_Deck -``` diff --git a/.cursor/skills/edit-experiment-graph/SKILL.md b/.cursor/skills/edit-experiment-graph/SKILL.md deleted file mode 100644 index fa236789..00000000 --- a/.cursor/skills/edit-experiment-graph/SKILL.md +++ /dev/null @@ -1,381 +0,0 @@ ---- -name: edit-experiment-graph -description: Guide for creating and editing experiment graph files in Uni-Lab-OS (创建/编辑实验组态图). Covers node types, link types, parent-child relationships, deck configuration, and common graph patterns. Use when the user wants to create a graph file, edit an experiment configuration, set up device topology, or mentions 图文件/graph/组态/拓扑/实验图/experiment JSON. ---- - -# 创建/编辑实验组态图 - -实验图(Graph File)定义设备拓扑、物理连接和物料配置。系统启动时加载图文件,初始化所有设备和连接关系。 - -路径:`unilabos/test/experiments/.json` - ---- - -## 第一步:确认需求 - -向用户确认: - -| 信息 | 说明 | -|------|------| -| 场景类型 | 单设备调试 / 多设备联调 / 工作站完整图 | -| 包含的设备 | 设备 ID、注册表 class 名、配置参数 | -| 连接关系 | 物理连接(管道)/ 通信连接(串口)/ 无连接 | -| 父子关系 | 是否有工作站包含子设备 | -| 物料需求 | 是否需要 Deck、容器、试剂瓶 | - ---- - -## 第二步:JSON 顶层结构 - -```json -{ - "nodes": [], - "links": [] -} -``` - -> `links` 也可写作 `edges`,加载时两者等效。 - ---- - -## 第三步:定义 Nodes - -### 节点字段 - -| 字段 | 类型 | 必需 | 默认值 | 说明 | -|------|------|------|--------|------| -| `id` | string | **是** | — | 节点唯一标识,links 和 children 中引用此值 | -| `class` | string | **是** | — | 对应注册表名(设备/资源 YAML 的 key),容器可为 `null` | -| `name` | string | 否 | 同 `id` | 显示名称,缺省时自动用 `id` | -| `type` | string | 否 | `"device"` | 节点类型(见下表),缺省时自动设为 `"device"` | -| `children` | string[] | 否 | `[]` | 子节点 ID 列表 | -| `parent` | string\|null | 否 | `null` | 父节点 ID,顶层设备为 `null` | -| `position` | object | 否 | `{x:0,y:0,z:0}` | 空间坐标 | -| `config` | object | 否 | `{}` | 传给驱动 `__init__` 的参数 | -| `data` | object | 否 | `{}` | 初始运行状态 | -| `size_x/y/z` | float | 否 | — | 节点物理尺寸(工作站节点常用) | - -> 非标准字段(如 `api_host`)会自动移入 `config`。 - -### 节点类型 - -| `type` | 用途 | `class` 要求 | -|--------|------|-------------| -| `device` | 设备(默认) | 注册表中的设备名 | -| `deck` | 工作台面 | Deck 工厂函数/类名 | -| `container` | 容器(烧瓶、反应釜) | `null` 或具体容器类名 | - -### 设备节点模板 - -```json -{ - "id": "my_device", - "name": "我的设备", - "children": [], - "parent": null, - "type": "device", - "class": "registry_device_name", - "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "port": "/dev/ttyUSB0", - "baudrate": 115200 - }, - "data": { - "status": "Idle" - } -} -``` - -### 容器节点模板 - -容器用于协议系统中表示试剂瓶、反应釜等,`class` 通常为 `null`: - -```json -{ - "id": "flask_DMF", - "name": "DMF试剂瓶", - "children": [], - "parent": "my_station", - "type": "container", - "class": null, - "position": {"x": 200, "y": 500, "z": 0}, - "config": {"max_volume": 1000.0}, - "data": { - "liquid": [{"liquid_type": "DMF", "liquid_volume": 800.0}] - } -} -``` - -### Deck 节点模板 - -```json -{ - "id": "my_deck", - "name": "my_deck", - "children": [], - "parent": "my_station", - "type": "deck", - "class": "MyStation_Deck", - "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "type": "MyStation_Deck", - "setup": true, - "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"} - }, - "data": {} -} -``` - ---- - -## 第四步:定义 Links - -### Link 字段 - -| 字段 | 类型 | 说明 | -|------|------|------| -| `source` | string | 源节点 ID | -| `target` | string | 目标节点 ID | -| `type` | string | `"physical"` / `"fluid"` / `"communication"` | -| `port` | object | 端口映射 `{source_id: "port_name", target_id: "port_name"}` | - -### 物理/流体连接 - -设备间的管道连接,协议系统用此查找路径: - -```json -{ - "source": "multiway_valve_1", - "target": "flask_DMF", - "type": "fluid", - "port": { - "multiway_valve_1": "2", - "flask_DMF": "outlet" - } -} -``` - -### 通信连接 - -设备间的串口/IO 通信代理,加载时自动将端口信息写入目标设备 config: - -```json -{ - "source": "pump_1", - "target": "serial_device", - "type": "communication", - "port": { - "pump_1": "port", - "serial_device": "port" - } -} -``` - ---- - -## 第五步:父子关系与工作站配置 - -### 工作站 + 子设备 - -工作站节点的 `children` 列出所有子节点 ID,子节点的 `parent` 指向工作站: - -```json -{ - "id": "my_station", - "children": ["my_deck", "pump_1", "valve_1", "reactor_1"], - "parent": null, - "type": "device", - "class": "workstation", - "config": { - "protocol_type": ["PumpTransferProtocol", "CleanProtocol"] - } -} -``` - -### 工作站 + Deck 引用 - -工作站节点中通过 `deck` 字段引用 Deck: - -```json -{ - "id": "my_station", - "children": ["my_deck", "sub_device_1"], - "deck": { - "data": { - "_resource_child_name": "my_deck", - "_resource_type": "unilabos.resources.my_module.decks:MyDeck" - } - } -} -``` - -**关键约束:** -- `_resource_child_name` 必须与 Deck 节点的 `id` 一致 -- `_resource_type` 为 Deck 类/工厂函数的完整 Python 路径 - ---- - -## 常见图模式 - -### 模式 A:单设备调试 - -最简形式,一个设备节点,无连接: - -```json -{ - "nodes": [ - { - "id": "my_device", - "name": "my_device", - "children": [], - "parent": null, - "type": "device", - "class": "motor.zdt_x42", - "position": {"x": 0, "y": 0, "z": 0}, - "config": {"port": "/dev/ttyUSB0", "baudrate": 115200}, - "data": {"status": "idle"} - } - ], - "links": [] -} -``` - -### 模式 B:Protocol 工作站(泵+阀+容器) - -工作站配合泵、阀、容器和物理连接,用于协议编译: - -```json -{ - "nodes": [ - { - "id": "station", "name": "协议工作站", - "class": "workstation", "type": "device", "parent": null, - "children": ["pump", "valve", "flask_solvent", "reactor", "waste"], - "config": {"protocol_type": ["PumpTransferProtocol"]} - }, - {"id": "pump", "name": "转移泵", "class": "virtual_transfer_pump", - "type": "device", "parent": "station", - "config": {"port": "VIRTUAL", "max_volume": 25.0}, - "data": {"status": "Idle", "position": 0.0, "valve_position": "0"}}, - {"id": "valve", "name": "多通阀", "class": "virtual_multiway_valve", - "type": "device", "parent": "station", - "config": {"port": "VIRTUAL", "positions": 8}}, - {"id": "flask_solvent", "name": "溶剂瓶", "type": "container", - "class": null, "parent": "station", - "config": {"max_volume": 1000.0}, - "data": {"liquid": [{"liquid_type": "DMF", "liquid_volume": 500}]}}, - {"id": "reactor", "name": "反应器", "type": "container", - "class": null, "parent": "station"}, - {"id": "waste", "name": "废液瓶", "type": "container", - "class": null, "parent": "station"} - ], - "links": [ - {"source": "pump", "target": "valve", "type": "fluid", - "port": {"pump": "transferpump", "valve": "transferpump"}}, - {"source": "valve", "target": "flask_solvent", "type": "fluid", - "port": {"valve": "1", "flask_solvent": "outlet"}}, - {"source": "valve", "target": "reactor", "type": "fluid", - "port": {"valve": "2", "reactor": "inlet"}}, - {"source": "valve", "target": "waste", "type": "fluid", - "port": {"valve": "3", "waste": "inlet"}} - ] -} -``` - -### 模式 C:外部系统工作站 + Deck - -```json -{ - "nodes": [ - { - "id": "bioyond_station", "class": "reaction_station.bioyond", - "parent": null, "children": ["bioyond_deck"], - "config": { - "api_host": "http://192.168.1.100:8080", - "api_key": "YOUR_KEY", - "material_type_mappings": {}, - "warehouse_mapping": {} - }, - "deck": { - "data": { - "_resource_child_name": "bioyond_deck", - "_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_PolymerReactionStation_Deck" - } - } - }, - { - "id": "bioyond_deck", "class": "BIOYOND_PolymerReactionStation_Deck", - "parent": "bioyond_station", "type": "deck", - "config": {"type": "BIOYOND_PolymerReactionStation_Deck", "setup": true} - } - ], - "links": [] -} -``` - -### 模式 D:通信代理(串口设备) - -泵通过串口设备通信,使用 `communication` 类型的 link。加载时系统会自动将串口端口信息写入泵的 `config`: - -```json -{ - "nodes": [ - {"id": "station", "name": "工作站", "type": "device", - "class": "workstation", "parent": null, - "children": ["serial_1", "pump_1"]}, - {"id": "serial_1", "name": "串口", "type": "device", - "class": "serial", "parent": "station", - "config": {"port": "COM7", "baudrate": 9600}}, - {"id": "pump_1", "name": "注射泵", "type": "device", - "class": "syringe_pump_with_valve.runze.SY03B-T08", "parent": "station"} - ], - "links": [ - {"source": "pump_1", "target": "serial_1", "type": "communication", - "port": {"pump_1": "port", "serial_1": "port"}} - ] -} -``` - ---- - -## 验证 - -```bash -# 启动测试 -unilab -g unilabos/test/experiments/.json --complete_registry - -# 仅检查注册表 -python -m unilabos --check_mode --skip_env_check -``` - ---- - -## 高级模式 - -处理复杂图文件时,详见 [reference.md](reference.md):ResourceDict 完整字段 schema、Pose 标准化规则、Handle 验证机制、GraphML 格式支持、外部系统工作站完整 config 结构。 - ---- - -## 常见错误 - -| 错误 | 原因 | 修复 | -|------|------|------| -| `class` 找不到 | 注册表中无此设备名 | 在 `unilabos/registry/devices/` 或 `resources/` 中搜索正确名称 | -| children/parent 不一致 | 子节点 `parent` 与父节点 `children` 不匹配 | 确保双向一致 | -| `_resource_child_name` 不匹配 | Deck 引用名与 Deck 节点 `id` 不同 | 保持一致 | -| Link 端口错误 | `port` 中的 key 不是 source/target 的 `id` | key 必须是对应节点的 `id` | -| 重复 UUID | 多个节点有相同 `uuid` | 删除或修改 UUID | - ---- - -## 参考路径 - -| 内容 | 路径 | -|------|------| -| 图文件目录 | `unilabos/test/experiments/` | -| 协议测试站 | `unilabos/test/experiments/Protocol_Test_Station/` | -| 图加载代码 | `unilabos/resources/graphio.py` | -| 节点模型 | `unilabos/resources/resource_tracker.py` | -| 设备注册表 | `unilabos/registry/devices/` | -| 资源注册表 | `unilabos/registry/resources/` | -| 用户文档 | `docs/user_guide/graph_files.md` | diff --git a/.cursor/skills/edit-experiment-graph/reference.md b/.cursor/skills/edit-experiment-graph/reference.md deleted file mode 100644 index 8582acd4..00000000 --- a/.cursor/skills/edit-experiment-graph/reference.md +++ /dev/null @@ -1,255 +0,0 @@ -# 实验图高级参考 - -本文件是 SKILL.md 的补充,包含 ResourceDict 完整 schema、Handle 验证、GraphML 格式、Pose 标准化规则和复杂图文件结构。Agent 在需要处理这些场景时按需阅读。 - ---- - -## 1. ResourceDict 完整字段 - -`unilabos/resources/resource_tracker.py` 中定义的节点数据模型: - -| 字段 | 类型 | 别名 | 说明 | -|------|------|------|------| -| `id` | `str` | — | 节点唯一标识 | -| `uuid` | `str` | — | 全局唯一标识 | -| `name` | `str` | — | 显示名称 | -| `description` | `str` | — | 描述(默认 `""` ) | -| `resource_schema` | `Dict[str, Any]` | `schema` | 资源 schema | -| `model` | `Dict[str, Any]` | — | 3D 模型信息 | -| `icon` | `str` | — | 图标(默认 `""` ) | -| `parent_uuid` | `Optional[str]` | — | 父节点 UUID | -| `parent` | `Optional[ResourceDict]` | — | 父节点引用(序列化时 exclude) | -| `type` | `Union[Literal["device"], str]` | — | 节点类型 | -| `klass` | `str` | `class` | 注册表类名 | -| `pose` | `ResourceDictPosition` | — | 位姿信息 | -| `config` | `Dict[str, Any]` | — | 配置参数 | -| `data` | `Dict[str, Any]` | — | 运行时数据 | -| `extra` | `Dict[str, Any]` | — | 扩展数据 | - -### Pose 完整结构(ResourceDictPosition) - -| 字段 | 类型 | 默认值 | 说明 | -|------|------|--------|------| -| `size` | `{width, height, depth}` | `{0,0,0}` | 节点尺寸 | -| `scale` | `{x, y, z}` | `{1,1,1}` | 缩放比例 | -| `layout` | `"2d"/"x-y"/"z-y"/"x-z"` | `"x-y"` | 布局方向 | -| `position` | `{x, y, z}` | `{0,0,0}` | 2D 位置 | -| `position3d` | `{x, y, z}` | `{0,0,0}` | 3D 位置 | -| `rotation` | `{x, y, z}` | `{0,0,0}` | 旋转角度 | -| `cross_section_type` | `"rectangle"/"circle"/"rounded_rectangle"` | `"rectangle"` | 横截面形状 | - ---- - -## 2. Position / Pose 标准化规则 - -图文件中的 `position` 有多种写法,加载时自动标准化。 - -### 输入格式兼容 - -```json -// 格式 A: 直接 {x, y, z}(最常用) -"position": {"x": 100, "y": 200, "z": 0} - -// 格式 B: 嵌套 position -"position": {"position": {"x": 100, "y": 200, "z": 0}} - -// 格式 C: 使用 pose 字段 -"pose": {"position": {"x": 100, "y": 200, "z": 0}} - -// 格式 D: 顶层 x, y, z(无 position 字段) -"x": 100, "y": 200, "z": 0 -``` - -### 标准化流程 - -1. **graphio.py `canonicalize_nodes_data`**:若 `position` 不是 dict,从节点顶层提取 `x/y/z` 填入 `pose.position` -2. **resource_tracker.py `get_resource_instance_from_dict`**:若 `position.x` 存在(旧格式),转为 `{"position": {"x":..., "y":..., "z":...}}` -3. `pose.size` 从 `config.size_x/size_y/size_z` 自动填充 - ---- - -## 3. Handle 验证 - -启动时系统验证 link 中的 `sourceHandle` / `targetHandle` 是否在注册表的 `handles` 中定义。 - -```python -# unilabos/app/main.py (约 449-481 行) -source_handler_keys = [ - h["handler_key"] for h in materials[source_node.klass]["handles"] - if h["io_type"] == "source" -] -target_handler_keys = [ - h["handler_key"] for h in materials[target_node.klass]["handles"] - if h["io_type"] == "target" -] -if source_handle not in source_handler_keys: - print_status(f"节点 {source_node.id} 的source端点 {source_handle} 不存在", "error") - resource_edge_info.pop(...) # 移除非法 link -``` - -**Handle 定义在注册表 YAML 中:** - -```yaml -my_device: - handles: - - handler_key: access - io_type: target - data_type: fluid - side: NORTH - label: access -``` - -> 大多数简单设备不定义 handles,此验证仅对有 `sourceHandle`/`targetHandle` 的 link 生效。 - ---- - -## 4. GraphML 格式支持 - -除 JSON 外,系统也支持 GraphML 格式(`unilabos/resources/graphio.py::read_graphml`)。 - -### 与 JSON 的关键差异 - -| 特性 | JSON | GraphML | -|------|------|---------| -| 父子关系 | `parent`/`children` 字段 | `::` 分隔的节点 ID(如 `station::pump_1`) | -| 加载后 | 直接解析 | 先 `nx.read_graphml` 再转 JSON 格式 | -| 输出 | 不生成副本 | 自动生成等价的 `.json` 文件 | - -### GraphML 转换流程 - -``` -nx.read_graphml(file) - ↓ 用 label 重映射节点名 - ↓ 从 "::" 推断 parent_relation -nx.relabel_nodes + nx.node_link_data - ↓ canonicalize_nodes_data + canonicalize_links_ports - ↓ 写出等价 JSON 文件 -physical_setup_graph + handle_communications -``` - ---- - -## 5. 复杂图文件结构示例 - -### 外部系统工作站完整 config - -以 `reaction_station_bioyond.json` 为例,工作站 `config` 中的关键字段: - -```json -{ - "config": { - "api_key": "DE9BDDA0", - "api_host": "http://172.21.103.36:45388", - - "workflow_mappings": { - "scheduler_start": {"workflow": "start", "params": {}}, - "create_order": {"workflow": "create_order", "params": {}} - }, - - "material_type_mappings": { - "BIOYOND_PolymerStation_Reactor": ["反应器", "type-uuid-here"], - "BIOYOND_PolymerStation_1BottleCarrier": ["试剂瓶", "type-uuid-here"] - }, - - "warehouse_mapping": { - "堆栈1左": { - "uuid": "warehouse-uuid-here", - "site_uuids": { - "A01": "site-uuid-1", - "A02": "site-uuid-2" - } - } - }, - - "http_service_config": { - "enabled": true, - "host": "0.0.0.0", - "port": 45399, - "routes": ["/callback/workflow", "/callback/material"] - }, - - "deck": { - "data": { - "_resource_child_name": "Bioyond_Deck", - "_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_PolymerReactionStation_Deck" - } - }, - - "size_x": 2700.0, - "size_y": 1080.0, - "size_z": 2500.0, - "protocol_type": [], - "data": {} - } -} -``` - -### 子设备 Reactor 节点 - -```json -{ - "id": "reactor_1", - "name": "reactor_1", - "parent": "reaction_station_bioyond", - "type": "device", - "class": "bioyond_reactor", - "position": {"x": 1150, "y": 300, "z": 0}, - "config": { - "reactor_index": 0, - "bioyond_workflow_key": "reactor_1" - }, - "data": {} -} -``` - -### Deck 节点 - -```json -{ - "id": "Bioyond_Deck", - "name": "Bioyond_Deck", - "parent": "reaction_station_bioyond", - "type": "deck", - "class": "BIOYOND_PolymerReactionStation_Deck", - "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "type": "BIOYOND_PolymerReactionStation_Deck", - "setup": true, - "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"} - }, - "data": {} -} -``` - ---- - -## 6. Link 端口标准化 - -`graphio.py::canonicalize_links_ports` 处理 `port` 字段的多种格式: - -```python -# 输入: 字符串格式 "(A,B)" -"port": "(pump_1, valve_1)" -# 输出: 字典格式 -"port": {"source_id": "pump_1", "target_id": "valve_1"} - -# 输入: 已是字典 -"port": {"pump_1": "port", "serial_1": "port"} -# 保持不变 - -# 输入: 无 port 字段 -# 自动补充空 port -``` - ---- - -## 7. 关键路径 - -| 内容 | 路径 | -|------|------| -| ResourceDict 模型 | `unilabos/resources/resource_tracker.py` | -| 图加载 + 标准化 | `unilabos/resources/graphio.py` | -| Handle 验证 | `unilabos/app/main.py` (449-481 行) | -| 反应站图文件 | `unilabos/test/experiments/reaction_station_bioyond.json` | -| 配液站图文件 | `unilabos/test/experiments/dispensing_station_bioyond.json` | -| 用户文档 | `docs/user_guide/graph_files.md` | diff --git a/.cursor/skills/validate-device/SKILL.md b/.cursor/skills/validate-device/SKILL.md deleted file mode 100644 index 26334552..00000000 --- a/.cursor/skills/validate-device/SKILL.md +++ /dev/null @@ -1,180 +0,0 @@ ---- -name: validate-device -description: Validate Uni-Lab-OS device or workstation implementations against interface contracts and project rules. Use when users ask to validate device code, audit compliance, check contract compatibility, review registry alignment, or mention 验证设备/检查设备/设备审计/接口对齐/device validation/check compliance/audit device/workstation validation. Prioritize docs/ai_guides/add_device.md and unilabos/registry/devices/ as validation baselines. ---- - -# 设备合规验证 (Device Validation) - -验证设备实现是否符合 Uni-Lab-OS 的接口契约和编码标准。 - -## 第一步:确定验证目标 - -先确定验证范围: -- 单文件:`unilabos/devices/.../*.py` -- 同类别批量:如 `pump_and_valve`、`temperature` -- 自动检测:用户刚改动设备代码时,从上下文与 git 变更推断目标 - -## 第二步:读取必要文件(按优先级) - -使用 `ReadFile` 工具读取以下文件,并按优先级作为验证基线: - -1. **设备实现文件** - 待验证的 Python 设备类 -2. **对应的 YAML 注册表** - `unilabos/registry/devices/` 下的对应文件 -3. **设备接入指南** - `docs/ai_guides/add_device.md`(权威规则与接口快照) -4. **CLAUDE.md / AGENTS.md** - 项目规则(关键硬约束) -5. **同类设备参考** - `unilabos/registry/devices/` 中同类别设备(优先最新仓库内容) - -如果规则冲突,优先级为:**仓库当前注册表与 `add_device.md` > `CLAUDE.md/AGENTS.md` 中通用约束 > 历史示例**。 - -## 第三步:执行验证检查 - -按顺序执行并记录证据(文件路径 + 关键信息): - -### 检查 1:参数名契约验证 - -**规则:** 动作方法的参数名是接口契约,不可重命名。 - -**检查内容:** -- 扫描公开动作方法(不以 `_` 开头) -- 从 YAML `action_value_mappings` 提取动作参数名 -- 对比 Python 方法签名参数名是否完全一致(不能改名) - -### 检查 2:status 字符串一致性 - -**规则:** status 字符串必须与同类已有设备一致。 - -**检查内容:** -- 收集 `self.data["status"]` 所有赋值 -- 对比同类设备与注册表中的标准值 -- 标记中文状态、大小写不一致和自定义状态 - -### 检查 3:self.data 初始化完整性 - -**规则:** `self.data` 必须在 `__init__` 中预填充所有属性字段。 - -**检查内容:** -- 检查 `__init__` 中 `self.data` 初始化 -- 提取 `@property` 与 YAML `status_types` 字段 -- 校验 `self.data` 已预填充全部字段,且不是空字典 - -### 检查 4:异步 sleep 使用规范 - -**规则:** 异步方法中使用 `await self._ros_node.sleep()`,禁止 `time.sleep()` 和 `asyncio.sleep()`。 - -**检查内容:** -- 扫描所有 `async def` -- 禁止:`time.sleep(...)` / `asyncio.sleep(...)` -- 正确:`await self._ros_node.sleep(...)` - -### 检查 5:YAML 与代码接口对齐 - -**规则:** 设备实现必须与 YAML 注册表定义的接口完全匹配。 - -**检查内容:** -- 属性对齐:`status_types` 字段都有对应 `@property` -- 动作对齐:`action_value_mappings` 中非 `auto-` 动作都有实现 -- 返回值检查:优先 `bool` 或 `Dict[str, Any]` - -### 检查 6:串口响应解析健壮性(按需适用) - -**规则:** 禁止用硬编码索引解析串口响应,必须先定位帧起始标记。 - -**检查内容:** -- 仅在存在串口/二进制协议代码时执行 -- 禁止直接按固定索引解析原始响应 -- 必须先定位帧起始标记(如 `/`、`0xFE`) - -### 检查 7:代码质量检查(补充项,不计入强制合规) - -**可选检查项:** -- 是否有 `post_init` 方法接收 `ros_node` -- 是否有 `initialize` 和 `cleanup` 方法 -- 类型转换:参数是否显式转换(`float(temp)`、`int(position)`) -- 错误处理:是否有基本的异常捕获 -- 日志记录:是否使用 `self.logger` - -## 第四步:生成合规报告 - -使用以下结构输出(简洁、可执行): - -```markdown -# 设备验证报告 - -**设备:** unilabos/devices/pump_and_valve/my_pump.py -**类名:** MyPump -**类别:** pump_and_valve -**验证时间:** {{date}} - -## 总体评分(仅统计适用检查) - -- ✅ 通过检查:5/6(Applicable) -- ❌ 失败检查:1/6 -- ⏭️ 跳过检查:1 项(Not Applicable) -- ⚠️ 警告:3 项 - -## 详细结果 - -### ❌ 必须修复(Blocking) -1. [检查名] 问题描述(文件与位置) - - 当前:`...` - - 应改:`...` - -### ⚠️ 建议修复(Non-blocking) -1. [检查名] 问题描述(文件与位置) - - 建议:`...` - -### ✅ 通过项 -- 检查 1:... -- 检查 3:... - -## 结论 - -该设备实现存在 **N 个必须修复问题**。修复后复检。 -``` - -## 第五步:询问是否自动修复 - -如果发现了问题,询问用户: - -``` -发现了 2 个必须修复的问题。是否需要我自动修复这些问题? - -选项: -1. 自动修复所有问题 -2. 仅修复高优先级问题 -3. 手动修复(我会提供详细指导) -4. 仅查看报告,不修复 -``` - -如果用户选择自动修复,使用当前环境可用的编辑工具(优先 `ApplyPatch`)逐个修复问题,并在修复后重新验证。 - -## 注意事项 - -1. **不要过度严格** - 某些警告可能是合理的设计选择,询问用户确认 -2. **提供上下文** - 解释为什么某个规则很重要 -3. **批量验证** - 如果验证多个设备,提供汇总报告 -4. **版本兼容** - 旧设备可能使用旧的约定,注意区分 - -## 常见问题 - -**Q: 如果找不到对应的 YAML 文件怎么办?** -A: 提示用户可能需要先创建 YAML 注册表,或者仅执行不依赖 YAML 的检查(如 status 字符串、async sleep)。 - -**Q: 如果设备是工作站(workstation)怎么办?** -A: 工作站有不同的验证规则,检查是否继承自 `WorkstationBase`,是否有 `deck` 配置等。 - -**Q: 如何处理虚拟设备(mock)?** -A: 虚拟设备可以放宽某些要求(如串口解析),但接口契约仍需遵守。 - -## 触发示例 - -用户可能这样说: -- "验证一下这个设备代码" -- "检查 my_pump.py 是否符合规范" -- "这个设备实现有什么问题吗" -- "帮我 audit 一下设备接口" -- "检查设备合规性" -- "validate device implementation" -- "验证这个 workstation 是否合规" -- "帮我检查设备实现和 registry 是否对齐" -- "做一次设备接口审计"