Compare commits

...

5 Commits

Author SHA1 Message Date
Xuwznln
b7c726635c correct sample demo ret value 2026-03-24 23:24:12 +08:00
Xuwznln
c809912fd3 新增试剂reagent 2026-03-24 23:22:45 +08:00
Xuwznln
d956b27e9f update registry 2026-03-24 23:10:57 +08:00
Xuwznln
ff1e21fcd8 新增manual_confirm 2026-03-24 23:04:00 +08:00
Xuwznln
b9d9666003 add workstation creation skill 2026-03-24 23:03:49 +08:00
10 changed files with 1327 additions and 135 deletions

View File

@@ -0,0 +1,626 @@
---
name: add-workstation
description: Guide for adding new workstations to Uni-Lab-OS (接入新工作站). Uses @device decorator + AST auto-scanning. Walks through workstation type, sub-device composition, driver creation, deck setup, and graph file. Use when the user wants to add a workstation, create a workstation driver, configure a station with sub-devices, or mentions 工作站/工站/station/workstation.
---
# Uni-Lab-OS 工作站接入指南
工作站workstation是组合多个子设备的大型设备拥有独立的物料管理系统和工作流引擎。使用 `@device` 装饰器注册AST 自动扫描生成注册表。
---
## 工作站类型
| 类型 | 基类 | 适用场景 |
| ------------------- | ----------------- | ---------------------------------- |
| **Protocol 工作站** | `ProtocolNode` | 标准化学操作协议(泵转移、过滤等) |
| **外部系统工作站** | `WorkstationBase` | 与外部 LIMS/MES 对接 |
| **硬件控制工作站** | `WorkstationBase` | 直接控制 PLC/硬件 |
---
## @device 装饰器(工作站)
工作站也使用 `@device` 装饰器注册,参数与普通设备一致:
```python
@device(
id="my_workstation", # 注册表唯一标识(必填)
category=["workstation"], # 分类标签
description="我的工作站",
)
```
如果一个工作站类支持多个具体变体,可使用 `ids` / `id_meta`,与设备的用法相同(参见 add-device SKILL
---
## 工作站驱动模板
### 模板 A基于外部系统的工作站
```python
import logging
from typing import Dict, Any, Optional
from pylabrobot.resources import Deck
from unilabos.registry.decorators import device, topic_config, not_action
from unilabos.devices.workstation.workstation_base import WorkstationBase
try:
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
except ImportError:
ROS2WorkstationNode = None
@device(id="my_workstation", category=["workstation"], description="我的工作站")
class MyWorkstation(WorkstationBase):
_ros_node: "ROS2WorkstationNode"
def __init__(self, config=None, deck=None, protocol_type=None, **kwargs):
super().__init__(deck=deck, **kwargs)
self.config = config or {}
self.logger = logging.getLogger("MyWorkstation")
self.api_host = self.config.get("api_host", "")
self._status = "Idle"
@not_action
def post_init(self, ros_node: "ROS2WorkstationNode"):
super().post_init(ros_node)
self._ros_node = ros_node
async def scheduler_start(self, **kwargs) -> Dict[str, Any]:
"""注册为工作站动作"""
return {"success": True}
async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]:
"""注册为工作站动作"""
return {"success": True}
@property
@topic_config()
def workflow_sequence(self) -> str:
return "[]"
@property
@topic_config()
def material_info(self) -> str:
return "{}"
```
### 模板 BProtocol 工作站
直接使用 `ProtocolNode`,通常不需要自定义驱动类:
```python
from unilabos.devices.workstation.workstation_base import ProtocolNode
```
在图文件中配置 `protocol_type` 即可。
---
## 子设备访问sub_devices
工站初始化子设备后,所有子设备实例存储在 `self._ros_node.sub_devices` 字典中key 为设备 idvalue 为 `ROS2DeviceNode` 实例)。工站的驱动类可以直接获取子设备实例来调用其方法:
```python
# 在工站驱动类的方法中访问子设备
sub = self._ros_node.sub_devices["pump_1"]
# .driver_instance — 子设备的驱动实例(即设备 Python 类的实例)
sub.driver_instance.some_method(arg1, arg2)
# .ros_node_instance — 子设备的 ROS2 节点实例
sub.ros_node_instance._action_value_mappings # 查看子设备支持的 action
```
**常见用法**
```python
class MyWorkstation(WorkstationBase):
def my_protocol(self, **kwargs):
# 获取子设备驱动实例
pump = self._ros_node.sub_devices["pump_1"].driver_instance
heater = self._ros_node.sub_devices["heater_1"].driver_instance
# 直接调用子设备方法
pump.aspirate(volume=100)
heater.set_temperature(80)
```
> 参考实现:`unilabos/devices/workstation/bioyond_studio/reaction_station/reaction_station.py` 中通过 `self._ros_node.sub_devices.get(reactor_id)` 获取子反应器实例并更新数据。
---
## 硬件通信接口hardware_interface
硬件控制型工作站通常需要通过串口Serial、Modbus 等通信协议控制多个子设备。Uni-Lab-OS 通过 **通信设备代理** 机制实现端口共享:一个串口只创建一个 `serial` 节点,多个子设备共享这个通信实例。
### 工作原理
`ROS2WorkstationNode` 初始化时分两轮遍历子设备(`workstation.py`
**第一轮 — 初始化所有子设备**:按 `children` 顺序调用 `initialize_device()`,通信设备(`serial_` / `io_` 开头的 id优先完成初始化创建 `serial.Serial()` 实例。其他子设备此时 `self.hardware_interface = "serial_pump"`(字符串)。
**第二轮 — 代理替换**:遍历所有已初始化的子设备,读取子设备的 `_hardware_interface` 配置:
```
hardware_interface = d.ros_node_instance._hardware_interface
# → {"name": "hardware_interface", "read": "send_command", "write": "send_command"}
```
1.`name` 字段对应的属性值:`name_value = getattr(driver, hardware_interface["name"])`
- 如果 `name_value` 是字符串且该字符串是某个子设备的 id → 触发代理替换
2. 从通信设备获取真正的 `read`/`write` 方法
3.`setattr(driver, read_method, _read)` 将通信设备的方法绑定到子设备上
因此:
- **通信设备 id 必须与子设备 config 中填的字符串完全一致**(如 `"serial_pump"`
- **通信设备 id 必须以 `serial_``io_` 开头**(否则第一轮不会被识别为通信设备)
- **通信设备必须在 `children` 列表中排在最前面**,确保先初始化
### HardwareInterface 参数说明
```python
from unilabos.registry.decorators import HardwareInterface
HardwareInterface(
name="hardware_interface", # __init__ 中接收通信实例的属性名
read="send_command", # 通信设备上暴露的读方法名
write="send_command", # 通信设备上暴露的写方法名
extra_info=["list_ports"], # 可选:额外暴露的方法
)
```
**`name` 字段的含义**:对应设备类 `__init__` 中,用于保存通信实例的**属性名**。系统据此知道要替换哪个属性。大部分设备直接用 `"hardware_interface"`,也可以自定义(如 `"io_device_port"`)。
### 示例 1name="hardware_interface"
```python
from unilabos.registry.decorators import device, HardwareInterface
@device(
id="my_pump",
category=["pump_and_valve"],
hardware_interface=HardwareInterface(
name="hardware_interface",
read="send_command",
write="send_command",
),
)
class MyPump:
def __init__(self, port=None, address="1", **kwargs):
# name="hardware_interface" → 系统替换 self.hardware_interface
self.hardware_interface = port # 初始为字符串 "serial_pump",启动后被替换为 Serial 实例
self.address = address
def send_command(self, command: str):
full_command = f"/{self.address}{command}\r\n"
self.hardware_interface.write(bytearray(full_command, "ascii"))
return self.hardware_interface.read_until(b"\n")
```
### 示例 2电磁阀name="io_device_port",自定义属性名)
```python
@device(
id="solenoid_valve",
category=["pump_and_valve"],
hardware_interface=HardwareInterface(
name="io_device_port", # 自定义属性名 → 系统替换 self.io_device_port
read="read_io_coil",
write="write_io_coil",
),
)
class SolenoidValve:
def __init__(self, io_device_port: str = None, **kwargs):
# name="io_device_port" → 图文件 config 中用 "io_device_port": "io_board_1"
self.io_device_port = io_device_port # 初始为字符串,系统替换为 Modbus 实例
```
### Serial 通信设备class="serial"
`serial` 是 Uni-Lab-OS 内置的通信代理设备,代码位于 `unilabos/ros/nodes/presets/serial_node.py`
```python
from serial import Serial, SerialException
from threading import Lock
class ROS2SerialNode(BaseROS2DeviceNode):
def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, **kwargs):
self.port = port
self.baudrate = baudrate
self._hardware_interface = {
"name": "hardware_interface",
"write": "send_command",
"read": "read_data",
}
self._query_lock = Lock()
self.hardware_interface = Serial(baudrate=baudrate, port=port)
BaseROS2DeviceNode.__init__(
self, driver_instance=self, registry_name=registry_name,
device_id=device_id, status_types={}, action_value_mappings={},
hardware_interface=self._hardware_interface, print_publish=False,
)
self.create_service(SerialCommand, "serialwrite", self.handle_serial_request)
def send_command(self, command: str):
with self._query_lock:
self.hardware_interface.write(bytearray(f"{command}\n", "ascii"))
return self.hardware_interface.read_until(b"\n").decode()
def read_data(self):
with self._query_lock:
return self.hardware_interface.read_until(b"\n").decode()
```
在图文件中使用 `"class": "serial"` 即可创建串口代理:
```json
{
"id": "serial_pump",
"class": "serial",
"parent": "my_station",
"config": { "port": "COM7", "baudrate": 9600 }
}
```
### 图文件配置
**通信设备必须在 `children` 列表中排在最前面**,确保先于其他子设备初始化:
```json
{
"nodes": [
{
"id": "my_station",
"class": "workstation",
"children": ["serial_pump", "pump_1", "pump_2"],
"config": { "protocol_type": ["PumpTransferProtocol"] }
},
{
"id": "serial_pump",
"class": "serial",
"parent": "my_station",
"config": { "port": "COM7", "baudrate": 9600 }
},
{
"id": "pump_1",
"class": "syringe_pump_with_valve.runze.SY03B-T08",
"parent": "my_station",
"config": { "port": "serial_pump", "address": "1", "max_volume": 25.0 }
},
{
"id": "pump_2",
"class": "syringe_pump_with_valve.runze.SY03B-T08",
"parent": "my_station",
"config": { "port": "serial_pump", "address": "2", "max_volume": 25.0 }
}
],
"links": [
{
"source": "pump_1",
"target": "serial_pump",
"type": "communication",
"port": { "pump_1": "port", "serial_pump": "port" }
},
{
"source": "pump_2",
"target": "serial_pump",
"type": "communication",
"port": { "pump_2": "port", "serial_pump": "port" }
}
]
}
```
### 通信协议速查
| 协议 | config 参数 | 依赖包 | 通信设备 class |
| -------------------- | ------------------------------ | ---------- | -------------------------- |
| Serial (RS232/RS485) | `port`, `baudrate` | `pyserial` | `serial` |
| Modbus RTU | `port`, `baudrate`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` |
| Modbus TCP | `host`, `port`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` |
| TCP Socket | `host`, `port` | stdlib | 自定义 |
| HTTP API | `url`, `token` | `requests` | `device_comms/rpc.py` |
参考实现:`unilabos/test/experiments/Grignard_flow_batchreact_single_pumpvalve.json`
---
## Deck 与物料生命周期
### 1. Deck 入参与两种初始化模式
系统根据设备节点 `config.deck` 的写法,自动反序列化 Deck 实例后传入 `__init__``deck` 参数。目前 `deck` 是固定字段名,只支持一个主 Deck。建议一个设备拥有一个台面台面上抽象二级、三级子物料。
有两种初始化模式:
#### init 初始化(推荐)
`config.deck` 直接包含 `_resource_type` + `_resource_child_name`,系统先用 Deck 节点的 `config` 调用 Deck 类的 `__init__` 反序列化,再将实例传入设备的 `deck` 参数。子物料随 Deck 的 `children` 一起反序列化。
```json
"config": {
"deck": {
"_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck",
"_resource_child_name": "PRCXI_Deck"
}
}
```
#### deserialize 初始化
`config.deck``data` 包裹一层,系统走 `deserialize` 路径,可传入更多参数(如 `allow_marshal` 等):
```json
"config": {
"deck": {
"data": {
"_resource_child_name": "YB_Bioyond_Deck",
"_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_YB_Deck"
}
}
}
```
没有特殊需求时推荐 init 初始化。
#### config.deck 字段说明
| 字段 | 说明 |
|------|------|
| `_resource_type` | Deck 类的完整模块路径(`module:ClassName` |
| `_resource_child_name` | 对应图文件中 Deck 节点的 `id`,建立父子关联 |
#### 设备 __init__ 接收
```python
def __init__(self, config=None, deck=None, protocol_type=None, **kwargs):
super().__init__(deck=deck, **kwargs)
# deck 已经是反序列化后的 Deck 实例
# → PRCXI9300Deck / BIOYOND_YB_Deck 等
```
#### Deck 节点(图文件中)
Deck 节点作为设备的 `children` 之一,`parent` 指向设备 id
```json
{
"id": "PRCXI_Deck",
"parent": "PRCXI",
"type": "deck",
"class": "",
"children": [],
"config": {
"type": "PRCXI9300Deck",
"size_x": 542, "size_y": 374, "size_z": 0,
"category": "deck",
"sites": [...]
},
"data": {}
}
```
- `config` 中的字段会传入 Deck 类的 `__init__`(因此 `__init__` 必须能接受所有 `serialize()` 输出的字段)
- `children` 初始为空时,由同步器或手动初始化填充
- `config.type` 填 Deck 类名
### 2. Deck 为空时自行初始化
如果 Deck 节点的 `children` 为空,工作站需在 `post_init` 或首次同步时自行初始化内容:
```python
@not_action
def post_init(self, ros_node):
super().post_init(ros_node)
if self.deck and not self.deck.children:
self._initialize_default_deck()
def _initialize_default_deck(self):
from my_labware import My_TipRack, My_Plate
self.deck.assign_child_resource(My_TipRack("T1"), spot=0)
self.deck.assign_child_resource(My_Plate("T2"), spot=1)
```
### 3. 物料双向同步
当工作站对接外部系统LIMS/MES需要实现 `ResourceSynchronizer` 处理双向物料同步:
```python
from unilabos.devices.workstation.workstation_base import ResourceSynchronizer
class MyResourceSynchronizer(ResourceSynchronizer):
def sync_from_external(self) -> bool:
"""从外部系统同步到 self.workstation.deck"""
external_data = self._query_external_materials()
# 以外部工站为准:根据外部数据反向创建 PLR 资源实例
for item in external_data:
cls = self._resolve_resource_class(item["type"])
resource = cls(name=item["name"], **item["params"])
self.workstation.deck.assign_child_resource(resource, spot=item["slot"])
return True
def sync_to_external(self, resource) -> bool:
"""将 UniLab 侧物料变更同步到外部系统"""
# 以 UniLab 为准:将 PLR 资源转为外部格式并推送
external_format = self._convert_to_external(resource)
return self._push_to_external(external_format)
def handle_external_change(self, change_info) -> bool:
"""处理外部系统主动推送的变更"""
return True
```
同步策略取决于业务场景:
- **以外部工站为准**:从外部 API 查询物料数据,反向创建对应的 PLR 资源实例放到 Deck 上
- **以 UniLab 为准**UniLab 侧的物料变更通过 `sync_to_external` 推送到外部系统
在工作站 `post_init` 中初始化同步器:
```python
@not_action
def post_init(self, ros_node):
super().post_init(ros_node)
self.resource_synchronizer = MyResourceSynchronizer(self)
self.resource_synchronizer.sync_from_external()
```
### 4. 序列化与持久化serialize / serialize_state
资源类需正确实现序列化,系统据此完成持久化和前端同步。
**`serialize()`** — 输出资源的结构信息(`config` 层),反序列化时作为 `__init__` 的入参回传。因此 **`__init__` 必须通过 `**kwargs`接受`serialize()` 输出的所有字段\*\*,即使当前不使用:
```python
class MyDeck(Deck):
def __init__(self, name, size_x, size_y, size_z,
sites=None, # serialize() 输出的字段
rotation=None, # serialize() 输出的字段
barcode=None, # serialize() 输出的字段
**kwargs): # 兜底:接受所有未知的 serialize 字段
super().__init__(size_x, size_y, size_z, name)
# ...
def serialize(self) -> dict:
data = super().serialize()
data["sites"] = [...] # 自定义字段
return data
```
**`serialize_state()`** — 输出资源的运行时状态(`data` 层),用于持久化可变信息。`data` 中的内容会被正确保存和恢复:
```python
class MyPlate(Plate):
def __init__(self, name, size_x, size_y, size_z,
material_info=None, **kwargs):
super().__init__(name, size_x, size_y, size_z, **kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def serialize_state(self) -> Dict[str, Any]:
data = super().serialize_state()
data.update(self._unilabos_state)
return data
```
关键要点:
- `serialize()` 输出的所有字段都会作为 `config` 回传到 `__init__`,所以 `__init__` 必须能接受它们(显式声明或 `**kwargs`
- `serialize_state()` 输出的 `data` 用于持久化运行时状态(如物料信息、液体量等)
- `_unilabos_state` 中只存可 JSON 序列化的基本类型str, int, float, bool, list, dict, None
### 5. 子物料自动同步
子物料Bottle、Plate、TipRack 等)放到 Deck 上后,系统会自动将其同步到前端的 Deck 视图。只需保证资源类正确实现了 `serialize()` / `serialize_state()` 和反序列化即可。
### 6. 图文件配置(参考 prcxi_9320_slim.json
```json
{
"nodes": [
{
"id": "my_station",
"type": "device",
"class": "my_workstation",
"config": {
"deck": {
"_resource_type": "unilabos.resources.my_module:MyDeck",
"_resource_child_name": "my_deck"
},
"host": "10.20.30.1",
"port": 9999
}
},
{
"id": "my_deck",
"parent": "my_station",
"type": "deck",
"class": "",
"children": [],
"config": {
"type": "MyLabDeck",
"size_x": 542,
"size_y": 374,
"size_z": 0,
"category": "deck",
"sites": [
{
"label": "T1",
"visible": true,
"occupied_by": null,
"position": { "x": 0, "y": 0, "z": 0 },
"size": { "width": 128.0, "height": 86, "depth": 0 },
"content_type": ["plate", "tip_rack", "tube_rack", "adaptor"]
}
]
},
"data": {}
}
],
"edges": []
}
```
Deck 节点要点:
- `config.type` 填 Deck 类名(如 `"PRCXI9300Deck"`
- `config.sites` 完整列出所有 site从 Deck 类的 `serialize()` 输出获取)
- `children` 初始为空(由同步器或手动初始化填充)
- 设备节点 `config.deck._resource_type` 指向 Deck 类的完整模块路径
---
## 子设备
子设备按标准设备接入流程创建(参见 add-device SKILL使用 `@device` 装饰器。
子设备约束:
- 图文件中 `parent` 指向工作站 ID
- 在工作站 `children` 数组中列出
---
## 关键规则
1. **`__init__` 必须接受 `deck``**kwargs`** — `WorkstationBase.**init**`需要`deck` 参数
2. **Deck 通过 `config.deck._resource_type` 反序列化传入** — 不要在 `__init__` 中手动创建 Deck
3. **Deck 为空时自行初始化内容** — 在 `post_init` 中检查并填充默认物料
4. **外部同步实现 `ResourceSynchronizer`**`sync_from_external` / `sync_to_external`
5. **通过 `self._children` 访问子设备** — 不要自行维护子设备引用
6. **`post_init` 中启动后台服务** — 不要在 `__init__` 中启动网络连接
7. **异步方法使用 `await self._ros_node.sleep()`** — 禁止 `time.sleep()``asyncio.sleep()`
8. **使用 `@not_action` 标记非动作方法**`post_init`, `initialize`, `cleanup`
9. **子物料保证正确 serialize/deserialize** — 系统自动同步到前端 Deck 视图
---
## 验证
```bash
# 模块可导入
python -c "from unilabos.devices.workstation.<name>.<name> import <ClassName>"
# 启动测试AST 自动扫描)
unilab -g <graph>.json
```
---
## 现有工作站参考
| 工作站 | 驱动类 | 类型 |
| -------------- | ----------------------------- | -------- |
| Protocol 通用 | `ProtocolNode` | Protocol |
| Bioyond 反应站 | `BioyondReactionStation` | 外部系统 |
| 纽扣电池组装 | `CoinCellAssemblyWorkstation` | 硬件控制 |
参考路径:`unilabos/devices/workstation/` 目录下各工作站实现。

View File

@@ -0,0 +1,371 @@
# 工作站高级模式参考
本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、配置结构等高级模式。
Agent 在需要实现这些功能时按需阅读。
---
## 1. 外部系统集成模式
### 1.1 RPC 客户端
与外部 LIMS/MES 系统通信的标准模式。继承 `BaseRequest`,所有接口统一用 POST。
```python
from unilabos.device_comms.rpc import BaseRequest
class MySystemRPC(BaseRequest):
"""外部系统 RPC 客户端"""
def __init__(self, host: str, api_key: str):
super().__init__(host)
self.api_key = api_key
def _request(self, endpoint: str, data: dict = None) -> dict:
return self.post(
url=f"{self.host}/api/{endpoint}",
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": data or {},
},
)
def query_status(self) -> dict:
return self._request("status/query")
def create_order(self, order_data: dict) -> dict:
return self._request("order/create", order_data)
```
参考:`unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py``BioyondV1RPC`
### 1.2 HTTP 回调服务
接收外部系统报送的标准模式。使用 `WorkstationHTTPService`,在 `post_init` 中启动。
```python
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
class MyWorkstation(WorkstationBase):
def __init__(self, config=None, deck=None, **kwargs):
super().__init__(deck=deck, **kwargs)
self.config = config or {}
http_cfg = self.config.get("http_service_config", {})
self._http_service_config = {
"host": http_cfg.get("http_service_host", "127.0.0.1"),
"port": http_cfg.get("http_service_port", 8080),
}
self.http_service = None
def post_init(self, ros_node):
super().post_init(ros_node)
self.http_service = WorkstationHTTPService(
workstation_instance=self,
host=self._http_service_config["host"],
port=self._http_service_config["port"],
)
self.http_service.start()
```
**HTTP 服务路由**(固定端点,由 `WorkstationHTTPHandler` 自动分发):
| 端点 | 调用的工作站方法 |
|------|-----------------|
| `/report/step_finish` | `process_step_finish_report(report_request)` |
| `/report/sample_finish` | `process_sample_finish_report(report_request)` |
| `/report/order_finish` | `process_order_finish_report(report_request, used_materials)` |
| `/report/material_change` | `process_material_change_report(report_data)` |
| `/report/error_handling` | `handle_external_error(error_data)` |
实现对应方法即可接收回调:
```python
def process_step_finish_report(self, report_request) -> Dict[str, Any]:
"""处理步骤完成报告"""
step_name = report_request.data.get("stepName")
return {"success": True, "message": f"步骤 {step_name} 已处理"}
def process_order_finish_report(self, report_request, used_materials) -> Dict[str, Any]:
"""处理订单完成报告"""
order_code = report_request.data.get("orderCode")
return {"success": True}
```
参考:`unilabos/devices/workstation/workstation_http_service.py`
### 1.3 连接监控
独立线程周期性检测外部系统连接状态,状态变化时发布 ROS 事件。
```python
class ConnectionMonitor:
def __init__(self, workstation, check_interval=30):
self.workstation = workstation
self.check_interval = check_interval
self._running = False
self._thread = None
def start(self):
self._running = True
self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._thread.start()
def _monitor_loop(self):
while self._running:
try:
# 调用外部系统接口检测连接
self.workstation.hardware_interface.ping()
status = "online"
except Exception:
status = "offline"
time.sleep(self.check_interval)
```
参考:`unilabos/devices/workstation/bioyond_studio/station.py``ConnectionMonitor`
---
## 2. Config 结构模式
工作站的 `config` 在图文件中定义,传入 `__init__`。以下是常见字段模式:
### 2.1 外部系统连接
```json
{
"api_host": "http://192.168.1.100:8080",
"api_key": "YOUR_API_KEY"
}
```
### 2.2 HTTP 回调服务
```json
{
"http_service_config": {
"http_service_host": "127.0.0.1",
"http_service_port": 8080
}
}
```
### 2.3 物料类型映射
将 PLR 资源类名映射到外部系统的物料类型(名称 + UUID。用于双向物料转换。
```json
{
"material_type_mappings": {
"PLR_ResourceClassName": ["外部系统显示名", "external-type-uuid"],
"BIOYOND_PolymerStation_Reactor": ["反应器", "3a14233b-902d-0d7b-..."]
}
}
```
### 2.4 仓库映射
将仓库名映射到外部系统的仓库 UUID 和库位 UUID。用于入库/出库操作。
```json
{
"warehouse_mapping": {
"仓库名": {
"uuid": "warehouse-uuid",
"site_uuids": {
"A01": "site-uuid-A01",
"A02": "site-uuid-A02"
}
}
}
}
```
### 2.5 工作流映射
将内部工作流名映射到外部系统的工作流 ID。
```json
{
"workflow_mappings": {
"internal_workflow_name": "external-workflow-uuid"
}
}
```
### 2.6 物料默认参数
```json
{
"material_default_parameters": {
"NMP": {
"unit": "毫升",
"density": "1.03",
"densityUnit": "g/mL",
"description": "N-甲基吡咯烷酮"
}
}
}
```
---
## 3. 资源同步机制
### 3.1 ResourceSynchronizer
抽象基类,用于与外部物料系统双向同步。定义在 `workstation_base.py`
```python
from unilabos.devices.workstation.workstation_base import ResourceSynchronizer
class MyResourceSynchronizer(ResourceSynchronizer):
def __init__(self, workstation, api_client):
super().__init__(workstation)
self.api_client = api_client
def sync_from_external(self) -> bool:
"""从外部系统拉取物料到 deck"""
external_materials = self.api_client.list_materials()
for material in external_materials:
plr_resource = self._convert_to_plr(material)
self.workstation.deck.assign_child_resource(plr_resource, coordinate)
return True
def sync_to_external(self, plr_resource) -> bool:
"""将 deck 中的物料变更推送到外部系统"""
external_data = self._convert_from_plr(plr_resource)
self.api_client.update_material(external_data)
return True
def handle_external_change(self, change_info) -> bool:
"""处理外部系统推送的物料变更"""
return True
```
### 3.2 update_resource — 上传资源树到云端
将 PLR Deck 序列化后通过 ROS 服务上传。典型使用场景:
```python
# 在 post_init 中上传初始 deck
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
ROS2DeviceNode.run_async_func(
self._ros_node.update_resource, True,
**{"resources": [self.deck]}
)
# 在动作方法中更新特定资源
ROS2DeviceNode.run_async_func(
self._ros_node.update_resource, True,
**{"resources": [updated_plate]}
)
```
---
## 4. 工作流序列管理
工作站通过 `workflow_sequence` 属性管理任务队列JSON 字符串形式)。
```python
class MyWorkstation(WorkstationBase):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._workflow_sequence = []
@property
def workflow_sequence(self) -> str:
"""返回 JSON 字符串ROS 自动发布"""
import json
return json.dumps(self._workflow_sequence)
async def append_to_workflow_sequence(self, workflow_name: str) -> Dict[str, Any]:
"""添加工作流到队列"""
self._workflow_sequence.append({
"name": workflow_name,
"status": "pending",
"created_at": time.time(),
})
return {"success": True}
async def clear_workflows(self) -> Dict[str, Any]:
"""清空工作流队列"""
self._workflow_sequence = []
return {"success": True}
```
---
## 5. 站间物料转移
工作站之间转移物料的模式。通过 ROS ActionClient 调用目标站的动作。
```python
async def transfer_materials_to_another_station(
self,
target_device_id: str,
transfer_groups: list,
**kwargs,
) -> Dict[str, Any]:
"""将物料转移到另一个工作站"""
target_node = self._children.get(target_device_id)
if not target_node:
# 通过 ROS 节点查找非子设备的目标站
pass
for group in transfer_groups:
resource = self.find_resource_by_name(group["resource_name"])
# 从本站 deck 移除
resource.unassign()
# 调用目标站的接收方法
# ...
return {"success": True, "transferred": len(transfer_groups)}
```
参考:`BioyondDispensingStation.transfer_materials_to_reaction_station`
---
## 6. post_init 完整模式
`post_init` 是工作站初始化的关键阶段,此时 ROS 节点和子设备已就绪。
```python
def post_init(self, ros_node):
super().post_init(ros_node)
# 1. 初始化外部系统客户端(此时 config 已可用)
self.rpc_client = MySystemRPC(
host=self.config.get("api_host"),
api_key=self.config.get("api_key"),
)
self.hardware_interface = self.rpc_client
# 2. 启动连接监控
self.connection_monitor = ConnectionMonitor(self)
self.connection_monitor.start()
# 3. 启动 HTTP 回调服务
if hasattr(self, '_http_service_config'):
self.http_service = WorkstationHTTPService(
workstation_instance=self,
host=self._http_service_config["host"],
port=self._http_service_config["port"],
)
self.http_service.start()
# 4. 上传 deck 到云端
ROS2DeviceNode.run_async_func(
self._ros_node.update_resource, True,
**{"resources": [self.deck]}
)
# 5. 初始化资源同步器(可选)
self.resource_synchronizer = MyResourceSynchronizer(self, self.rpc_client)
```

View File

@@ -0,0 +1,233 @@
---
name: batch-insert-reagent
description: Batch insert reagents into Uni-Lab platform — add chemicals with CAS, SMILES, supplier info. Use when the user wants to add reagents, insert chemicals, batch register reagents, or mentions 录入试剂/添加试剂/试剂入库/reagent.
---
# 批量录入试剂 Skill
通过云端 API 批量录入试剂信息,支持逐条或批量操作。
## 前置条件(缺一不可)
使用本 skill 前,**必须**先确认以下信息。如果缺少任何一项,**立即向用户询问并终止**,等补齐后再继续。
### 1. ak / sk → AUTH
询问用户的启动参数,从 `--ak` `--sk` 或 config.py 中获取。
生成 AUTH token任选一种方式
```bash
# 方式一Python 一行生成
python -c "import base64,sys; print('Authorization: Lab ' + base64.b64encode(f'{sys.argv[1]}:{sys.argv[2]}'.encode()).decode())" <ak> <sk>
# 方式二:手动计算
# base64(ak:sk) → Authorization: Lab <token>
```
### 2. --addr → BASE URL
| `--addr` 值 | BASE |
|-------------|------|
| `test` | `https://uni-lab.test.bohrium.com` |
| `uat` | `https://uni-lab.uat.bohrium.com` |
| `local` | `http://127.0.0.1:48197` |
| 不传(默认) | `https://uni-lab.bohrium.com` |
确认后设置:
```bash
BASE="<根据 addr 确定的 URL>"
AUTH="Authorization: Lab <gen_auth.py 输出的 token>"
```
**两项全部就绪后才可发起 API 请求。**
## Session State
- `lab_uuid` — 实验室 UUID首次通过 API #1 自动获取,**不需要问用户**
## 请求约定
所有请求使用 `curl -s`POST 需加 `Content-Type: application/json`
> **Windows 平台**必须使用 `curl.exe`(而非 PowerShell 的 `curl` 别名),示例中的 `curl` 均指 `curl.exe`。
---
## API Endpoints
### 1. 获取实验室信息(自动获取 lab_uuid
```bash
curl -s -X GET "$BASE/api/v1/edge/lab/info" -H "$AUTH"
```
返回:
```json
{"code": 0, "data": {"uuid": "xxx", "name": "实验室名称"}}
```
记住 `data.uuid``lab_uuid`
### 2. 录入试剂
```bash
curl -s -X POST "$BASE/api/v1/lab/reagent" \
-H "$AUTH" -H "Content-Type: application/json" \
-d '{
"lab_uuid": "<lab_uuid>",
"cas": "<CAS号>",
"name": "<试剂名称>",
"molecular_formula": "<分子式>",
"smiles": "<SMILES>",
"stock_in_quantity": <入库数量>,
"unit": "<单位字符串>",
"supplier": "<供应商>",
"production_date": "<生产日期 ISO 8601>",
"expiry_date": "<过期日期 ISO 8601>"
}'
```
返回成功时包含试剂 UUID
```json
{"code": 0, "data": {"uuid": "xxx", ...}}
```
---
## 试剂字段说明
| 字段 | 类型 | 必填 | 说明 | 示例 |
|------|------|------|------|------|
| `lab_uuid` | string | 是 | 实验室 UUID从 API #1 获取) | `"8511c672-..."` |
| `cas` | string | 是 | CAS 注册号 | `"7732-18-3"` |
| `name` | string | 是 | 试剂中文/英文名称 | `"水"` |
| `molecular_formula` | string | 是 | 分子式 | `"H2O"` |
| `smiles` | string | 是 | SMILES 表示 | `"O"` |
| `stock_in_quantity` | number | 是 | 入库数量 | `10` |
| `unit` | string | 是 | 单位(字符串,见下表) | `"mL"` |
| `supplier` | string | 否 | 供应商名称 | `"国药集团"` |
| `production_date` | string | 否 | 生产日期ISO 8601 | `"2025-11-18T00:00:00Z"` |
| `expiry_date` | string | 否 | 过期日期ISO 8601 | `"2026-11-18T00:00:00Z"` |
### unit 单位值
| 值 | 单位 |
|------|------|
| `"mL"` | 毫升 |
| `"L"` | 升 |
| `"g"` | 克 |
| `"kg"` | 千克 |
| `"瓶"` | 瓶 |
> 根据试剂状态选择:液体用 `"mL"` / `"L"`,固体用 `"g"` / `"kg"`。
---
## 批量录入策略
### 方式一:用户提供 JSON 数组
用户一次性给出多条试剂数据:
```json
[
{"cas": "7732-18-3", "name": "水", "molecular_formula": "H2O", "smiles": "O", "stock_in_quantity": 10, "unit": "mL"},
{"cas": "64-17-5", "name": "乙醇", "molecular_formula": "C2H6O", "smiles": "CCO", "stock_in_quantity": 5, "unit": "L"}
]
```
Agent 自动为每条补充 `lab_uuid``production_date``expiry_date` 等字段后逐条提交。
Agent 循环调用 API #2 逐条录入,每条记录一次 API 调用。
### 方式二:用户逐个描述
用户口头描述试剂(如「帮我录入 500mL 的无水乙醇Sigma 的」agent 自行补全字段:
1. 根据名称查找 CAS 号、分子式、SMILES参考下方速查表或自行推断
2. 构建完整的请求体
3. 向用户确认后提交
### 方式三:从 CSV/表格批量导入
用户提供 CSV 或表格文件路径agent 读取并解析:
```bash
# 期望的 CSV 格式(首行为表头)
cas,name,molecular_formula,smiles,stock_in_quantity,unit,supplier,production_date,expiry_date
7732-18-3,水,H2O,O,10,mL,农夫山泉,2025-11-18T00:00:00Z,2026-11-18T00:00:00Z
```
### 执行与汇报
每次 API 调用后:
1. 检查返回 `code`0 = 成功)
2. 记录成功/失败数量
3. 全部完成后汇总:「共录入 N 条试剂,成功 X 条,失败 Y 条」
4. 如有失败,列出失败的试剂名称和错误信息
---
## 常见试剂速查表
| 名称 | CAS | 分子式 | SMILES |
|------|-----|--------|--------|
| 水 | 7732-18-3 | H2O | O |
| 乙醇 | 64-17-5 | C2H6O | CCO |
| 甲醇 | 67-56-1 | CH4O | CO |
| 丙酮 | 67-64-1 | C3H6O | CC(C)=O |
| 二甲基亚砜(DMSO) | 67-68-5 | C2H6OS | CS(C)=O |
| 乙酸乙酯 | 141-78-6 | C4H8O2 | CCOC(C)=O |
| 二氯甲烷 | 75-09-2 | CH2Cl2 | ClCCl |
| 四氢呋喃(THF) | 109-99-9 | C4H8O | C1CCOC1 |
| N,N-二甲基甲酰胺(DMF) | 68-12-2 | C3H7NO | CN(C)C=O |
| 氯仿 | 67-66-3 | CHCl3 | ClC(Cl)Cl |
| 乙腈 | 75-05-8 | C2H3N | CC#N |
| 甲苯 | 108-88-3 | C7H8 | Cc1ccccc1 |
| 正己烷 | 110-54-3 | C6H14 | CCCCCC |
| 异丙醇 | 67-63-0 | C3H8O | CC(C)O |
| 盐酸 | 7647-01-0 | HCl | Cl |
| 硫酸 | 7664-93-9 | H2SO4 | OS(O)(=O)=O |
| 氢氧化钠 | 1310-73-2 | NaOH | [Na]O |
| 碳酸钠 | 497-19-8 | Na2CO3 | [Na]OC([O-])=O.[Na+] |
| 氯化钠 | 7647-14-5 | NaCl | [Na]Cl |
| 乙二胺四乙酸(EDTA) | 60-00-4 | C10H16N2O8 | OC(=O)CN(CCN(CC(O)=O)CC(O)=O)CC(O)=O |
> 此表仅供快速参考。对于不在表中的试剂agent 应根据化学知识推断或提示用户补充。
---
## 完整工作流 Checklist
```
Task Progress:
- [ ] Step 1: 确认 ak/sk → 生成 AUTH token
- [ ] Step 2: 确认 --addr → 设置 BASE URL
- [ ] Step 3: GET /edge/lab/info → 获取 lab_uuid
- [ ] Step 4: 收集试剂信息(用户提供列表/逐个描述/CSV文件
- [ ] Step 5: 补全缺失字段CAS、分子式、SMILES 等)
- [ ] Step 6: 向用户确认待录入的试剂列表
- [ ] Step 7: 循环调用 POST /lab/reagent 逐条录入(每条需含 lab_uuid
- [ ] Step 8: 汇总结果(成功/失败数量及详情)
```
---
## 完整示例
用户说:「帮我录入 3 种试剂500mL 无水乙醇、1kg 氯化钠、2L 去离子水」
Agent 构建的请求序列:
```json
// 第 1 条
{"lab_uuid": "8511c672-...", "cas": "64-17-5", "name": "无水乙醇", "molecular_formula": "C2H6O", "smiles": "CCO", "stock_in_quantity": 500, "unit": "mL", "supplier": "国药集团", "production_date": "2025-01-01T00:00:00Z", "expiry_date": "2026-01-01T00:00:00Z"}
// 第 2 条
{"lab_uuid": "8511c672-...", "cas": "7647-14-5", "name": "氯化钠", "molecular_formula": "NaCl", "smiles": "[Na]Cl", "stock_in_quantity": 1, "unit": "kg", "supplier": "", "production_date": "2025-01-01T00:00:00Z", "expiry_date": "2026-01-01T00:00:00Z"}
// 第 3 条
{"lab_uuid": "8511c672-...", "cas": "7732-18-3", "name": "去离子水", "molecular_formula": "H2O", "smiles": "O", "stock_in_quantity": 2, "unit": "L", "supplier": "", "production_date": "2025-01-01T00:00:00Z", "expiry_date": "2026-01-01T00:00:00Z"}
```

View File

@@ -57,7 +57,7 @@ class VirtualSampleDemo:
readings.append(round(random.uniform(0.1, 1.0), 4))
samples.append(idx)
return {"volumes": out_volumes, "readings": readings, "samples": samples}
return {"volumes": out_volumes, "readings": readings, "unilabos_samples": samples}
# ------------------------------------------------------------------
# Action 3: 入参和出参都带 samples 列(不等长)
@@ -78,7 +78,7 @@ class VirtualSampleDemo:
scores.append(score)
passed.append(r >= threshold)
return {"scores": scores, "passed": passed, "samples": samples}
return {"scores": scores, "passed": passed, "unilabos_samples": samples}
# ------------------------------------------------------------------
# 状态属性

View File

@@ -679,14 +679,17 @@ def _resolve_name(name: str, import_map: Dict[str, str]) -> str:
return name
_DECORATOR_ENUM_CLASSES = frozenset({"Side", "DataSource", "NodeType"})
def _resolve_attribute(node: ast.Attribute, import_map: Dict[str, str]) -> str:
"""
Resolve an attribute access like Side.NORTH or DataSource.HANDLE.
Returns a string like "NORTH" for enum values, or
"module.path:Class.attr" for imported references.
对于来自 ``unilabos.registry.decorators`` 的枚举类 (Side / DataSource / NodeType)
直接返回枚举成员名 (如 ``"NORTH"`` / ``"HANDLE"`` / ``"MANUAL_CONFIRM"``)
省去消费端二次 rsplit 解析。其它 import 仍返回完整模块路径。
"""
# Get the full dotted path
parts = []
current = node
while isinstance(current, ast.Attribute):
@@ -696,21 +699,20 @@ def _resolve_attribute(node: ast.Attribute, import_map: Dict[str, str]) -> str:
parts.append(current.id)
parts.reverse()
# parts = ["Side", "NORTH"] or ["DataSource", "HANDLE"]
# parts = ["Side", "NORTH"] or ["DataSource", "HANDLE"] or ["NodeType", "MANUAL_CONFIRM"]
if len(parts) >= 2:
base = parts[0]
attr = ".".join(parts[1:])
# If the base is an imported name, resolve it
if base in _DECORATOR_ENUM_CLASSES:
source = import_map.get(base, "")
if not source or _REGISTRY_DECORATOR_MODULE in source:
return parts[-1]
if base in import_map:
return f"{import_map[base]}.{attr}"
# For known enum-like patterns, return just the value
# e.g. Side.NORTH -> "NORTH"
if base in ("Side", "DataSource"):
return parts[-1]
return ".".join(parts)

View File

@@ -8,7 +8,7 @@ Usage:
device, action, resource,
InputHandle, OutputHandle,
ActionInputHandle, ActionOutputHandle,
HardwareInterface, Side, DataSource,
HardwareInterface, Side, DataSource, NodeType,
)
@device(
@@ -73,6 +73,13 @@ class DataSource(str, Enum):
EXECUTOR = "executor" # 从执行器输出数据 (用于 OutputHandle)
class NodeType(str, Enum):
"""动作的节点类型(用于区分 ILab 节点和人工确认节点等)"""
ILAB = "ILab"
MANUAL_CONFIRM = "manual_confirm"
# ---------------------------------------------------------------------------
# Device / Resource Handle (设备/资源级别端口, 序列化时包含 io_type)
# ---------------------------------------------------------------------------
@@ -335,6 +342,7 @@ def action(
description: str = "",
auto_prefix: bool = False,
parent: bool = False,
node_type: Optional["NodeType"] = None,
):
"""
动作方法装饰器
@@ -365,6 +373,8 @@ def action(
description: 动作描述
auto_prefix: 若为 True动作名使用 auto-{method_name} 形式(与无 @action 时一致)
parent: 若为 True当方法参数为空 (*args, **kwargs) 时,通过 MRO 从父类获取真实方法参数
node_type: 动作的节点类型 (NodeType.ILAB / NodeType.MANUAL_CONFIRM)。
不填写时不写入注册表。
"""
def decorator(func: F) -> F:
@@ -389,6 +399,8 @@ def action(
"auto_prefix": auto_prefix,
"parent": parent,
}
if node_type is not None:
meta["node_type"] = node_type.value if isinstance(node_type, NodeType) else str(node_type)
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
# 设置 _is_always_free 保持与旧 @always_free 装饰器兼容
@@ -515,6 +527,38 @@ def clear_registry():
_registered_resources.clear()
# ---------------------------------------------------------------------------
# 枚举值归一化
# ---------------------------------------------------------------------------
def normalize_enum_value(raw: Any, enum_cls) -> Optional[str]:
"""将 AST 提取的枚举成员名 / YAML 值字符串 / 旧格式长路径统一归一化为枚举值。
适用于 Side、DataSource、NodeType 等继承自 ``str, Enum`` 的装饰器枚举。
处理以下格式:
- "MANUAL_CONFIRM" → NodeType["MANUAL_CONFIRM"].value = "manual_confirm"
- "manual_confirm" → NodeType("manual_confirm").value = "manual_confirm"
- "HANDLE" → DataSource["HANDLE"].value = "handle"
- "NORTH" → Side["NORTH"].value = "NORTH"
- 旧缓存长路径 "unilabos...NodeType.MANUAL_CONFIRM" → 先 rsplit 再查找
"""
if not raw:
return None
raw_str = str(raw)
if "." in raw_str:
raw_str = raw_str.rsplit(".", 1)[-1]
try:
return enum_cls[raw_str].value
except KeyError:
pass
try:
return enum_cls(raw_str).value
except ValueError:
return raw_str
# ---------------------------------------------------------------------------
# topic_config / not_action / always_free 装饰器
# ---------------------------------------------------------------------------

View File

@@ -2815,8 +2815,8 @@ virtual_sample_demo:
readings: readings
samples: samples
goal_default:
readings: []
samples: []
readings: null
samples: null
handles:
input:
- data_key: readings
@@ -2846,18 +2846,12 @@ virtual_sample_demo:
handler_key: samples_result_out
label: 样品索引
placeholder_keys: {}
result:
passed: passed
samples: samples
scores: scores
result: {}
schema:
description: 对 split_and_measure 输出做二次分析,入参和出参都带 samples 列
properties:
feedback:
properties: {}
required: []
title: AnalyzeReadings_Feedback
type: object
goal:
properties:
readings:
@@ -2876,52 +2870,11 @@ virtual_sample_demo:
title: AnalyzeReadings_Goal
type: object
result:
properties:
passed:
description: 是否通过阈值
items:
type: boolean
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
scores:
description: 分析得分
items:
type: number
type: array
required:
- scores
- passed
- samples
title: AnalyzeReadings_Result
type: object
required:
- goal
title: AnalyzeReadings
type: object
type: UniLabJsonCommandAsync
auto-cleanup:
feedback: {}
goal: {}
goal_default: {}
handles: {}
placeholder_keys: {}
result: {}
schema:
description: cleanup的参数schema
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result: {}
required:
- goal
title: cleanup参数
title: analyze_readings参数
type: object
type: UniLabJsonCommandAsync
measure_samples:
@@ -2929,7 +2882,7 @@ virtual_sample_demo:
goal:
concentrations: concentrations
goal_default:
concentrations: []
concentrations: null
handles:
output:
- data_key: concentrations
@@ -2943,17 +2896,12 @@ virtual_sample_demo:
handler_key: absorbance_out
label: 吸光度列表
placeholder_keys: {}
result:
absorbance: absorbance
concentrations: concentrations
result: {}
schema:
description: 模拟光度测量,入参出参等长
properties:
feedback:
properties: {}
required: []
title: MeasureSamples_Feedback
type: object
goal:
properties:
concentrations:
@@ -2966,25 +2914,11 @@ virtual_sample_demo:
title: MeasureSamples_Goal
type: object
result:
properties:
absorbance:
description: 吸光度列表(与浓度等长)
items:
type: number
type: array
concentrations:
description: 原始浓度列表
items:
type: number
type: array
required:
- concentrations
- absorbance
title: MeasureSamples_Result
type: object
required:
- goal
title: MeasureSamples
title: measure_samples参数
type: object
type: UniLabJsonCommandAsync
split_and_measure:
@@ -2994,7 +2928,7 @@ virtual_sample_demo:
volumes: volumes
goal_default:
split_count: 3
volumes: []
volumes: null
handles:
output:
- data_key: readings
@@ -3013,21 +2947,16 @@ virtual_sample_demo:
handler_key: volumes_out
label: 均分体积
placeholder_keys: {}
result:
readings: readings
samples: samples
volumes: volumes
result: {}
schema:
description: 均分样品后逐份测量,输出带 samples 列标注归属
properties:
feedback:
properties: {}
required: []
title: SplitAndMeasure_Feedback
type: object
goal:
properties:
split_count:
default: 3
description: 每个样品均分的份数
type: integer
volumes:
@@ -3040,31 +2969,11 @@ virtual_sample_demo:
title: SplitAndMeasure_Goal
type: object
result:
properties:
readings:
description: 测量读数
items:
type: number
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
volumes:
description: 均分后的体积列表
items:
type: number
type: array
required:
- volumes
- readings
- samples
title: SplitAndMeasure_Result
type: object
required:
- goal
title: SplitAndMeasure
title: split_and_measure参数
type: object
type: UniLabJsonCommandAsync
module: unilabos.devices.virtual.virtual_sample_demo:VirtualSampleDemo
@@ -3079,7 +2988,7 @@ virtual_sample_demo:
config:
properties:
config:
type: string
type: object
device_id:
type: string
required: []

View File

@@ -33,6 +33,8 @@ from unilabos.registry.decorators import (
is_not_action,
is_always_free,
get_topic_config,
NodeType,
normalize_enum_value,
)
from unilabos.registry.utils import (
ROSMsgNotFound,
@@ -159,9 +161,10 @@ class Registry:
ast_entry = self.device_type_registry.get("host_node", {})
ast_actions = ast_entry.get("class", {}).get("action_value_mappings", {})
# 取出 AST 生成的 auto-method entries, 补充特定覆写
# 取出 AST 生成的 action entries, 补充特定覆写
test_latency_action = ast_actions.get("auto-test_latency", {})
test_resource_action = ast_actions.get("auto-test_resource", {})
manual_confirm_action = ast_actions.get("manual_confirm", {})
test_resource_action["handles"] = {
"input": [
{
@@ -237,6 +240,7 @@ class Registry:
},
"test_latency": test_latency_action,
"auto-test_resource": test_resource_action,
"manual_confirm": manual_confirm_action,
},
"init_params": {},
},
@@ -847,6 +851,9 @@ class Registry:
}
if (action_args or {}).get("always_free") or method_info.get("always_free"):
entry["always_free"] = True
nt = normalize_enum_value((action_args or {}).get("node_type"), NodeType)
if nt:
entry["node_type"] = nt
return action_name, entry
# 1) auto- actions
@@ -971,6 +978,9 @@ class Registry:
}
if action_args.get("always_free") or method_info.get("always_free"):
action_entry["always_free"] = True
nt = normalize_enum_value(action_args.get("node_type"), NodeType)
if nt:
action_entry["node_type"] = nt
action_value_mappings[action_name] = action_entry
action_value_mappings = dict(sorted(action_value_mappings.items()))
@@ -1153,7 +1163,7 @@ class Registry:
return Path(BasicConfig.working_dir) / "registry_cache.pkl"
return None
_CACHE_VERSION = 3
_CACHE_VERSION = 4
def _load_config_cache(self) -> dict:
import pickle
@@ -1878,6 +1888,9 @@ class Registry:
}
if v.get("always_free"):
entry["always_free"] = True
old_node_type = old_cfg.get("node_type")
if old_node_type in [NodeType.ILAB.value, NodeType.MANUAL_CONFIRM.value]:
entry["node_type"] = old_node_type
device_config["class"]["action_value_mappings"][action_key] = entry
device_config["init_param_schema"] = {}

View File

@@ -17,6 +17,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
from msgcenterpy.instances.typed_dict_instance import TypedDictMessageInstance
from unilabos.utils.cls_creator import import_class
from unilabos.registry.decorators import Side, DataSource, normalize_enum_value
_logger = logging.getLogger(__name__)
@@ -487,10 +488,7 @@ def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
}
side = h.get("side")
if side:
if isinstance(side, str) and "." in side:
val = side.rsplit(".", 1)[-1]
side = val.lower() if val in ("LEFT", "RIGHT", "TOP", "BOTTOM") else val
entry["side"] = side
entry["side"] = normalize_enum_value(side, Side) or side
label = h.get("label")
if label:
entry["label"] = label
@@ -499,10 +497,7 @@ def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
entry["data_key"] = data_key
data_source = h.get("data_source")
if data_source:
if isinstance(data_source, str) and "." in data_source:
val = data_source.rsplit(".", 1)[-1]
data_source = val.lower() if val in ("HANDLE", "EXECUTOR") else val
entry["data_source"] = data_source
entry["data_source"] = normalize_enum_value(data_source, DataSource) or data_source
description = h.get("description")
if description:
entry["description"] = description
@@ -537,17 +532,12 @@ def normalize_ast_action_handles(handles_raw: Any) -> Dict[str, Any]:
"data_type": h.get("data_type", ""),
"label": h.get("label", ""),
}
_FIELD_ENUM_MAP = {"side": Side, "data_source": DataSource}
for opt_key in ("side", "data_key", "data_source", "description", "io_type"):
val = h.get(opt_key)
if val is not None:
# Only resolve enum-style refs (e.g. DataSource.HANDLE -> handle) for data_source/side
# data_key values like "wells.@flatten", "@this.0@@@plate" must be preserved as-is
if (
isinstance(val, str)
and "." in val
and opt_key not in ("io_type", "data_key")
):
val = val.rsplit(".", 1)[-1].lower()
if opt_key in _FIELD_ENUM_MAP:
val = normalize_enum_value(val, _FIELD_ENUM_MAP[opt_key]) or val
entry[opt_key] = val
# io_type: only add when explicitly set; do not default output to "sink" (YAML convention omits it)

View File

@@ -24,7 +24,7 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.decorators import device
from unilabos.registry.decorators import device, action, NodeType
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer
@@ -1621,6 +1621,10 @@ class HostNode(BaseROS2DeviceNode):
}
return res
@action(always_free=True, node_type=NodeType.MANUAL_CONFIRM)
def manual_confirm(self, **kwargs) -> dict:
return kwargs
def test_resource(
self,
sample_uuids: SampleUUIDsType,