Merge remote-tracking branch 'origin/dev' into feature/organic-extraction

# Conflicts:
#	.cursor/skills/add-workstation/SKILL.md
#	.cursor/skills/add-workstation/reference.md
This commit is contained in:
ZiWei
2026-03-27 11:49:30 +08:00
17 changed files with 2048 additions and 812 deletions

View File

@@ -1,260 +1,626 @@
---
name: add-workstation
description: Guide for adding new workstations to Uni-Lab-OS (接入新工作站). Walks through workstation type selection, sub-device composition, external system integration, driver creation, registry YAML, deck setup, and graph file configuration. Use when the user wants to add/integrate a new workstation, create a workstation driver, configure a station with sub-devices, set up deck and materials, or mentions 工作站/工站/station/workstation.
description: Guide for adding new workstations to Uni-Lab-OS (接入新工作站). Uses @device decorator + AST auto-scanning. Walks through workstation type, sub-device composition, driver creation, deck setup, and graph file. Use when the user wants to add a workstation, create a workstation driver, configure a station with sub-devices, or mentions 工作站/工站/station/workstation.
---
# Uni-Lab-OS 工作站接入指南
工作站是组合多个子设备的大型设备,拥有独立的物料管理系统PLR Deck和工作流引擎。
> **完整代码模板**见 [templates.md](templates.md)**高级模式**见 [reference.md](reference.md)。
## 第一步:确定工作站类型
向用户确认:
| 类型 | 基类 | 适用场景 | 示例 |
|------|------|----------|------|
| **Protocol** | `ProtocolNode` | 标准化学操作协议 | FilterProtocolStation |
| **外部系统** | `WorkstationBase` | 对接 LIMS/MES API | BioyondStation |
| **硬件控制** | `WorkstationBase` | 直接控制 PLC/硬件 | CoinCellAssembly |
还需确认:
- 英文名称、通信方式HTTP/Modbus/OPC UA/无)
- 子设备组成(哪些已有、哪些新增、硬件代理关系)
- 物料需求(是否需要 Deck、物料类型、是否需外部同步
工作站workstation是组合多个子设备的大型设备,拥有独立的物料管理系统和工作流引擎。使用 `@device` 装饰器注册AST 自动扫描生成注册表
---
## 第二步:理解工作站架构
## 工作站类型
| 维度 | 普通设备 | 工作站 |
|------|---------|--------|
| 基类 | 纯 Python 类 | `WorkstationBase` / `ProtocolNode` |
| ROS 节点 | `BaseROS2DeviceNode` | `ROS2WorkstationNode` |
| 状态管理 | `self.data` 字典 | `@property` 直接访问 |
| 子设备 | 无 | `self._children` / `self._ros_node.sub_devices` |
| 物料 | 无 | `self.deck`PLR Deck |
| 类型 | 基类 | 适用场景 |
| ------------------- | ----------------- | ---------------------------------- |
| **Protocol 工作站** | `ProtocolNode` | 标准化学操作协议(泵转移、过滤等) |
| **外部系统工作站** | `WorkstationBase` | 与外部 LIMS/MES 对接 |
| **硬件控制工作站** | `WorkstationBase` | 直接控制 PLC/硬件 |
### 继承体系
---
```
WorkstationBase (ABC)
├── BioyondWorkstation ← HTTP RPC + 资源同步
│ ├── BioyondReactionStation
│ └── BioyondDispensingStation
├── CoinCellAssemblyWorkstation ← Modbus/PLC
└── ProtocolNode ← 标准化学协议
## @device 装饰器(工作站)
工作站也使用 `@device` 装饰器注册,参数与普通设备一致:
```python
@device(
id="my_workstation", # 注册表唯一标识(必填)
category=["workstation"], # 分类标签
description="我的工作站",
)
```
### 子设备初始化流程
`ROS2WorkstationNode.__init__` → 遍历 `children`type=="device")→ `initialize_device_from_dict()` → 存入 `sub_devices` → 为每个动作创建 `ActionClient` → 识别通信设备(`serial_*`/`io_*`)→ `_setup_hardware_proxy()`
如果一个工作站类支持多个具体变体,可使用 `ids` / `id_meta`,与设备的用法相同(参见 add-device SKILL
---
## 第三步:创建驱动文件
## 工作站驱动模板
路径:`unilabos/devices/workstation/<station_name>/<station_name>.py`
### 模板 A基于外部系统的工作站
根据类型选择模板(完整代码见 [templates.md](templates.md)
```python
import logging
from typing import Dict, Any, Optional
from pylabrobot.resources import Deck
| 类型 | 模板 | 关键要素 |
|------|------|---------|
| 外部系统 | Template A | `config` 接收 API 配置,`post_init` 启动 RPC/HTTP 服务 |
| 硬件控制 | Template B | `TCPClient` + CSV 寄存器映射,`use_node()` 读写 |
| Protocol | Template C | 直接使用 `ProtocolNode`,通常不需要自定义类 |
from unilabos.registry.decorators import device, topic_config, not_action
from unilabos.devices.workstation.workstation_base import WorkstationBase
**所有模板的 `__init__` 必须接受 `deck` 和 `**kwargs`。**
try:
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
except ImportError:
ROS2WorkstationNode = None
---
## 第四步:创建子设备(如需要)
@device(id="my_workstation", category=["workstation"], description="我的工作站")
class MyWorkstation(WorkstationBase):
_ros_node: "ROS2WorkstationNode"
子设备是独立设备,有自己的驱动类和注册表。完整模板见 [templates.md § 子设备模板](templates.md)。
def __init__(self, config=None, deck=None, protocol_type=None, **kwargs):
super().__init__(deck=deck, **kwargs)
self.config = config or {}
self.logger = logging.getLogger("MyWorkstation")
self.api_host = self.config.get("api_host", "")
self._status = "Idle"
### 关键要点
@not_action
def post_init(self, ros_node: "ROS2WorkstationNode"):
super().post_init(ros_node)
self._ros_node = ros_node
1. **驱动类**:普通 Python 类,`self.data` 预填所有属性
2. **注册表**`category` 包含工作站标识,`auto-` 前缀动作不创建 ActionClient
3. **图文件**`parent` 指向工作站 ID`type: "device"`
4. **代码访问**`self._children.get("reactor_1").driver_instance`
async def scheduler_start(self, **kwargs) -> Dict[str, Any]:
"""注册为工作站动作"""
return {"success": True}
### 硬件代理模式
async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]:
"""注册为工作站动作"""
return {"success": True}
当子设备需要通过通信设备(串口/IO通信时
@property
@topic_config()
def workflow_sequence(self) -> str:
return "[]"
1. 通信设备 ID 必须以 `serial_``io_` 开头
2. 子设备注册表中声明 `hardware_interface: {name, read, write}`
3. 子设备实例的 `name` 属性值 = 通信设备 ID
4. ROS 节点自动将通信设备的 read/write 方法注入到子设备上
---
## 第五步:创建注册表 YAML
路径:`unilabos/registry/devices/<station_name>.yaml`
**最小配置(`--complete_registry` 自动补全):**
```yaml
my_workstation:
category:
- workstation
class:
module: unilabos.devices.workstation.my_station.my_station:MyWorkstation
type: python
@property
@topic_config()
def material_info(self) -> str:
return "{}"
```
**完整配置**见 [templates.md § 注册表完整配置](templates.md)。
### 模板 BProtocol 工作站
直接使用 `ProtocolNode`,通常不需要自定义驱动类:
```python
from unilabos.devices.workstation.workstation_base import ProtocolNode
```
在图文件中配置 `protocol_type` 即可。
---
## 第六步:配置物料系统(如需要
## 子设备访问sub_devices
物料层级:`Deck``WareHouse``ResourceHolder` (site) → `BottleCarrier``Bottle`
工站初始化子设备后,所有子设备实例存储在 `self._ros_node.sub_devices` 字典中key 为设备 idvalue 为 `ROS2DeviceNode` 实例)。工站的驱动类可以直接获取子设备实例来调用其方法:
### 快速流程
```python
# 在工站驱动类的方法中访问子设备
sub = self._ros_node.sub_devices["pump_1"]
1. **创建 Bottle**`unilabos/resources/<project>/bottles.py`)— 工厂函数,返回 `Bottle` 实例
2. **创建 Carrier**`.../bottle_carriers.py`)— 工厂函数,用 `create_ordered_items_2d` 定义槽位
3. **创建 WareHouse**`.../warehouses.py`)— 用 `warehouse_factory()` 创建堆栈
4. **创建 Deck**`.../decks.py`)— 继承 `pylabrobot.resources.Deck``setup()` 中放置 WareHouse
5. **注册表**`unilabos/registry/resources/<project>/`)— `class.type: pylabrobot`
6. **PLR 扩展**`unilabos/resources/plr_additional_res_reg.py`)— 导入新 Deck 类
# .driver_instance — 子设备的驱动实例(即设备 Python 类的实例)
sub.driver_instance.some_method(arg1, arg2)
完整代码模板见 [templates.md § 物料资源模板](templates.md)。
# .ros_node_instance — 子设备的 ROS2 节点实例
sub.ros_node_instance._action_value_mappings # 查看子设备支持的 action
```
### 图文件中的 Deck 配置
**常见用法**
工作站节点引用 Deck
```python
class MyWorkstation(WorkstationBase):
def my_protocol(self, **kwargs):
# 获取子设备驱动实例
pump = self._ros_node.sub_devices["pump_1"].driver_instance
heater = self._ros_node.sub_devices["heater_1"].driver_instance
# 直接调用子设备方法
pump.aspirate(volume=100)
heater.set_temperature(80)
```
> 参考实现:`unilabos/devices/workstation/bioyond_studio/reaction_station/reaction_station.py` 中通过 `self._ros_node.sub_devices.get(reactor_id)` 获取子反应器实例并更新数据。
---
## 硬件通信接口hardware_interface
硬件控制型工作站通常需要通过串口Serial、Modbus 等通信协议控制多个子设备。Uni-Lab-OS 通过 **通信设备代理** 机制实现端口共享:一个串口只创建一个 `serial` 节点,多个子设备共享这个通信实例。
### 工作原理
`ROS2WorkstationNode` 初始化时分两轮遍历子设备(`workstation.py`
**第一轮 — 初始化所有子设备**:按 `children` 顺序调用 `initialize_device()`,通信设备(`serial_` / `io_` 开头的 id优先完成初始化创建 `serial.Serial()` 实例。其他子设备此时 `self.hardware_interface = "serial_pump"`(字符串)。
**第二轮 — 代理替换**:遍历所有已初始化的子设备,读取子设备的 `_hardware_interface` 配置:
```
hardware_interface = d.ros_node_instance._hardware_interface
# → {"name": "hardware_interface", "read": "send_command", "write": "send_command"}
```
1.`name` 字段对应的属性值:`name_value = getattr(driver, hardware_interface["name"])`
- 如果 `name_value` 是字符串且该字符串是某个子设备的 id → 触发代理替换
2. 从通信设备获取真正的 `read`/`write` 方法
3.`setattr(driver, read_method, _read)` 将通信设备的方法绑定到子设备上
因此:
- **通信设备 id 必须与子设备 config 中填的字符串完全一致**(如 `"serial_pump"`
- **通信设备 id 必须以 `serial_``io_` 开头**(否则第一轮不会被识别为通信设备)
- **通信设备必须在 `children` 列表中排在最前面**,确保先初始化
### HardwareInterface 参数说明
```python
from unilabos.registry.decorators import HardwareInterface
HardwareInterface(
name="hardware_interface", # __init__ 中接收通信实例的属性名
read="send_command", # 通信设备上暴露的读方法名
write="send_command", # 通信设备上暴露的写方法名
extra_info=["list_ports"], # 可选:额外暴露的方法
)
```
**`name` 字段的含义**:对应设备类 `__init__` 中,用于保存通信实例的**属性名**。系统据此知道要替换哪个属性。大部分设备直接用 `"hardware_interface"`,也可以自定义(如 `"io_device_port"`)。
### 示例 1name="hardware_interface"
```python
from unilabos.registry.decorators import device, HardwareInterface
@device(
id="my_pump",
category=["pump_and_valve"],
hardware_interface=HardwareInterface(
name="hardware_interface",
read="send_command",
write="send_command",
),
)
class MyPump:
def __init__(self, port=None, address="1", **kwargs):
# name="hardware_interface" → 系统替换 self.hardware_interface
self.hardware_interface = port # 初始为字符串 "serial_pump",启动后被替换为 Serial 实例
self.address = address
def send_command(self, command: str):
full_command = f"/{self.address}{command}\r\n"
self.hardware_interface.write(bytearray(full_command, "ascii"))
return self.hardware_interface.read_until(b"\n")
```
### 示例 2电磁阀name="io_device_port",自定义属性名)
```python
@device(
id="solenoid_valve",
category=["pump_and_valve"],
hardware_interface=HardwareInterface(
name="io_device_port", # 自定义属性名 → 系统替换 self.io_device_port
read="read_io_coil",
write="write_io_coil",
),
)
class SolenoidValve:
def __init__(self, io_device_port: str = None, **kwargs):
# name="io_device_port" → 图文件 config 中用 "io_device_port": "io_board_1"
self.io_device_port = io_device_port # 初始为字符串,系统替换为 Modbus 实例
```
### Serial 通信设备class="serial"
`serial` 是 Uni-Lab-OS 内置的通信代理设备,代码位于 `unilabos/ros/nodes/presets/serial_node.py`
```python
from serial import Serial, SerialException
from threading import Lock
class ROS2SerialNode(BaseROS2DeviceNode):
def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, **kwargs):
self.port = port
self.baudrate = baudrate
self._hardware_interface = {
"name": "hardware_interface",
"write": "send_command",
"read": "read_data",
}
self._query_lock = Lock()
self.hardware_interface = Serial(baudrate=baudrate, port=port)
BaseROS2DeviceNode.__init__(
self, driver_instance=self, registry_name=registry_name,
device_id=device_id, status_types={}, action_value_mappings={},
hardware_interface=self._hardware_interface, print_publish=False,
)
self.create_service(SerialCommand, "serialwrite", self.handle_serial_request)
def send_command(self, command: str):
with self._query_lock:
self.hardware_interface.write(bytearray(f"{command}\n", "ascii"))
return self.hardware_interface.read_until(b"\n").decode()
def read_data(self):
with self._query_lock:
return self.hardware_interface.read_until(b"\n").decode()
```
在图文件中使用 `"class": "serial"` 即可创建串口代理:
```json
"deck": {
"data": {
"_resource_child_name": "my_deck",
"_resource_type": "unilabos.resources.my_project.decks:MyStation_Deck"
{
"id": "serial_pump",
"class": "serial",
"parent": "my_station",
"config": { "port": "COM7", "baudrate": 9600 }
}
```
### 图文件配置
**通信设备必须在 `children` 列表中排在最前面**,确保先于其他子设备初始化:
```json
{
"nodes": [
{
"id": "my_station",
"class": "workstation",
"children": ["serial_pump", "pump_1", "pump_2"],
"config": { "protocol_type": ["PumpTransferProtocol"] }
},
{
"id": "serial_pump",
"class": "serial",
"parent": "my_station",
"config": { "port": "COM7", "baudrate": 9600 }
},
{
"id": "pump_1",
"class": "syringe_pump_with_valve.runze.SY03B-T08",
"parent": "my_station",
"config": { "port": "serial_pump", "address": "1", "max_volume": 25.0 }
},
{
"id": "pump_2",
"class": "syringe_pump_with_valve.runze.SY03B-T08",
"parent": "my_station",
"config": { "port": "serial_pump", "address": "2", "max_volume": 25.0 }
}
],
"links": [
{
"source": "pump_1",
"target": "serial_pump",
"type": "communication",
"port": { "pump_1": "port", "serial_pump": "port" }
},
{
"source": "pump_2",
"target": "serial_pump",
"type": "communication",
"port": { "pump_2": "port", "serial_pump": "port" }
}
]
}
```
### 通信协议速查
| 协议 | config 参数 | 依赖包 | 通信设备 class |
| -------------------- | ------------------------------ | ---------- | -------------------------- |
| Serial (RS232/RS485) | `port`, `baudrate` | `pyserial` | `serial` |
| Modbus RTU | `port`, `baudrate`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` |
| Modbus TCP | `host`, `port`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` |
| TCP Socket | `host`, `port` | stdlib | 自定义 |
| HTTP API | `url`, `token` | `requests` | `device_comms/rpc.py` |
参考实现:`unilabos/test/experiments/Grignard_flow_batchreact_single_pumpvalve.json`
---
## Deck 与物料生命周期
### 1. Deck 入参与两种初始化模式
系统根据设备节点 `config.deck` 的写法,自动反序列化 Deck 实例后传入 `__init__``deck` 参数。目前 `deck` 是固定字段名,只支持一个主 Deck。建议一个设备拥有一个台面台面上抽象二级、三级子物料。
有两种初始化模式:
#### init 初始化(推荐)
`config.deck` 直接包含 `_resource_type` + `_resource_child_name`,系统先用 Deck 节点的 `config` 调用 Deck 类的 `__init__` 反序列化,再将实例传入设备的 `deck` 参数。子物料随 Deck 的 `children` 一起反序列化。
```json
"config": {
"deck": {
"_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck",
"_resource_child_name": "PRCXI_Deck"
}
}
```
Deck 子节点:
#### deserialize 初始化
`config.deck``data` 包裹一层,系统走 `deserialize` 路径,可传入更多参数(如 `allow_marshal` 等):
```json
{
"id": "my_deck",
"parent": "my_station",
"type": "deck",
"class": "MyStation_Deck",
"config": {"type": "MyStation_Deck", "setup": true, "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}}
"config": {
"deck": {
"data": {
"_resource_child_name": "YB_Bioyond_Deck",
"_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_YB_Deck"
}
}
}
```
> **`_resource_child_name`** 必须与 Deck 节点的 `id` 一致
没有特殊需求时推荐 init 初始化
---
## 第七步:配置图文件
路径:`unilabos/test/experiments/<station_name>.json`
```json
{
"nodes": [
{
"id": "my_station",
"name": "my_station",
"children": ["my_deck", "sub_device_1"],
"parent": null,
"type": "device",
"class": "my_workstation",
"position": {"x": 0, "y": 0, "z": 0},
"config": {},
"deck": {"data": {"_resource_child_name": "my_deck", "_resource_type": "...decks:MyStation_Deck"}},
"size_x": 2700.0, "size_y": 1080.0, "size_z": 1500.0,
"protocol_type": [],
"data": {}
},
{"id": "my_deck", "parent": "my_station", "type": "deck", "class": "MyStation_Deck", "config": {"type": "MyStation_Deck", "setup": true}},
{"id": "sub_device_1", "parent": "my_station", "type": "device", "class": "sub_device_class", "config": {}}
]
}
```
### 图文件规则
#### config.deck 字段说明
| 字段 | 说明 |
|------|------|
| `children` | 包含 deck ID 和所有子设备 ID |
| `parent` | 工作站为 `null`;子设备/deck 指向工作站 ID |
| `type` | 工作站和子设备 `"device"`deck 为 `"deck"` |
| `class` | 注册表中的设备名 |
| `protocol_type` | Protocol 工作站填协议名列表;否则 `[]` |
| `config` | 传入 `__init__``config` 参数 |
| `_resource_type` | Deck 类的完整模块路径(`module:ClassName` |
| `_resource_child_name` | 对应图文件中 Deck 节点的 `id`,建立父子关联 |
### Config 字段速查
#### 设备 __init__ 接收
| 字段 | 外部系统 | PLC/硬件 | 说明 |
|------|---------|---------|------|
| `api_host` / `api_key` | ✅ | — | 外部 API 连接 |
| `address` / `port` | — | ✅ | PLC 地址init 参数,非 config 内) |
| `workflow_mappings` | ✅ | — | 工作流名 → 外部 UUID |
| `material_type_mappings` | ✅ | — | PLR 资源类 → 外部物料类型 |
| `warehouse_mapping` | ✅ | — | 仓库 → 外部 UUID + 库位 UUID |
| `http_service_config` | ✅ | — | HTTP 回调 host/port |
```python
def __init__(self, config=None, deck=None, protocol_type=None, **kwargs):
super().__init__(deck=deck, **kwargs)
# deck 已经是反序列化后的 Deck 实例
# → PRCXI9300Deck / BIOYOND_YB_Deck 等
```
> 完整 Config 结构详见 [reference.md § 2](reference.md)
#### Deck 节点(图文件中)
Deck 节点作为设备的 `children` 之一,`parent` 指向设备 id
```json
{
"id": "PRCXI_Deck",
"parent": "PRCXI",
"type": "deck",
"class": "",
"children": [],
"config": {
"type": "PRCXI9300Deck",
"size_x": 542, "size_y": 374, "size_z": 0,
"category": "deck",
"sites": [...]
},
"data": {}
}
```
- `config` 中的字段会传入 Deck 类的 `__init__`(因此 `__init__` 必须能接受所有 `serialize()` 输出的字段)
- `children` 初始为空时,由同步器或手动初始化填充
- `config.type` 填 Deck 类名
### 2. Deck 为空时自行初始化
如果 Deck 节点的 `children` 为空,工作站需在 `post_init` 或首次同步时自行初始化内容:
```python
@not_action
def post_init(self, ros_node):
super().post_init(ros_node)
if self.deck and not self.deck.children:
self._initialize_default_deck()
def _initialize_default_deck(self):
from my_labware import My_TipRack, My_Plate
self.deck.assign_child_resource(My_TipRack("T1"), spot=0)
self.deck.assign_child_resource(My_Plate("T2"), spot=1)
```
### 3. 物料双向同步
当工作站对接外部系统LIMS/MES需要实现 `ResourceSynchronizer` 处理双向物料同步:
```python
from unilabos.devices.workstation.workstation_base import ResourceSynchronizer
class MyResourceSynchronizer(ResourceSynchronizer):
def sync_from_external(self) -> bool:
"""从外部系统同步到 self.workstation.deck"""
external_data = self._query_external_materials()
# 以外部工站为准:根据外部数据反向创建 PLR 资源实例
for item in external_data:
cls = self._resolve_resource_class(item["type"])
resource = cls(name=item["name"], **item["params"])
self.workstation.deck.assign_child_resource(resource, spot=item["slot"])
return True
def sync_to_external(self, resource) -> bool:
"""将 UniLab 侧物料变更同步到外部系统"""
# 以 UniLab 为准:将 PLR 资源转为外部格式并推送
external_format = self._convert_to_external(resource)
return self._push_to_external(external_format)
def handle_external_change(self, change_info) -> bool:
"""处理外部系统主动推送的变更"""
return True
```
同步策略取决于业务场景:
- **以外部工站为准**:从外部 API 查询物料数据,反向创建对应的 PLR 资源实例放到 Deck 上
- **以 UniLab 为准**UniLab 侧的物料变更通过 `sync_to_external` 推送到外部系统
在工作站 `post_init` 中初始化同步器:
```python
@not_action
def post_init(self, ros_node):
super().post_init(ros_node)
self.resource_synchronizer = MyResourceSynchronizer(self)
self.resource_synchronizer.sync_from_external()
```
### 4. 序列化与持久化serialize / serialize_state
资源类需正确实现序列化,系统据此完成持久化和前端同步。
**`serialize()`** — 输出资源的结构信息(`config` 层),反序列化时作为 `__init__` 的入参回传。因此 **`__init__` 必须通过 `**kwargs`接受`serialize()` 输出的所有字段\*\*,即使当前不使用:
```python
class MyDeck(Deck):
def __init__(self, name, size_x, size_y, size_z,
sites=None, # serialize() 输出的字段
rotation=None, # serialize() 输出的字段
barcode=None, # serialize() 输出的字段
**kwargs): # 兜底:接受所有未知的 serialize 字段
super().__init__(size_x, size_y, size_z, name)
# ...
def serialize(self) -> dict:
data = super().serialize()
data["sites"] = [...] # 自定义字段
return data
```
**`serialize_state()`** — 输出资源的运行时状态(`data` 层),用于持久化可变信息。`data` 中的内容会被正确保存和恢复:
```python
class MyPlate(Plate):
def __init__(self, name, size_x, size_y, size_z,
material_info=None, **kwargs):
super().__init__(name, size_x, size_y, size_z, **kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def serialize_state(self) -> Dict[str, Any]:
data = super().serialize_state()
data.update(self._unilabos_state)
return data
```
关键要点:
- `serialize()` 输出的所有字段都会作为 `config` 回传到 `__init__`,所以 `__init__` 必须能接受它们(显式声明或 `**kwargs`
- `serialize_state()` 输出的 `data` 用于持久化运行时状态(如物料信息、液体量等)
- `_unilabos_state` 中只存可 JSON 序列化的基本类型str, int, float, bool, list, dict, None
### 5. 子物料自动同步
子物料Bottle、Plate、TipRack 等)放到 Deck 上后,系统会自动将其同步到前端的 Deck 视图。只需保证资源类正确实现了 `serialize()` / `serialize_state()` 和反序列化即可。
### 6. 图文件配置(参考 prcxi_9320_slim.json
```json
{
"nodes": [
{
"id": "my_station",
"type": "device",
"class": "my_workstation",
"config": {
"deck": {
"_resource_type": "unilabos.resources.my_module:MyDeck",
"_resource_child_name": "my_deck"
},
"host": "10.20.30.1",
"port": 9999
}
},
{
"id": "my_deck",
"parent": "my_station",
"type": "deck",
"class": "",
"children": [],
"config": {
"type": "MyLabDeck",
"size_x": 542,
"size_y": 374,
"size_z": 0,
"category": "deck",
"sites": [
{
"label": "T1",
"visible": true,
"occupied_by": null,
"position": { "x": 0, "y": 0, "z": 0 },
"size": { "width": 128.0, "height": 86, "depth": 0 },
"content_type": ["plate", "tip_rack", "tube_rack", "adaptor"]
}
]
},
"data": {}
}
],
"edges": []
}
```
Deck 节点要点:
- `config.type` 填 Deck 类名(如 `"PRCXI9300Deck"`
- `config.sites` 完整列出所有 site从 Deck 类的 `serialize()` 输出获取)
- `children` 初始为空(由同步器或手动初始化填充)
- 设备节点 `config.deck._resource_type` 指向 Deck 类的完整模块路径
---
## 第八步:验证
## 子设备
```bash
python -c "from unilabos.devices.workstation.<name>.<name> import <ClassName>"
unilab -g <graph>.json --complete_registry
unilab -g <graph>.json
```
子设备按标准设备接入流程创建(参见 add-device SKILL使用 `@device` 装饰器。
子设备约束:
- 图文件中 `parent` 指向工作站 ID
- 在工作站 `children` 数组中列出
---
## 关键规则
1. `__init__` 必须接受 `deck``**kwargs`
2. 通过 `self._children` 访问子设备,不自行维护引用
3. `post_init` 中启动后台服务,不在 `__init__` 中启动网络连接
4. 异步方法使用 `await self._ros_node.sleep()`,禁止 `time.sleep()` / `asyncio.sleep()`
5. 子设备在图文件中声明,不在驱动代码中创建
6. `_resource_child_name` 必须与 deck 节点 ID 一致
7. Protocol 工作站优先使用 `ProtocolNode`
8. 通信设备 ID 以 `serial_``io_` 开头
1. **`__init__` 必须接受 `deck``**kwargs`** — `WorkstationBase.**init**`需要`deck` 参数
2. **Deck 通过 `config.deck._resource_type` 反序列化传入** — 不要在 `__init__` 中手动创建 Deck
3. **Deck 为空时自行初始化内容** — 在 `post_init` 中检查并填充默认物料
4. **外部同步实现 `ResourceSynchronizer`**`sync_from_external` / `sync_to_external`
5. **通过 `self._children` 访问子设备** — 不要自行维护子设备引用
6. **`post_init` 中启动后台服务** — 不要在 `__init__` 中启动网络连接
7. **异步方法使用 `await self._ros_node.sleep()`** — 禁止 `time.sleep()``asyncio.sleep()`
8. **使用 `@not_action` 标记非动作方法**`post_init`, `initialize`, `cleanup`
9. **子物料保证正确 serialize/deserialize** — 系统自动同步到前端 Deck 视图
---
## 工作流清单
## 验证
```
- [ ] 1. 确定类型Protocol / 外部系统 / 硬件控制)
- [ ] 2. 确认子设备组成和物料需求
- [ ] 3. 创建工作站驱动
- [ ] 4. 创建子设备驱动 + 注册表(如需要)
- [ ] 5. 创建工作站注册表
- [ ] 6. 创建物料资源 Bottle→Carrier→WareHouse→Deck如需要
- [ ] 7. 注册 PLR 扩展Deck 类需要)
- [ ] 8. 配置图文件
- [ ] 9. 验证
```bash
# 模块可导入
python -c "from unilabos.devices.workstation.<name>.<name> import <ClassName>"
# 启动测试AST 自动扫描)
unilab -g <graph>.json
```
---
## 参考资源
## 现有工作站参考
- **代码模板**[templates.md](templates.md) — 驱动模板 A/B/C、子设备、注册表、物料资源
- **高级模式**[reference.md](reference.md) — 外部系统集成、Config 模式、资源同步、PLC 框架、端到端案例
- **现有工作站**
| 工作站 | 驱动类 | 类型 |
| -------------- | ----------------------------- | -------- |
| Protocol 通用 | `ProtocolNode` | Protocol |
| Bioyond 反应站 | `BioyondReactionStation` | 外部系统 |
| 纽扣电池组装 | `CoinCellAssemblyWorkstation` | 硬件控制 |
| 工作站 | 注册表名 | 类型 | 驱动路径 |
|--------|----------|------|---------|
| Bioyond 反应站 | `reaction_station.bioyond` | 外部系统 | `bioyond_studio/reaction_station/` |
| Bioyond 配液站 | `bioyond_dispensing_station` | 外部系统 | `bioyond_studio/dispensing_station/` |
| 纽扣电池组装 | `coincellassemblyworkstation_device` | 硬件控制 | `coin_cell_assembly/` |
| Protocol 通用 | `workstation` | Protocol | `workstation_base.py` |
参考路径:`unilabos/devices/workstation/` 目录下各工作站实现。

View File

@@ -1,6 +1,6 @@
# 工作站高级模式参考
本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、PLC 框架、硬件代理等高级模式。
本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、配置结构等高级模式。
Agent 在需要实现这些功能时按需阅读。
---
@@ -116,6 +116,7 @@ class ConnectionMonitor:
def _monitor_loop(self):
while self._running:
try:
# 调用外部系统接口检测连接
self.workstation.hardware_interface.ping()
status = "online"
except Exception:
@@ -209,35 +210,6 @@ class ConnectionMonitor:
}
```
### 2.7 工作流到工序名映射
```json
{
"workflow_to_section_map": {
"reactor_taken_in": "反应器放入",
"reactor_taken_out": "反应器取出",
"Solid_feeding_vials": "固体投料-小瓶"
}
}
```
### 2.8 动作名称映射
```json
{
"action_names": {
"reactor_taken_in": {
"config": "通量-配置",
"stirring": "反应模块-开始搅拌"
},
"solid_feeding_vials": {
"feeding": "粉末加样模块-投料",
"observe": "反应模块-观察搅拌结果"
}
}
}
```
---
## 3. 资源同步机制
@@ -274,25 +246,14 @@ class MyResourceSynchronizer(ResourceSynchronizer):
return True
```
### 3.2 资源树回调
Bioyond 工作站注册了资源树变更回调,实现与外部系统的自动同步:
| 回调名 | 触发时机 | 外部操作 |
|--------|---------|---------|
| `resource_tree_add` | PLR Deck 中添加资源 | 入库到外部系统 |
| `resource_tree_remove` | PLR Deck 中移除资源 | 出库 |
| `resource_tree_transfer` | 创建物料(不入库) | 创建外部物料记录 |
| `resource_tree_update` | 资源位置移动 | 更新外部系统库位 |
### 3.3 update_resource — 上传资源树到云端
### 3.2 update_resource — 上传资源树到云端
将 PLR Deck 序列化后通过 ROS 服务上传。典型使用场景:
```python
# 在 post_init 中上传初始 deck
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
# 在 post_init 中上传初始 deck
ROS2DeviceNode.run_async_func(
self._ros_node.update_resource, True,
**{"resources": [self.deck]}
@@ -354,11 +315,15 @@ async def transfer_materials_to_another_station(
"""将物料转移到另一个工作站"""
target_node = self._children.get(target_device_id)
if not target_node:
# 通过 ROS 节点查找非子设备的目标站
pass
for group in transfer_groups:
resource = self.find_resource_by_name(group["resource_name"])
# 从本站 deck 移除
resource.unassign()
# 调用目标站的接收方法
# ...
return {"success": True, "transferred": len(transfer_groups)}
```
@@ -404,437 +369,3 @@ def post_init(self, ros_node):
# 5. 初始化资源同步器(可选)
self.resource_synchronizer = MyResourceSynchronizer(self, self.rpc_client)
```
---
## 7. PLC/Modbus 完整框架
### 7.1 寄存器映射 CSV 格式
PLC 工作站使用 CSV 文件定义寄存器映射表。路径通常为工作站目录下的 `<name>.csv`
**CSV 列定义:**
| 列名 | 说明 | 值示例 |
|------|------|--------|
| `Name` | 寄存器节点名称(代码中引用的唯一标识) | `COIL_SYS_START_CMD` |
| `DataType` | 数据类型 | `BOOL`, `INT16`, `FLOAT32` |
| `InitValue` | 初始值(可选) | — |
| `Comment` | 注释(可选) | — |
| `Attribute` | 自定义属性(可选) | — |
| `DeviceType` | Modbus 设备类型 | `coil`, `hold_register`, `input_register`, `discrete_inputs` |
| `Address` | Modbus 地址 | `8010`, `11000` |
**CSV 示例:**
```csv
Name,DataType,InitValue,Comment,Attribute,DeviceType,Address,
COIL_SYS_START_CMD,BOOL,,系统启动命令,,coil,8010,
COIL_SYS_STOP_CMD,BOOL,,系统停止命令,,coil,8020,
COIL_SYS_RESET_CMD,BOOL,,系统复位命令,,coil,8030,
REG_MSG_ELECTROLYTE_VOLUME,INT16,,电解液体积,,hold_register,11004,
REG_DATA_OPEN_CIRCUIT_VOLTAGE,FLOAT32,,开路电压,,hold_register,10002,
REG_DATA_AXIS_X_POS,FLOAT32,,X轴位置,,hold_register,10004,
```
**命名约定:**
- 线圈:`COIL_` 前缀(读写布尔量)
- 保持寄存器:`REG_MSG_`(消息/命令寄存器)、`REG_DATA_`(数据/状态寄存器)
- `_CMD` 后缀:写入命令
- `_STATUS` 后缀:读取状态
### 7.2 TCPClient 初始化
```python
from unilabos.device_comms.modbus_plc.client import TCPClient, BaseClient
from unilabos.device_comms.modbus_plc.modbus import DataType, WorderOrder
# 创建 Modbus TCP 客户端
modbus_client = TCPClient(addr="192.168.1.100", port=502)
modbus_client.client.connect()
# 从 CSV 加载寄存器映射
import os
csv_path = os.path.join(os.path.dirname(__file__), 'register_map.csv')
nodes = BaseClient.load_csv(csv_path)
client = modbus_client.register_node_list(nodes)
```
### 7.3 寄存器读写操作
```python
# 读取线圈(布尔值)
result, err = client.use_node('COIL_SYS_START_STATUS').read(1)
is_started = result[0] if not err else False
# 写入线圈
client.use_node('COIL_SYS_START_CMD').write(True)
# 读取保持寄存器INT16
result, err = client.use_node('REG_DATA_ASSEMBLY_COIN_CELL_NUM').read(1)
# 读取保持寄存器FLOAT32需要 2 个寄存器)
result, err = client.use_node('REG_DATA_OPEN_CIRCUIT_VOLTAGE').read(2)
# 写入保持寄存器FLOAT32
client.use_node('REG_MSG_ELECTROLYTE_VOLUME').write(
100.0,
data_type=DataType.FLOAT32,
word_order=WorderOrder.LITTLE,
)
```
**FLOAT32 字节序注意:** 许多 PLC 使用 Big Byte Order + Little Word Order需要交换两个 16 位寄存器的顺序。参考 `coin_cell_assembly.py` 中的 `_decode_float32_correct` 函数。
### 7.4 ModbusWorkflow 生命周期
PLC 工作站的动作通过 `ModbusWorkflow` + `WorkflowAction` 组织,每个动作有 4 个生命周期阶段:
```python
from unilabos.device_comms.modbus_plc.client import ModbusWorkflow, WorkflowAction
# 定义动作的生命周期函数
def my_init(use_node):
"""初始化:设置参数"""
use_node('REG_MSG_ELECTROLYTE_VOLUME').write(
100.0, data_type=DataType.FLOAT32, word_order=WorderOrder.LITTLE
)
return True
def my_start(use_node):
"""启动:触发动作并轮询等待完成"""
use_node('COIL_SYS_START_CMD').write(True)
while True:
result, err = use_node('COIL_SYS_START_STATUS').read(1)
if not err and result[0]:
break
time.sleep(0.5)
return True
def my_stop(use_node):
"""停止:复位触发信号"""
use_node('COIL_SYS_START_CMD').write(False)
return True
def my_cleanup(use_node):
"""清理:无论成功失败都执行"""
use_node('COIL_SYS_RESET_CMD').write(True)
# 组合成工作流
workflow = ModbusWorkflow(
name="我的加工流程",
actions=[
WorkflowAction(init=my_init, start=my_start, stop=my_stop, cleanup=my_cleanup)
],
)
# 执行
client.run_modbus_workflow(workflow)
```
**生命周期执行顺序:** `init``start``stop``cleanup`cleanup 始终执行,即使前序步骤失败)
### 7.5 PLC 工作站中的握手循环
纽扣电池组装站的典型 PLC 交互模式(信息交换握手):
```python
async def _send_msg_to_plc(self, data: dict):
"""向 PLC 发送消息并等待确认"""
# 1. 写入数据寄存器
for key, value in data.items():
self._write_register(key, value)
# 2. 发送"消息已准备好"信号
self._write_coil('COIL_UNILAB_SEND_MSG_SUCC_CMD', True)
# 3. 等待 PLC 读取确认
while not self._read_coil('COIL_REQUEST_REC_MSG_STATUS'):
await self._ros_node.sleep(0.3)
# 4. 撤销发送信号
self._write_coil('COIL_UNILAB_SEND_MSG_SUCC_CMD', False)
async def _recv_msg_from_plc(self) -> dict:
"""等待 PLC 发送消息"""
# 1. 等待 PLC 发送信号
while not self._read_coil('COIL_REQUEST_SEND_MSG_STATUS'):
await self._ros_node.sleep(0.3)
# 2. 读取数据寄存器
data = {}
for key in self._recv_registers:
data[key] = self._read_register(key)
# 3. 发送"已收到"确认
self._write_coil('COIL_UNILAB_REC_MSG_SUCC_CMD', True)
# 4. 等待 PLC 撤销发送信号
while self._read_coil('COIL_REQUEST_SEND_MSG_STATUS'):
await self._ros_node.sleep(0.3)
# 5. 撤销确认信号
self._write_coil('COIL_UNILAB_REC_MSG_SUCC_CMD', False)
return data
```
### 7.6 JSON 驱动的 PLC 工作流
PLC 工作站还支持通过 JSON 描述工作流,无需编写 Python 代码。使用 `BaseClient.execute_procedure_from_json`
```json
{
"register_node_list_from_csv_path": {"path": "register_map.csv"},
"create_flow": [
{
"name": "初始化系统",
"action": [
{
"address_function_to_create": [
{"func_name": "write_start", "node_name": "COIL_SYS_START_CMD", "mode": "write", "value": true},
{"func_name": "read_status", "node_name": "COIL_SYS_START_STATUS", "mode": "read", "value": 1}
],
"create_init_function": null,
"create_start_function": {
"func_name": "start_sys",
"write_functions": ["write_start"],
"condition_functions": ["read_status"],
"stop_condition_expression": "read_status[0]"
},
"create_stop_function": {"func_name": "stop_start", "node_name": "COIL_SYS_START_CMD", "mode": "write", "value": false},
"create_cleanup_function": null
}
]
}
],
"execute_flow": ["初始化系统"]
}
```
参考:`unilabos/device_comms/modbus_plc/client.py``ExecuteProcedureJson` 类型定义)
---
## 8. 端到端案例 WalkthroughBioyond 反应站
以 Bioyond 反应站为例,展示从零接入一个带物料输入的外部系统工作站的完整过程。
### 8.1 需求
- **类型**:外部系统工作站(与 Bioyond LIMS 系统对接)
- **通信**HTTP APIRPC 客户端 + HTTP 回调服务)
- **子设备**5 个反应器reactor_1 ~ reactor_5
- **物料**:反应器、试剂瓶、烧杯、样品板、小瓶、枪头盒 → 6 种 WareHouse → 1 个 Deck
### 8.2 文件结构
```
unilabos/
├── devices/workstation/bioyond_studio/
│ ├── station.py # BioyondWorkstation 基类
│ ├── bioyond_rpc.py # RPC 客户端
│ └── reaction_station/
│ └── reaction_station.py # BioyondReactionStation + BioyondReactor
├── resources/bioyond/
│ ├── bottles.py # Bottle 工厂函数8 种)
│ ├── bottle_carriers.py # Carrier 工厂函数8 种)
│ ├── warehouses.py # WareHouse 工厂函数6 种)
│ └── decks.py # BIOYOND_PolymerReactionStation_Deck
├── registry/
│ ├── devices/reaction_station_bioyond.yaml
│ └── resources/bioyond/
│ ├── bottles.yaml
│ ├── bottle_carriers.yaml
│ └── decks.yaml
└── test/experiments/reaction_station_bioyond.json
```
### 8.3 继承链
```
WorkstationBase
└── BioyondWorkstation # 通用 Bioyond 逻辑
├── __init__(config, deck, protocol_type)
├── post_init() → 启动连接监控 + HTTP 服务 + 上传 deck
├── BioyondResourceSynchronizer # 物料双向同步
└── BioyondReactionStation # 反应站特化
├── reactor_taken_in() # 反应器放入工作流
├── solid_feeding_vials() # 固体投料
├── liquid_feeding_solvents() # 液体投料
└── workflow_sequence @property # 工作流序列状态
```
### 8.4 物料资源层级(反应站实例)
```
BIOYOND_PolymerReactionStation_Deck (2700×1080×1500mm)
├── 堆栈1左 (WareHouse 4x4) ← Coordinate(-200, 400, 0)
│ ├── A01 → BottleCarrier → Reactor
│ ├── A02 → BottleCarrier → Reactor
│ └── ...(共 16 槽位)
├── 堆栈1右 (WareHouse 4x4, col_offset=4) ← Coordinate(350, 400, 0)
│ ├── A05 → BottleCarrier → Reactor
│ └── ...
├── 站内试剂存放堆栈 (WareHouse 1x2) ← Coordinate(1050, 400, 0)
│ ├── A01 → 1BottleCarrier → Bottle
│ └── A02 → 1BottleCarrier → Bottle
├── 测量小瓶仓库 (WareHouse 3x2) ← Coordinate(...)
├── 站内Tip盒堆栈(左) (WareHouse, removed_positions)
└── 站内Tip盒堆栈(右) (WareHouse)
```
### 8.5 图文件关键结构
```json
{
"nodes": [
{
"id": "reaction_station_bioyond",
"children": ["Bioyond_Deck", "reactor_1", "reactor_2", "reactor_3", "reactor_4", "reactor_5"],
"parent": null,
"type": "device",
"class": "reaction_station.bioyond",
"config": {
"api_key": "DE9BDDA0",
"api_host": "http://172.21.103.36:45388",
"workflow_mappings": {
"reactor_taken_out": "3a16081e-...",
"reactor_taken_in": "3a160df6-..."
},
"material_type_mappings": {
"BIOYOND_PolymerStation_Reactor": ["反应器", "3a14233b-..."],
"BIOYOND_PolymerStation_1BottleCarrier": ["试剂瓶", "3a14233b-..."]
},
"warehouse_mapping": {
"堆栈1左": {
"uuid": "3a14aa17-...",
"site_uuids": {"A01": "3a14aa17-...", "A02": "3a14aa17-..."}
}
},
"http_service_config": {
"http_service_host": "127.0.0.1",
"http_service_port": 8080
}
},
"deck": {
"data": {
"_resource_child_name": "Bioyond_Deck",
"_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_PolymerReactionStation_Deck"
}
},
"size_x": 2700.0,
"size_y": 1080.0,
"size_z": 2500.0,
"protocol_type": [],
"data": {}
},
{
"id": "Bioyond_Deck",
"parent": "reaction_station_bioyond",
"type": "deck",
"class": "BIOYOND_PolymerReactionStation_Deck",
"config": {"type": "BIOYOND_PolymerReactionStation_Deck", "setup": true}
},
{
"id": "reactor_1",
"parent": "reaction_station_bioyond",
"type": "device",
"class": "reaction_station.reactor",
"position": {"x": 1150, "y": 300, "z": 0},
"config": {}
}
]
}
```
### 8.6 初始化时序
```
1. ROS2WorkstationNode.__init__
├── 创建 BioyondReactionStation 实例__init__
├── 加载 DeckBIOYOND_PolymerReactionStation_Deck, setup=true → 创建 6 个 WareHouse
├── 初始化 reactor_1~5BioyondReactor 实例)→ sub_devices
└── 为每个 reactor 创建 ActionClient
2. BioyondReactionStation.post_init(ros_node)
├── 初始化 BioyondV1RPCHTTP 客户端)
├── 初始化 BioyondResourceSynchronizer
├── 启动 ConnectionMonitor30s 轮询)
├── 启动 WorkstationHTTPService接收回调
├── sync_from_external()(从 Bioyond 拉取物料到 Deck
└── update_resource([self.deck])(上传 Deck 到云端)
```
### 8.7 物料同步流程
```
外部入库:
Bioyond API → stock_material() → 获取物料列表
→ resource_bioyond_to_plr() → 转为 PLR Bottle/Carrier
→ deck.warehouses["堆栈1左"]["A01"] = carrier
→ update_resource([deck])
外部变更回调:
Bioyond POST /report/material_change
→ WorkstationHTTPService 接收
→ process_material_change_report()
→ 更新 Deck 中的资源
→ update_resource([affected_resource])
```
### 8.8 工作站动作执行流程(以 reactor_taken_in 为例)
```python
async def reactor_taken_in(self, assign_material_name, cutoff, temperature, **kwargs):
# 1. 从 config 获取工作流 UUID
workflow_id = self.config["workflow_mappings"]["reactor_taken_in"]
# 2. 构建工序参数
sections = self._build_sections(temperature, cutoff, ...)
# 3. 合并到工作流序列
self._workflow_sequence.append({"name": "reactor_taken_in", ...})
# 4. 调用外部系统创建工单
result = self.hardware_interface.create_order(order_data)
# 5. 等待外部系统完成(通过 HTTP 回调通知)
# process_order_finish_report 被回调时更新状态
return {"success": True}
```
---
## 9. 现有工作站 Config 结构完整对比
| 特性 | BioyondReactionStation | BioyondDispensingStation | CoinCellAssemblyWorkstation |
|------|----------------------|------------------------|-----------------------------|
| **继承** | BioyondWorkstation | BioyondWorkstation | WorkstationBase (直接) |
| **通信方式** | HTTP RPC | HTTP RPC | Modbus TCP |
| **`__init__` 签名** | `(config, deck, protocol_type, **kwargs)` | `(config, deck, protocol_type, **kwargs)` | `(config, deck, address, port, debug_mode, **kwargs)` |
| **子设备** | 5 个 BioyondReactor | 无 | 无 |
| **Deck** | BioyondReactionDeck (6 个 WareHouse) | BioyondDispensingDeck | CoincellDeck |
| **物料同步** | BioyondResourceSynchronizer (双向) | BioyondResourceSynchronizer (双向) | 无(本地 PLR |
| **status_types** | `workflow_sequence: str` | 空 | 18 个属性 (sys_status, 传感器数据等) |
| **动作风格** | 语义化 (reactor_taken_in, ...) | 语义化 (compute_experiment_design, ...) | PLC 操作 (func_pack_device_init, ...) |
| **post_init** | 连接监控 + HTTP 服务 + 资源同步 + 上传 deck | 继承父类 | 上传 deck |
| **工作流管理** | workflow_mappings → 合并序列 → create_order | batch_create → wait_for_reports | PLC 握手循环 |
### Config 字段对比
| 字段 | 反应站 | 配液站 | 纽扣电池 |
|------|--------|--------|---------|
| `api_host` | ✅ | ✅ | — |
| `api_key` | ✅ | ✅ | — |
| `workflow_mappings` | ✅ (8 个工作流) | — | — |
| `material_type_mappings` | ✅ (8 种物料) | ✅ | — |
| `warehouse_mapping` | ✅ (6 个仓库) | ✅ (3 个仓库) | — |
| `workflow_to_section_map` | ✅ | — | — |
| `action_names` | ✅ | — | — |
| `http_service_config` | ✅ | — | — |
| `material_default_parameters` | ✅ | — | — |
| `address` (init 参数) | — | — | ✅ |
| `port` (init 参数) | — | — | ✅ |
| `debug_mode` (init 参数) | — | — | ✅ |