From fe501c965f5796374e77efcc97000f9aae0107c8 Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Wed, 11 Mar 2026 14:09:46 +0800 Subject: [PATCH] feat: Update workstation reference and templates with new PLC integration details and enhanced workflow mappings --- .cursor/skills/add-workstation/SKILL.md | 506 +++++--------------- .cursor/skills/add-workstation/reference.md | 485 ++++++++++++++++++- .cursor/skills/add-workstation/templates.md | 454 ++++++++++++++++++ 3 files changed, 1064 insertions(+), 381 deletions(-) create mode 100644 .cursor/skills/add-workstation/templates.md diff --git a/.cursor/skills/add-workstation/SKILL.md b/.cursor/skills/add-workstation/SKILL.md index 2762d4e4..f3e59b46 100644 --- a/.cursor/skills/add-workstation/SKILL.md +++ b/.cursor/skills/add-workstation/SKILL.md @@ -5,254 +5,89 @@ description: Guide for adding new workstations to Uni-Lab-OS (接入新工作站 # Uni-Lab-OS 工作站接入指南 -工作站(workstation)是组合多个子设备的大型设备,拥有独立的物料管理系统(PLR Deck)和工作流引擎。本指南覆盖从需求分析到验证的全流程。 +工作站是组合多个子设备的大型设备,拥有独立的物料管理系统(PLR Deck)和工作流引擎。 -> **前置知识**:工作站接入基于 `docs/ai_guides/add_device.md` 的通用设备接入框架,但有显著差异。阅读本指南前无需先读通用指南。 +> **完整代码模板**见 [templates.md](templates.md),**高级模式**见 [reference.md](reference.md)。 ## 第一步:确定工作站类型 -向用户确认以下信息: - -**Q1: 工作站的业务场景?** +向用户确认: | 类型 | 基类 | 适用场景 | 示例 | |------|------|----------|------| -| **Protocol 工作站** | `ProtocolNode` | 标准化学操作协议(过滤、转移、加热等) | FilterProtocolStation | -| **外部系统工作站** | `WorkstationBase` | 与外部 LIMS/MES 系统对接,有专属 API | BioyondStation | -| **硬件控制工作站** | `WorkstationBase` | 直接控制 PLC/硬件,无外部系统 | CoinCellAssembly | +| **Protocol** | `ProtocolNode` | 标准化学操作协议 | FilterProtocolStation | +| **外部系统** | `WorkstationBase` | 对接 LIMS/MES API | BioyondStation | +| **硬件控制** | `WorkstationBase` | 直接控制 PLC/硬件 | CoinCellAssembly | -**Q2: 工作站英文名称?**(如 `my_reaction_station`) - -**Q3: 与外部系统的交互方式?** - -| 方式 | 适用场景 | 需要的配置 | -|------|----------|-----------| -| 无外部系统 | Protocol 工作站、纯硬件控制 | 无 | -| HTTP API | LIMS/MES 系统(如 Bioyond) | `api_host`, `api_key` | -| Modbus TCP | PLC 控制 | `address`, `port` | -| OPC UA | 工业设备 | `url` | - -**Q4: 子设备组成?** -- 列出所有子设备(如反应器、泵、阀、传感器等) -- 哪些是已有设备类型?哪些需要新增? -- 子设备之间的硬件代理关系(如泵通过串口设备通信) - -**Q5: 物料管理需求?** -- 是否需要 Deck(物料面板)? -- 物料类型(plate、tip_rack、bottle 等) -- 是否需要与外部物料系统同步? +还需确认: +- 英文名称、通信方式(HTTP/Modbus/OPC UA/无) +- 子设备组成(哪些已有、哪些新增、硬件代理关系) +- 物料需求(是否需要 Deck、物料类型、是否需外部同步) --- ## 第二步:理解工作站架构 -工作站与普通设备的核心差异: - | 维度 | 普通设备 | 工作站 | |------|---------|--------| -| 基类 | 无(纯 Python 类) | `WorkstationBase` 或 `ProtocolNode` | +| 基类 | 纯 Python 类 | `WorkstationBase` / `ProtocolNode` | | ROS 节点 | `BaseROS2DeviceNode` | `ROS2WorkstationNode` | -| 状态管理 | `self.data` 字典 | 通常不用 `self.data`,用 `@property` 直接访问 | -| 子设备 | 无 | `children` 列表,通过 `self._children` 访问 | +| 状态管理 | `self.data` 字典 | `@property` 直接访问 | +| 子设备 | 无 | `self._children` / `self._ros_node.sub_devices` | | 物料 | 无 | `self.deck`(PLR Deck) | -| 图文件角色 | `parent: null` 或 `parent: ""` | `parent: null`,含 `children` 和 `deck` | ### 继承体系 -`WorkstationBase` (ABC) → `ProtocolNode` (通用协议) / `BioyondWorkstation` (→ ReactionStation, DispensingStation) / `CoinCellAssemblyWorkstation` (硬件控制) +``` +WorkstationBase (ABC) +├── BioyondWorkstation ← HTTP RPC + 资源同步 +│ ├── BioyondReactionStation +│ └── BioyondDispensingStation +├── CoinCellAssemblyWorkstation ← Modbus/PLC +└── ProtocolNode ← 标准化学协议 +``` -### ROS 层 +### 子设备初始化流程 -`ROS2WorkstationNode` 额外负责:初始化 children 子设备节点、为子设备创建 ActionClient、配置硬件代理、为 protocol_type 创建协议 ActionServer。 +`ROS2WorkstationNode.__init__` → 遍历 `children`(type=="device")→ `initialize_device_from_dict()` → 存入 `sub_devices` → 为每个动作创建 `ActionClient` → 识别通信设备(`serial_*`/`io_*`)→ `_setup_hardware_proxy()` --- ## 第三步:创建驱动文件 -文件路径:`unilabos/devices/workstation//.py` +路径:`unilabos/devices/workstation//.py` -### 模板 A:基于外部系统的工作站 +根据类型选择模板(完整代码见 [templates.md](templates.md)): -适用于与 LIMS/MES 等外部系统对接的场景。 +| 类型 | 模板 | 关键要素 | +|------|------|---------| +| 外部系统 | Template A | `config` 接收 API 配置,`post_init` 启动 RPC/HTTP 服务 | +| 硬件控制 | Template B | `TCPClient` + CSV 寄存器映射,`use_node()` 读写 | +| Protocol | Template C | 直接使用 `ProtocolNode`,通常不需要自定义类 | -```python -import logging -from typing import Dict, Any, Optional, List -from pylabrobot.resources import Deck - -from unilabos.devices.workstation.workstation_base import WorkstationBase - -try: - from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode -except ImportError: - ROS2WorkstationNode = None - - -class MyWorkstation(WorkstationBase): - """工作站描述""" - - _ros_node: "ROS2WorkstationNode" - - def __init__( - self, - config: dict = None, - deck: Optional[Deck] = None, - protocol_type: list = None, - **kwargs, - ): - super().__init__(deck=deck, **kwargs) - self.config = config or {} - self.logger = logging.getLogger(f"MyWorkstation") - - # 外部系统连接配置 - self.api_host = self.config.get("api_host", "") - self.api_key = self.config.get("api_key", "") - - # 工作站业务状态(不同于 self.data 模式) - self._status = "Idle" - - def post_init(self, ros_node: "ROS2WorkstationNode") -> None: - super().post_init(ros_node) - # 在这里启动后台服务、连接监控等 - - # ============ 子设备访问 ============ - - def _get_child_device(self, device_id: str): - """通过 ID 获取子设备节点""" - return self._children.get(device_id) - - # ============ 动作方法 ============ - - async def scheduler_start(self, **kwargs) -> Dict[str, Any]: - """启动调度器""" - return {"success": True} - - async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]: - """创建工单""" - return {"success": True} - - # ============ 属性 ============ - - @property - def workflow_sequence(self) -> str: - return "[]" - - @property - def material_info(self) -> str: - return "{}" -``` - -### 模板 B:基于硬件控制的工作站 - -适用于直接与 PLC/硬件通信的场景。 - -```python -import logging -from typing import Dict, Any, Optional -from pylabrobot.resources import Deck - -from unilabos.devices.workstation.workstation_base import WorkstationBase - -try: - from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode -except ImportError: - ROS2WorkstationNode = None - - -class MyHardwareWorkstation(WorkstationBase): - """硬件控制工作站""" - - _ros_node: "ROS2WorkstationNode" - - def __init__( - self, - config: dict = None, - deck: Optional[Deck] = None, - address: str = "192.168.1.100", - port: str = "502", - debug_mode: bool = False, - *args, - **kwargs, - ): - super().__init__(deck=deck, *args, **kwargs) - self.config = config or {} - self.address = address - self.port = int(port) - self.debug_mode = debug_mode - self.logger = logging.getLogger("MyHardwareWorkstation") - - # 初始化通信客户端 - if not debug_mode: - from unilabos.device_comms.modbus_plc.client import ModbusTcpClient - self.client = ModbusTcpClient(host=self.address, port=self.port) - else: - self.client = None - - def post_init(self, ros_node: "ROS2WorkstationNode") -> None: - super().post_init(ros_node) - - # ============ 硬件读写 ============ - - def _read_register(self, name: str): - """读取 Modbus 寄存器""" - if self.debug_mode: - return 0 - # 实际读取逻辑 - pass - - # ============ 动作方法 ============ - - async def start_process(self, **kwargs) -> Dict[str, Any]: - """启动加工流程""" - return {"success": True} - - async def stop_process(self, **kwargs) -> Dict[str, Any]: - """停止加工流程""" - return {"success": True} - - # ============ 属性(从硬件实时读取)============ - - @property - def sys_status(self) -> str: - return str(self._read_register("SYS_STATUS")) -``` - -### 模板 C:Protocol 工作站 - -适用于标准化学操作协议的场景,直接使用 `ProtocolNode`。 - -```python -from typing import List, Optional -from pylabrobot.resources import Resource as PLRResource - -from unilabos.devices.workstation.workstation_base import ProtocolNode - - -class MyProtocolStation(ProtocolNode): - """Protocol 工作站 — 使用标准化学操作协议""" - - def __init__( - self, - protocol_type: List[str], - deck: Optional[PLRResource] = None, - *args, - **kwargs, - ): - super().__init__(protocol_type=protocol_type, deck=deck, *args, **kwargs) -``` - -> Protocol 工作站通常不需要自定义驱动类,直接使用 `ProtocolNode` 并在注册表和图文件中配置 `protocol_type` 即可。 +**所有模板的 `__init__` 必须接受 `deck` 和 `**kwargs`。** --- -## 第四步:创建子设备驱动(如需要) +## 第四步:创建子设备(如需要) -工作站的子设备本身是独立设备。按 `docs/ai_guides/add_device.md` 的标准流程创建。 +子设备是独立设备,有自己的驱动类和注册表。完整模板见 [templates.md § 子设备模板](templates.md)。 -子设备的关键约束: -- 在图文件中 `parent` 指向工作站 ID -- 图文件中在工作站的 `children` 数组里列出 -- 如需硬件代理,在子设备的 `config.hardware_interface.name` 指向通信设备 ID +### 关键要点 + +1. **驱动类**:普通 Python 类,`self.data` 预填所有属性 +2. **注册表**:`category` 包含工作站标识,`auto-` 前缀动作不创建 ActionClient +3. **图文件**:`parent` 指向工作站 ID,`type: "device"` +4. **代码访问**:`self._children.get("reactor_1").driver_instance` + +### 硬件代理模式 + +当子设备需要通过通信设备(串口/IO)通信时: + +1. 通信设备 ID 必须以 `serial_` 或 `io_` 开头 +2. 子设备注册表中声明 `hardware_interface: {name, read, write}` +3. 子设备实例的 `name` 属性值 = 通信设备 ID +4. ROS 节点自动将通信设备的 read/write 方法注入到子设备上 --- @@ -260,7 +95,7 @@ class MyProtocolStation(ProtocolNode): 路径:`unilabos/registry/devices/.yaml` -### 最小配置 +**最小配置(`--complete_registry` 自动补全):** ```yaml my_workstation: @@ -271,92 +106,57 @@ my_workstation: type: python ``` -启动时 `--complete_registry` 自动补全 `status_types` 和 `action_value_mappings`。 - -### 完整配置参考 - -```yaml -my_workstation: - description: "我的工作站" - version: "1.0.0" - category: - - workstation - - my_category - class: - module: unilabos.devices.workstation.my_station.my_station:MyWorkstation - type: python - status_types: - workflow_sequence: String - material_info: String - action_value_mappings: - scheduler_start: - type: UniLabJsonCommandAsync - goal: {} - result: - success: success - create_order: - type: UniLabJsonCommandAsync - goal: - json_str: json_str - result: - success: success - init_param_schema: - config: - type: object - deck: - type: object - protocol_type: - type: array -``` - -### 子设备注册表 - -子设备有独立的注册表文件,需要在 `category` 中包含工作站标识: - -```yaml -my_reactor: - category: - - reactor - - my_workstation - class: - module: unilabos.devices.workstation.my_station.my_reactor:MyReactor - type: python -``` +**完整配置**见 [templates.md § 注册表完整配置](templates.md)。 --- -## 第六步:配置 Deck 资源(如需要) +## 第六步:配置物料系统(如需要) -如果工作站有物料管理需求,需要定义 Deck 类。 +物料层级:`Deck` → `WareHouse` → `ResourceHolder` (site) → `BottleCarrier` → `Bottle` -### 使用已有 Deck 类 +### 快速流程 -查看 `unilabos/resources/` 目录下是否有适用的 Deck 类。 +1. **创建 Bottle**(`unilabos/resources//bottles.py`)— 工厂函数,返回 `Bottle` 实例 +2. **创建 Carrier**(`.../bottle_carriers.py`)— 工厂函数,用 `create_ordered_items_2d` 定义槽位 +3. **创建 WareHouse**(`.../warehouses.py`)— 用 `warehouse_factory()` 创建堆栈 +4. **创建 Deck**(`.../decks.py`)— 继承 `pylabrobot.resources.Deck`,`setup()` 中放置 WareHouse +5. **注册表**(`unilabos/registry/resources//`)— `class.type: pylabrobot` +6. **PLR 扩展**(`unilabos/resources/plr_additional_res_reg.py`)— 导入新 Deck 类 -### 创建自定义 Deck +完整代码模板见 [templates.md § 物料资源模板](templates.md)。 -在 `unilabos/resources//decks.py` 中定义: +### 图文件中的 Deck 配置 -```python -from pylabrobot.resources import Deck -from pylabrobot.resources.coordinate import Coordinate +工作站节点引用 Deck: - -def MyStation_Deck(name: str = "MyStation_Deck") -> Deck: - deck = Deck(name=name, size_x=2700.0, size_y=1080.0, size_z=1500.0) - # 在 deck 上定义子资源位置(carrier、plate 等) - return deck +```json +"deck": { + "data": { + "_resource_child_name": "my_deck", + "_resource_type": "unilabos.resources.my_project.decks:MyStation_Deck" + } +} ``` -在 `unilabos/resources//` 下注册或通过注册表引用。 +Deck 子节点: + +```json +{ + "id": "my_deck", + "parent": "my_station", + "type": "deck", + "class": "MyStation_Deck", + "config": {"type": "MyStation_Deck", "setup": true, "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}} +} +``` + +> **`_resource_child_name`** 必须与 Deck 节点的 `id` 一致。 --- ## 第七步:配置图文件 -图文件路径:`unilabos/test/experiments/.json` - -### 完整结构 +路径:`unilabos/test/experiments/.json` ```json { @@ -364,53 +164,19 @@ def MyStation_Deck(name: str = "MyStation_Deck") -> Deck: { "id": "my_station", "name": "my_station", - "children": ["my_deck", "sub_device_1", "sub_device_2"], + "children": ["my_deck", "sub_device_1"], "parent": null, "type": "device", "class": "my_workstation", "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "api_host": "http://192.168.1.100:8080", - "api_key": "YOUR_KEY" - }, - "deck": { - "data": { - "_resource_child_name": "my_deck", - "_resource_type": "unilabos.resources.my_module.decks:MyStation_Deck" - } - }, - "size_x": 2700.0, - "size_y": 1080.0, - "size_z": 1500.0, + "config": {}, + "deck": {"data": {"_resource_child_name": "my_deck", "_resource_type": "...decks:MyStation_Deck"}}, + "size_x": 2700.0, "size_y": 1080.0, "size_z": 1500.0, "protocol_type": [], "data": {} }, - { - "id": "my_deck", - "name": "my_deck", - "children": [], - "parent": "my_station", - "type": "deck", - "class": "MyStation_Deck", - "position": {"x": 0, "y": 0, "z": 0}, - "config": { - "type": "MyStation_Deck", - "setup": true, - "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"} - }, - "data": {} - }, - { - "id": "sub_device_1", - "name": "sub_device_1", - "children": [], - "parent": "my_station", - "type": "device", - "class": "sub_device_registry_name", - "position": {"x": 100, "y": 0, "z": 0}, - "config": {}, - "data": {} - } + {"id": "my_deck", "parent": "my_station", "type": "deck", "class": "MyStation_Deck", "config": {"type": "MyStation_Deck", "setup": true}}, + {"id": "sub_device_1", "parent": "my_station", "type": "device", "class": "sub_device_class", "config": {}} ] } ``` @@ -419,82 +185,76 @@ def MyStation_Deck(name: str = "MyStation_Deck") -> Deck: | 字段 | 说明 | |------|------| -| `id` | 节点唯一标识,与 `children` 数组中的引用一致 | | `children` | 包含 deck ID 和所有子设备 ID | -| `parent` | 工作站节点为 `null`;子设备/deck 指向工作站 ID | -| `type` | 工作站和子设备为 `"device"`;deck 为 `"deck"` | -| `class` | 对应注册表中的设备名 | -| `deck.data._resource_child_name` | 必须与 deck 节点的 `id` 一致 | -| `deck.data._resource_type` | Deck 工厂函数的完整 Python 路径 | -| `protocol_type` | Protocol 工作站填入协议名列表;否则为 `[]` | -| `config` | 传入驱动 `__init__` 的 `config` 参数 | +| `parent` | 工作站为 `null`;子设备/deck 指向工作站 ID | +| `type` | 工作站和子设备 `"device"`;deck 为 `"deck"` | +| `class` | 注册表中的设备名 | +| `protocol_type` | Protocol 工作站填协议名列表;否则 `[]` | +| `config` | 传入 `__init__` 的 `config` 参数 | + +### Config 字段速查 + +| 字段 | 外部系统 | PLC/硬件 | 说明 | +|------|---------|---------|------| +| `api_host` / `api_key` | ✅ | — | 外部 API 连接 | +| `address` / `port` | — | ✅ | PLC 地址(init 参数,非 config 内) | +| `workflow_mappings` | ✅ | — | 工作流名 → 外部 UUID | +| `material_type_mappings` | ✅ | — | PLR 资源类 → 外部物料类型 | +| `warehouse_mapping` | ✅ | — | 仓库 → 外部 UUID + 库位 UUID | +| `http_service_config` | ✅ | — | HTTP 回调 host/port | + +> 完整 Config 结构详见 [reference.md § 2](reference.md) --- ## 第八步:验证 ```bash -# 1. 模块可导入 python -c "from unilabos.devices.workstation.. import " - -# 2. 注册表补全 unilab -g .json --complete_registry - -# 3. 启动测试 unilab -g .json ``` --- -## 高级模式 - -实现外部系统对接型工作站时,详见 [reference.md](reference.md):RPC 客户端、HTTP 回调服务、连接监控、Config 结构模式(material_type_mappings / warehouse_mapping / workflow_mappings)、ResourceSynchronizer、update_resource、工作流序列、站间物料转移、post_init 完整模式。 - ---- - ## 关键规则 -1. **`__init__` 必须接受 `deck` 和 `**kwargs`** — `WorkstationBase.__init__` 需要 `deck` 参数 -2. **通过 `self._children` 访问子设备** — 不要自行维护子设备引用 -3. **`post_init` 中启动后台服务** — 不要在 `__init__` 中启动网络连接 -4. **异步方法使用 `await self._ros_node.sleep()`** — 禁止 `time.sleep()` 和 `asyncio.sleep()` -5. **子设备在图文件中声明** — 不在驱动代码中创建子设备实例 -6. **`deck` 配置中的 `_resource_child_name` 必须与 deck 节点 ID 一致** -7. **Protocol 工作站优先使用 `ProtocolNode`** — 不需要自定义类 +1. `__init__` 必须接受 `deck` 和 `**kwargs` +2. 通过 `self._children` 访问子设备,不自行维护引用 +3. `post_init` 中启动后台服务,不在 `__init__` 中启动网络连接 +4. 异步方法使用 `await self._ros_node.sleep()`,禁止 `time.sleep()` / `asyncio.sleep()` +5. 子设备在图文件中声明,不在驱动代码中创建 +6. `_resource_child_name` 必须与 deck 节点 ID 一致 +7. Protocol 工作站优先使用 `ProtocolNode` +8. 通信设备 ID 以 `serial_` 或 `io_` 开头 --- ## 工作流清单 ``` -工作站接入进度: -- [ ] 1. 确定工作站类型(Protocol / 外部系统 / 硬件控制) +- [ ] 1. 确定类型(Protocol / 外部系统 / 硬件控制) - [ ] 2. 确认子设备组成和物料需求 -- [ ] 3. 创建工作站驱动 unilabos/devices/workstation//.py -- [ ] 4. 创建子设备驱动(如需要,按 add_device.md 流程) -- [ ] 5. 创建注册表 unilabos/registry/devices/.yaml -- [ ] 6. 创建/选择 Deck 资源类(如需要) -- [ ] 7. 配置图文件 unilabos/test/experiments/.json -- [ ] 8. 验证:可导入 + 注册表补全 + 启动测试 +- [ ] 3. 创建工作站驱动 +- [ ] 4. 创建子设备驱动 + 注册表(如需要) +- [ ] 5. 创建工作站注册表 +- [ ] 6. 创建物料资源 Bottle→Carrier→WareHouse→Deck(如需要) +- [ ] 7. 注册 PLR 扩展(Deck 类需要) +- [ ] 8. 配置图文件 +- [ ] 9. 验证 ``` --- -## 现有工作站参考 +## 参考资源 -| 工作站 | 注册表名 | 驱动类 | 类型 | -|--------|----------|--------|------| -| Protocol 通用 | `workstation` | `ProtocolNode` | Protocol | -| Bioyond 反应站 | `reaction_station.bioyond` | `BioyondReactionStation` | 外部系统 | -| Bioyond 配液站 | `bioyond_dispensing_station` | `BioyondDispensingStation` | 外部系统 | -| 纽扣电池组装 | `coincellassemblyworkstation_device` | `CoinCellAssemblyWorkstation` | 硬件控制 | +- **代码模板**:[templates.md](templates.md) — 驱动模板 A/B/C、子设备、注册表、物料资源 +- **高级模式**:[reference.md](reference.md) — 外部系统集成、Config 模式、资源同步、PLC 框架、端到端案例 +- **现有工作站**: -### 参考文件路径 - -- 基类: `unilabos/devices/workstation/workstation_base.py` -- Bioyond 基类: `unilabos/devices/workstation/bioyond_studio/station.py` -- 反应站: `unilabos/devices/workstation/bioyond_studio/reaction_station/reaction_station.py` -- 配液站: `unilabos/devices/workstation/bioyond_studio/dispensing_station/dispensing_station.py` -- 纽扣电池: `unilabos/devices/workstation/coin_cell_assembly/coin_cell_assembly.py` -- ROS 节点: `unilabos/ros/nodes/presets/workstation.py` -- 图文件: `unilabos/test/experiments/reaction_station_bioyond.json`, `dispensing_station_bioyond.json` +| 工作站 | 注册表名 | 类型 | 驱动路径 | +|--------|----------|------|---------| +| Bioyond 反应站 | `reaction_station.bioyond` | 外部系统 | `bioyond_studio/reaction_station/` | +| Bioyond 配液站 | `bioyond_dispensing_station` | 外部系统 | `bioyond_studio/dispensing_station/` | +| 纽扣电池组装 | `coincellassemblyworkstation_device` | 硬件控制 | `coin_cell_assembly/` | +| Protocol 通用 | `workstation` | Protocol | `workstation_base.py` | diff --git a/.cursor/skills/add-workstation/reference.md b/.cursor/skills/add-workstation/reference.md index 0c1b9f0d..2ad79825 100644 --- a/.cursor/skills/add-workstation/reference.md +++ b/.cursor/skills/add-workstation/reference.md @@ -1,6 +1,6 @@ # 工作站高级模式参考 -本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、配置结构等高级模式。 +本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、PLC 框架、硬件代理等高级模式。 Agent 在需要实现这些功能时按需阅读。 --- @@ -116,7 +116,6 @@ class ConnectionMonitor: def _monitor_loop(self): while self._running: try: - # 调用外部系统接口检测连接 self.workstation.hardware_interface.ping() status = "online" except Exception: @@ -210,6 +209,35 @@ class ConnectionMonitor: } ``` +### 2.7 工作流到工序名映射 + +```json +{ + "workflow_to_section_map": { + "reactor_taken_in": "反应器放入", + "reactor_taken_out": "反应器取出", + "Solid_feeding_vials": "固体投料-小瓶" + } +} +``` + +### 2.8 动作名称映射 + +```json +{ + "action_names": { + "reactor_taken_in": { + "config": "通量-配置", + "stirring": "反应模块-开始搅拌" + }, + "solid_feeding_vials": { + "feeding": "粉末加样模块-投料", + "observe": "反应模块-观察搅拌结果" + } + } +} +``` + --- ## 3. 资源同步机制 @@ -246,14 +274,25 @@ class MyResourceSynchronizer(ResourceSynchronizer): return True ``` -### 3.2 update_resource — 上传资源树到云端 +### 3.2 资源树回调 + +Bioyond 工作站注册了资源树变更回调,实现与外部系统的自动同步: + +| 回调名 | 触发时机 | 外部操作 | +|--------|---------|---------| +| `resource_tree_add` | PLR Deck 中添加资源 | 入库到外部系统 | +| `resource_tree_remove` | PLR Deck 中移除资源 | 出库 | +| `resource_tree_transfer` | 创建物料(不入库) | 创建外部物料记录 | +| `resource_tree_update` | 资源位置移动 | 更新外部系统库位 | + +### 3.3 update_resource — 上传资源树到云端 将 PLR Deck 序列化后通过 ROS 服务上传。典型使用场景: ```python -# 在 post_init 中上传初始 deck from unilabos.ros.nodes.base_device_node import ROS2DeviceNode +# 在 post_init 中上传初始 deck ROS2DeviceNode.run_async_func( self._ros_node.update_resource, True, **{"resources": [self.deck]} @@ -315,15 +354,11 @@ async def transfer_materials_to_another_station( """将物料转移到另一个工作站""" target_node = self._children.get(target_device_id) if not target_node: - # 通过 ROS 节点查找非子设备的目标站 pass for group in transfer_groups: resource = self.find_resource_by_name(group["resource_name"]) - # 从本站 deck 移除 resource.unassign() - # 调用目标站的接收方法 - # ... return {"success": True, "transferred": len(transfer_groups)} ``` @@ -369,3 +404,437 @@ def post_init(self, ros_node): # 5. 初始化资源同步器(可选) self.resource_synchronizer = MyResourceSynchronizer(self, self.rpc_client) ``` + +--- + +## 7. PLC/Modbus 完整框架 + +### 7.1 寄存器映射 CSV 格式 + +PLC 工作站使用 CSV 文件定义寄存器映射表。路径通常为工作站目录下的 `.csv`。 + +**CSV 列定义:** + +| 列名 | 说明 | 值示例 | +|------|------|--------| +| `Name` | 寄存器节点名称(代码中引用的唯一标识) | `COIL_SYS_START_CMD` | +| `DataType` | 数据类型 | `BOOL`, `INT16`, `FLOAT32` | +| `InitValue` | 初始值(可选) | — | +| `Comment` | 注释(可选) | — | +| `Attribute` | 自定义属性(可选) | — | +| `DeviceType` | Modbus 设备类型 | `coil`, `hold_register`, `input_register`, `discrete_inputs` | +| `Address` | Modbus 地址 | `8010`, `11000` | + +**CSV 示例:** + +```csv +Name,DataType,InitValue,Comment,Attribute,DeviceType,Address, +COIL_SYS_START_CMD,BOOL,,系统启动命令,,coil,8010, +COIL_SYS_STOP_CMD,BOOL,,系统停止命令,,coil,8020, +COIL_SYS_RESET_CMD,BOOL,,系统复位命令,,coil,8030, +REG_MSG_ELECTROLYTE_VOLUME,INT16,,电解液体积,,hold_register,11004, +REG_DATA_OPEN_CIRCUIT_VOLTAGE,FLOAT32,,开路电压,,hold_register,10002, +REG_DATA_AXIS_X_POS,FLOAT32,,X轴位置,,hold_register,10004, +``` + +**命名约定:** +- 线圈:`COIL_` 前缀(读写布尔量) +- 保持寄存器:`REG_MSG_`(消息/命令寄存器)、`REG_DATA_`(数据/状态寄存器) +- `_CMD` 后缀:写入命令 +- `_STATUS` 后缀:读取状态 + +### 7.2 TCPClient 初始化 + +```python +from unilabos.device_comms.modbus_plc.client import TCPClient, BaseClient +from unilabos.device_comms.modbus_plc.modbus import DataType, WorderOrder + +# 创建 Modbus TCP 客户端 +modbus_client = TCPClient(addr="192.168.1.100", port=502) +modbus_client.client.connect() + +# 从 CSV 加载寄存器映射 +import os +csv_path = os.path.join(os.path.dirname(__file__), 'register_map.csv') +nodes = BaseClient.load_csv(csv_path) +client = modbus_client.register_node_list(nodes) +``` + +### 7.3 寄存器读写操作 + +```python +# 读取线圈(布尔值) +result, err = client.use_node('COIL_SYS_START_STATUS').read(1) +is_started = result[0] if not err else False + +# 写入线圈 +client.use_node('COIL_SYS_START_CMD').write(True) + +# 读取保持寄存器(INT16) +result, err = client.use_node('REG_DATA_ASSEMBLY_COIN_CELL_NUM').read(1) + +# 读取保持寄存器(FLOAT32,需要 2 个寄存器) +result, err = client.use_node('REG_DATA_OPEN_CIRCUIT_VOLTAGE').read(2) + +# 写入保持寄存器(FLOAT32) +client.use_node('REG_MSG_ELECTROLYTE_VOLUME').write( + 100.0, + data_type=DataType.FLOAT32, + word_order=WorderOrder.LITTLE, +) +``` + +**FLOAT32 字节序注意:** 许多 PLC 使用 Big Byte Order + Little Word Order,需要交换两个 16 位寄存器的顺序。参考 `coin_cell_assembly.py` 中的 `_decode_float32_correct` 函数。 + +### 7.4 ModbusWorkflow 生命周期 + +PLC 工作站的动作通过 `ModbusWorkflow` + `WorkflowAction` 组织,每个动作有 4 个生命周期阶段: + +```python +from unilabos.device_comms.modbus_plc.client import ModbusWorkflow, WorkflowAction + +# 定义动作的生命周期函数 +def my_init(use_node): + """初始化:设置参数""" + use_node('REG_MSG_ELECTROLYTE_VOLUME').write( + 100.0, data_type=DataType.FLOAT32, word_order=WorderOrder.LITTLE + ) + return True + +def my_start(use_node): + """启动:触发动作并轮询等待完成""" + use_node('COIL_SYS_START_CMD').write(True) + while True: + result, err = use_node('COIL_SYS_START_STATUS').read(1) + if not err and result[0]: + break + time.sleep(0.5) + return True + +def my_stop(use_node): + """停止:复位触发信号""" + use_node('COIL_SYS_START_CMD').write(False) + return True + +def my_cleanup(use_node): + """清理:无论成功失败都执行""" + use_node('COIL_SYS_RESET_CMD').write(True) + +# 组合成工作流 +workflow = ModbusWorkflow( + name="我的加工流程", + actions=[ + WorkflowAction(init=my_init, start=my_start, stop=my_stop, cleanup=my_cleanup) + ], +) + +# 执行 +client.run_modbus_workflow(workflow) +``` + +**生命周期执行顺序:** `init` → `start` → `stop` → `cleanup`(cleanup 始终执行,即使前序步骤失败) + +### 7.5 PLC 工作站中的握手循环 + +纽扣电池组装站的典型 PLC 交互模式(信息交换握手): + +```python +async def _send_msg_to_plc(self, data: dict): + """向 PLC 发送消息并等待确认""" + # 1. 写入数据寄存器 + for key, value in data.items(): + self._write_register(key, value) + + # 2. 发送"消息已准备好"信号 + self._write_coil('COIL_UNILAB_SEND_MSG_SUCC_CMD', True) + + # 3. 等待 PLC 读取确认 + while not self._read_coil('COIL_REQUEST_REC_MSG_STATUS'): + await self._ros_node.sleep(0.3) + + # 4. 撤销发送信号 + self._write_coil('COIL_UNILAB_SEND_MSG_SUCC_CMD', False) + +async def _recv_msg_from_plc(self) -> dict: + """等待 PLC 发送消息""" + # 1. 等待 PLC 发送信号 + while not self._read_coil('COIL_REQUEST_SEND_MSG_STATUS'): + await self._ros_node.sleep(0.3) + + # 2. 读取数据寄存器 + data = {} + for key in self._recv_registers: + data[key] = self._read_register(key) + + # 3. 发送"已收到"确认 + self._write_coil('COIL_UNILAB_REC_MSG_SUCC_CMD', True) + + # 4. 等待 PLC 撤销发送信号 + while self._read_coil('COIL_REQUEST_SEND_MSG_STATUS'): + await self._ros_node.sleep(0.3) + + # 5. 撤销确认信号 + self._write_coil('COIL_UNILAB_REC_MSG_SUCC_CMD', False) + + return data +``` + +### 7.6 JSON 驱动的 PLC 工作流 + +PLC 工作站还支持通过 JSON 描述工作流,无需编写 Python 代码。使用 `BaseClient.execute_procedure_from_json`: + +```json +{ + "register_node_list_from_csv_path": {"path": "register_map.csv"}, + "create_flow": [ + { + "name": "初始化系统", + "action": [ + { + "address_function_to_create": [ + {"func_name": "write_start", "node_name": "COIL_SYS_START_CMD", "mode": "write", "value": true}, + {"func_name": "read_status", "node_name": "COIL_SYS_START_STATUS", "mode": "read", "value": 1} + ], + "create_init_function": null, + "create_start_function": { + "func_name": "start_sys", + "write_functions": ["write_start"], + "condition_functions": ["read_status"], + "stop_condition_expression": "read_status[0]" + }, + "create_stop_function": {"func_name": "stop_start", "node_name": "COIL_SYS_START_CMD", "mode": "write", "value": false}, + "create_cleanup_function": null + } + ] + } + ], + "execute_flow": ["初始化系统"] +} +``` + +参考:`unilabos/device_comms/modbus_plc/client.py`(`ExecuteProcedureJson` 类型定义) + +--- + +## 8. 端到端案例 Walkthrough:Bioyond 反应站 + +以 Bioyond 反应站为例,展示从零接入一个带物料输入的外部系统工作站的完整过程。 + +### 8.1 需求 + +- **类型**:外部系统工作站(与 Bioyond LIMS 系统对接) +- **通信**:HTTP API(RPC 客户端 + HTTP 回调服务) +- **子设备**:5 个反应器(reactor_1 ~ reactor_5) +- **物料**:反应器、试剂瓶、烧杯、样品板、小瓶、枪头盒 → 6 种 WareHouse → 1 个 Deck + +### 8.2 文件结构 + +``` +unilabos/ +├── devices/workstation/bioyond_studio/ +│ ├── station.py # BioyondWorkstation 基类 +│ ├── bioyond_rpc.py # RPC 客户端 +│ └── reaction_station/ +│ └── reaction_station.py # BioyondReactionStation + BioyondReactor +├── resources/bioyond/ +│ ├── bottles.py # Bottle 工厂函数(8 种) +│ ├── bottle_carriers.py # Carrier 工厂函数(8 种) +│ ├── warehouses.py # WareHouse 工厂函数(6 种) +│ └── decks.py # BIOYOND_PolymerReactionStation_Deck +├── registry/ +│ ├── devices/reaction_station_bioyond.yaml +│ └── resources/bioyond/ +│ ├── bottles.yaml +│ ├── bottle_carriers.yaml +│ └── decks.yaml +└── test/experiments/reaction_station_bioyond.json +``` + +### 8.3 继承链 + +``` +WorkstationBase +└── BioyondWorkstation # 通用 Bioyond 逻辑 + ├── __init__(config, deck, protocol_type) + ├── post_init() → 启动连接监控 + HTTP 服务 + 上传 deck + ├── BioyondResourceSynchronizer # 物料双向同步 + └── BioyondReactionStation # 反应站特化 + ├── reactor_taken_in() # 反应器放入工作流 + ├── solid_feeding_vials() # 固体投料 + ├── liquid_feeding_solvents() # 液体投料 + └── workflow_sequence @property # 工作流序列状态 +``` + +### 8.4 物料资源层级(反应站实例) + +``` +BIOYOND_PolymerReactionStation_Deck (2700×1080×1500mm) +├── 堆栈1左 (WareHouse 4x4) ← Coordinate(-200, 400, 0) +│ ├── A01 → BottleCarrier → Reactor +│ ├── A02 → BottleCarrier → Reactor +│ └── ...(共 16 槽位) +├── 堆栈1右 (WareHouse 4x4, col_offset=4) ← Coordinate(350, 400, 0) +│ ├── A05 → BottleCarrier → Reactor +│ └── ... +├── 站内试剂存放堆栈 (WareHouse 1x2) ← Coordinate(1050, 400, 0) +│ ├── A01 → 1BottleCarrier → Bottle +│ └── A02 → 1BottleCarrier → Bottle +├── 测量小瓶仓库 (WareHouse 3x2) ← Coordinate(...) +├── 站内Tip盒堆栈(左) (WareHouse, removed_positions) +└── 站内Tip盒堆栈(右) (WareHouse) +``` + +### 8.5 图文件关键结构 + +```json +{ + "nodes": [ + { + "id": "reaction_station_bioyond", + "children": ["Bioyond_Deck", "reactor_1", "reactor_2", "reactor_3", "reactor_4", "reactor_5"], + "parent": null, + "type": "device", + "class": "reaction_station.bioyond", + "config": { + "api_key": "DE9BDDA0", + "api_host": "http://172.21.103.36:45388", + "workflow_mappings": { + "reactor_taken_out": "3a16081e-...", + "reactor_taken_in": "3a160df6-..." + }, + "material_type_mappings": { + "BIOYOND_PolymerStation_Reactor": ["反应器", "3a14233b-..."], + "BIOYOND_PolymerStation_1BottleCarrier": ["试剂瓶", "3a14233b-..."] + }, + "warehouse_mapping": { + "堆栈1左": { + "uuid": "3a14aa17-...", + "site_uuids": {"A01": "3a14aa17-...", "A02": "3a14aa17-..."} + } + }, + "http_service_config": { + "http_service_host": "127.0.0.1", + "http_service_port": 8080 + } + }, + "deck": { + "data": { + "_resource_child_name": "Bioyond_Deck", + "_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_PolymerReactionStation_Deck" + } + }, + "size_x": 2700.0, + "size_y": 1080.0, + "size_z": 2500.0, + "protocol_type": [], + "data": {} + }, + { + "id": "Bioyond_Deck", + "parent": "reaction_station_bioyond", + "type": "deck", + "class": "BIOYOND_PolymerReactionStation_Deck", + "config": {"type": "BIOYOND_PolymerReactionStation_Deck", "setup": true} + }, + { + "id": "reactor_1", + "parent": "reaction_station_bioyond", + "type": "device", + "class": "reaction_station.reactor", + "position": {"x": 1150, "y": 300, "z": 0}, + "config": {} + } + ] +} +``` + +### 8.6 初始化时序 + +``` +1. ROS2WorkstationNode.__init__ + ├── 创建 BioyondReactionStation 实例(__init__) + ├── 加载 Deck(BIOYOND_PolymerReactionStation_Deck, setup=true → 创建 6 个 WareHouse) + ├── 初始化 reactor_1~5(BioyondReactor 实例)→ sub_devices + └── 为每个 reactor 创建 ActionClient + +2. BioyondReactionStation.post_init(ros_node) + ├── 初始化 BioyondV1RPC(HTTP 客户端) + ├── 初始化 BioyondResourceSynchronizer + ├── 启动 ConnectionMonitor(30s 轮询) + ├── 启动 WorkstationHTTPService(接收回调) + ├── sync_from_external()(从 Bioyond 拉取物料到 Deck) + └── update_resource([self.deck])(上传 Deck 到云端) +``` + +### 8.7 物料同步流程 + +``` +外部入库: + Bioyond API → stock_material() → 获取物料列表 + → resource_bioyond_to_plr() → 转为 PLR Bottle/Carrier + → deck.warehouses["堆栈1左"]["A01"] = carrier + → update_resource([deck]) + +外部变更回调: + Bioyond POST /report/material_change + → WorkstationHTTPService 接收 + → process_material_change_report() + → 更新 Deck 中的资源 + → update_resource([affected_resource]) +``` + +### 8.8 工作站动作执行流程(以 reactor_taken_in 为例) + +```python +async def reactor_taken_in(self, assign_material_name, cutoff, temperature, **kwargs): + # 1. 从 config 获取工作流 UUID + workflow_id = self.config["workflow_mappings"]["reactor_taken_in"] + + # 2. 构建工序参数 + sections = self._build_sections(temperature, cutoff, ...) + + # 3. 合并到工作流序列 + self._workflow_sequence.append({"name": "reactor_taken_in", ...}) + + # 4. 调用外部系统创建工单 + result = self.hardware_interface.create_order(order_data) + + # 5. 等待外部系统完成(通过 HTTP 回调通知) + # process_order_finish_report 被回调时更新状态 + + return {"success": True} +``` + +--- + +## 9. 现有工作站 Config 结构完整对比 + +| 特性 | BioyondReactionStation | BioyondDispensingStation | CoinCellAssemblyWorkstation | +|------|----------------------|------------------------|-----------------------------| +| **继承** | BioyondWorkstation | BioyondWorkstation | WorkstationBase (直接) | +| **通信方式** | HTTP RPC | HTTP RPC | Modbus TCP | +| **`__init__` 签名** | `(config, deck, protocol_type, **kwargs)` | `(config, deck, protocol_type, **kwargs)` | `(config, deck, address, port, debug_mode, **kwargs)` | +| **子设备** | 5 个 BioyondReactor | 无 | 无 | +| **Deck** | BioyondReactionDeck (6 个 WareHouse) | BioyondDispensingDeck | CoincellDeck | +| **物料同步** | BioyondResourceSynchronizer (双向) | BioyondResourceSynchronizer (双向) | 无(本地 PLR) | +| **status_types** | `workflow_sequence: str` | 空 | 18 个属性 (sys_status, 传感器数据等) | +| **动作风格** | 语义化 (reactor_taken_in, ...) | 语义化 (compute_experiment_design, ...) | PLC 操作 (func_pack_device_init, ...) | +| **post_init** | 连接监控 + HTTP 服务 + 资源同步 + 上传 deck | 继承父类 | 上传 deck | +| **工作流管理** | workflow_mappings → 合并序列 → create_order | batch_create → wait_for_reports | PLC 握手循环 | + +### Config 字段对比 + +| 字段 | 反应站 | 配液站 | 纽扣电池 | +|------|--------|--------|---------| +| `api_host` | ✅ | ✅ | — | +| `api_key` | ✅ | ✅ | — | +| `workflow_mappings` | ✅ (8 个工作流) | — | — | +| `material_type_mappings` | ✅ (8 种物料) | ✅ | — | +| `warehouse_mapping` | ✅ (6 个仓库) | ✅ (3 个仓库) | — | +| `workflow_to_section_map` | ✅ | — | — | +| `action_names` | ✅ | — | — | +| `http_service_config` | ✅ | — | — | +| `material_default_parameters` | ✅ | — | — | +| `address` (init 参数) | — | — | ✅ | +| `port` (init 参数) | — | — | ✅ | +| `debug_mode` (init 参数) | — | — | ✅ | diff --git a/.cursor/skills/add-workstation/templates.md b/.cursor/skills/add-workstation/templates.md new file mode 100644 index 00000000..3c477835 --- /dev/null +++ b/.cursor/skills/add-workstation/templates.md @@ -0,0 +1,454 @@ +# 工作站代码模板 + +本文件包含 SKILL.md 引用的所有代码模板。Agent 根据需要按需阅读。 + +--- + +## Template A:外部系统工作站 + +```python +import logging +from typing import Dict, Any, Optional, List +from pylabrobot.resources import Deck + +from unilabos.devices.workstation.workstation_base import WorkstationBase + +try: + from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode +except ImportError: + ROS2WorkstationNode = None + + +class MyWorkstation(WorkstationBase): + _ros_node: "ROS2WorkstationNode" + + def __init__( + self, + config: dict = None, + deck: Optional[Deck] = None, + protocol_type: list = None, + **kwargs, + ): + super().__init__(deck=deck, **kwargs) + self.config = config or {} + self.logger = logging.getLogger(f"MyWorkstation") + self.api_host = self.config.get("api_host", "") + self.api_key = self.config.get("api_key", "") + self._status = "Idle" + + def post_init(self, ros_node: "ROS2WorkstationNode") -> None: + super().post_init(ros_node) + + def _get_child_device(self, device_id: str): + return self._children.get(device_id) + + async def scheduler_start(self, **kwargs) -> Dict[str, Any]: + return {"success": True} + + async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]: + return {"success": True} + + @property + def workflow_sequence(self) -> str: + return "[]" + + @property + def material_info(self) -> str: + return "{}" +``` + +--- + +## Template B:PLC/Modbus 硬件控制工作站 + +```python +import os +import logging +from typing import Dict, Any, Optional +from pylabrobot.resources import Deck + +from unilabos.devices.workstation.workstation_base import WorkstationBase +from unilabos.device_comms.modbus_plc.client import ( + TCPClient, BaseClient, ModbusWorkflow, WorkflowAction, +) +from unilabos.device_comms.modbus_plc.modbus import ( + Base as ModbusNodeBase, DataType, WorderOrder, +) + +try: + from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode +except ImportError: + ROS2WorkstationNode = None + + +class MyHardwareWorkstation(WorkstationBase): + _ros_node: "ROS2WorkstationNode" + + def __init__( + self, + config: dict = None, + deck: Optional[Deck] = None, + address: str = "192.168.1.100", + port: str = "502", + debug_mode: bool = False, + *args, + **kwargs, + ): + super().__init__(deck=deck, *args, **kwargs) + self.config = config or {} + self.address = address + self.port = int(port) + self.debug_mode = debug_mode + self.logger = logging.getLogger("MyHardwareWorkstation") + + if not debug_mode: + modbus_client = TCPClient(addr=self.address, port=self.port) + modbus_client.client.connect() + csv_path = os.path.join(os.path.dirname(__file__), 'register_map.csv') + self.nodes = BaseClient.load_csv(csv_path) + self.client = modbus_client.register_node_list(self.nodes) + else: + self.client = None + + def _read_coil(self, name: str) -> bool: + if self.debug_mode: + return False + result, err = self.client.use_node(name).read(1) + return result[0] if not err else False + + def _write_coil(self, name: str, value: bool): + if not self.debug_mode: + self.client.use_node(name).write(value) + + def _read_register(self, name: str, data_type: DataType = DataType.INT16): + if self.debug_mode: + return 0 + result, err = self.client.use_node(name).read( + 2 if data_type == DataType.FLOAT32 else 1 + ) + return result if not err else 0 + + def _write_register(self, name: str, value, data_type: DataType = DataType.FLOAT32): + if not self.debug_mode: + self.client.use_node(name).write( + value, data_type=data_type, word_order=WorderOrder.LITTLE + ) + + async def start_process(self, **kwargs) -> Dict[str, Any]: + self._write_coil('COIL_SYS_START_CMD', True) + return {"success": True} + + async def stop_process(self, **kwargs) -> Dict[str, Any]: + self._write_coil('COIL_SYS_STOP_CMD', True) + return {"success": True} + + @property + def sys_status(self) -> str: + return str(self._read_coil("COIL_SYS_START_STATUS")) +``` + +### PLC 寄存器映射 CSV 格式 + +```csv +Name,DataType,InitValue,Comment,Attribute,DeviceType,Address, +COIL_SYS_START_CMD,BOOL,,系统启动命令,,coil,8010, +COIL_SYS_STOP_CMD,BOOL,,系统停止命令,,coil,8020, +REG_MSG_ELECTROLYTE_VOLUME,INT16,,电解液体积,,hold_register,11004, +REG_DATA_OPEN_CIRCUIT_VOLTAGE,FLOAT32,,开路电压,,hold_register,10002, +``` + +命名约定:`COIL_` 线圈 / `REG_MSG_` 命令寄存器 / `REG_DATA_` 数据寄存器 / `_CMD` 写入 / `_STATUS` 读取 + +--- + +## Template C:Protocol 工作站 + +```python +from typing import List, Optional +from pylabrobot.resources import Resource as PLRResource +from unilabos.devices.workstation.workstation_base import ProtocolNode + + +class MyProtocolStation(ProtocolNode): + def __init__( + self, + protocol_type: List[str], + deck: Optional[PLRResource] = None, + *args, + **kwargs, + ): + super().__init__(protocol_type=protocol_type, deck=deck, *args, **kwargs) +``` + +> 通常不需要自定义类,直接在注册表和图文件中配置 `ProtocolNode` + `protocol_type` 即可。 + +--- + +## 子设备模板 + +### 驱动类 + +```python +class MyReactor: + def __init__(self, **kwargs): + self.data = { + "temperature": 0.0, + "status": "Idle", + } + + async def update_metrics(self, metrics_json: str, **kwargs): + import json + metrics = json.loads(metrics_json) + self.data["temperature"] = metrics.get("temperature", 0.0) + self.data["status"] = metrics.get("status", "Idle") + return {"success": True} +``` + +### 注册表 + +```yaml +reaction_station.reactor: + category: + - reactor + - my_workstation + class: + module: unilabos.devices.workstation.my_station.my_station:MyReactor + type: python + status_types: + temperature: Float64 + status: String + action_value_mappings: + auto-update_metrics: + type: UniLabJsonCommandAsync + goal: + metrics_json: json_str + result: + success: success +``` + +> `auto-` 前缀动作不创建 ActionClient,仅供工作站驱动内部调用。 + +### 图文件节点 + +```json +{ + "id": "reactor_1", + "name": "reactor_1", + "children": [], + "parent": "my_station", + "type": "device", + "class": "reaction_station.reactor", + "position": {"x": 1150, "y": 300, "z": 0}, + "config": {}, + "data": {} +} +``` + +### 代码中访问子设备 + +```python +child_node = self._children.get("reactor_1") +child_node.driver_instance.update_metrics(data) + +child = self._ros_node.sub_devices.get("reactor_1") +child.driver_instance.data["temperature"] +``` + +### 硬件代理配置 + +通信设备节点(ID 以 `serial_` 或 `io_` 开头): + +```json +{ + "id": "serial_port_1", + "parent": "my_station", + "type": "device", + "class": "serial_device", + "config": { + "hardware_interface": { + "name": "hardware_interface_name", + "read": "read_method", + "write": "write_method" + } + } +} +``` + +子设备注册表中声明代理: + +```yaml +my_sub_device: + class: + hardware_interface: + name: hardware_interface_name # 属性名,值为通信设备 ID + read: read_from_device # 注入的读方法 + write: write_to_device # 注入的写方法 +``` + +ROS 节点初始化时自动检测 `name` 属性值是否为另一子设备 ID,若是则注入通信设备的 read/write 方法。 + +--- + +## 注册表完整配置 + +```yaml +my_workstation: + description: "我的工作站" + version: "1.0.0" + category: + - workstation + - my_category + class: + module: unilabos.devices.workstation.my_station.my_station:MyWorkstation + type: python + status_types: + workflow_sequence: String + material_info: String + action_value_mappings: + scheduler_start: + type: UniLabJsonCommandAsync + goal: {} + result: + success: success + create_order: + type: UniLabJsonCommandAsync + goal: + json_str: json_str + result: + success: success + init_param_schema: + config: + type: object + deck: + type: object + protocol_type: + type: array +``` + +--- + +## 物料资源模板 + +### Bottle 工厂函数 + +```python +from unilabos.resources.itemized_carrier import Bottle + + +def My_Reagent_Bottle( + name: str, + diameter: float = 70.0, + height: float = 120.0, + max_volume: float = 500000.0, # 单位 μL(500mL = 500000) + barcode: str = None, +) -> Bottle: + return Bottle( + name=name, diameter=diameter, height=height, + max_volume=max_volume, barcode=barcode, + model="My_Reagent_Bottle", + ) +``` + +### BottleCarrier 工厂函数 + +```python +from pylabrobot.resources import ResourceHolder +from pylabrobot.resources.carrier import create_ordered_items_2d +from unilabos.resources.itemized_carrier import BottleCarrier + + +def My_6SlotCarrier(name: str) -> BottleCarrier: + sites = create_ordered_items_2d( + klass=ResourceHolder, + num_items_x=3, num_items_y=2, + dx=10.0, dy=10.0, dz=5.0, + item_dx=42.0, item_dy=35.0, + size_x=20.0, size_y=20.0, size_z=50.0, + ) + carrier = BottleCarrier( + name=name, size_x=146.0, size_y=80.0, size_z=55.0, + sites=sites, model="My_6SlotCarrier", + ) + carrier.num_items_x = 3 + carrier.num_items_y = 2 + carrier.num_items_z = 1 + return carrier +``` + +### WareHouse 工厂函数 + +```python +from unilabos.resources.warehouse import warehouse_factory + + +def my_warehouse_4x4(name: str) -> "WareHouse": + return warehouse_factory( + name=name, + num_items_x=4, num_items_y=4, num_items_z=1, + dx=137.0, dy=96.0, dz=120.0, + item_dx=137.0, item_dy=125.0, item_dz=10.0, + resource_size_x=127.0, resource_size_y=85.0, resource_size_z=100.0, + model="my_warehouse_4x4", + ) +``` + +### Deck 类 + +```python +from pylabrobot.resources import Deck, Coordinate + + +class MyStation_Deck(Deck): + def __init__( + self, + name: str = "MyStation_Deck", + size_x: float = 2700.0, + size_y: float = 1080.0, + size_z: float = 1500.0, + category: str = "deck", + setup: bool = False, + ) -> None: + super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z) + if setup: + self.setup() + + def setup(self) -> None: + self.warehouses = { + "堆栈A": my_warehouse_4x4("堆栈A"), + "堆栈B": my_warehouse_4x4("堆栈B"), + } + self.warehouse_locations = { + "堆栈A": Coordinate(-200.0, 400.0, 0.0), + "堆栈B": Coordinate(2350.0, 400.0, 0.0), + } + for wh_name, wh in self.warehouses.items(): + self.assign_child_resource(wh, location=self.warehouse_locations[wh_name]) +``` + +### 资源注册表 + +```yaml +# unilabos/registry/resources//bottles.yaml +My_Reagent_Bottle: + category: [bottles] + class: + module: unilabos.resources.my_project.bottles:My_Reagent_Bottle + type: pylabrobot + +# unilabos/registry/resources//decks.yaml +MyStation_Deck: + category: [deck] + class: + module: unilabos.resources.my_project.decks:MyStation_Deck + type: pylabrobot + registry_type: resource +``` + +### PLR 扩展注册 + +新 Deck 类需要在 `unilabos/resources/plr_additional_res_reg.py` 中导入: + +```python +def register(): + from unilabos.resources.my_project.decks import MyStation_Deck +```