Support display_name & desc in new registry system

This commit is contained in:
Xuwznln
2026-04-27 20:28:18 +08:00
parent f6b2bfaf8e
commit f71ea2a258
5 changed files with 583 additions and 165 deletions

View File

@@ -71,6 +71,22 @@ from unilabos.registry.decorators import action
- `_` 开头的方法 → 不扫描 - `_` 开头的方法 → 不扫描
- `@not_action` 标记的方法 → 排除 - `@not_action` 标记的方法 → 排除
### 参数文档 → JSON Schema 元数据
`__init__` 和 action 方法 docstring 的 `Args:` 小节里,使用以下格式生成入参 schema 的显示信息:
```python
"""
Args:
param[显示名称]: 参数说明,会写入 JSON Schema 的 description。
"""
```
- `param[显示名称]` 的显示名称会写入 goal property 的 `title`
- `:` 后面的说明会写入 goal property 的 `description`
- 如果只写 `param: 参数说明``title` 会兜底为字段名,`description` 使用参数说明。
- 如果没有写参数文档,生成器也会兜底补齐 `title=<字段名>``description=""`,但新设备应优先写清楚显示名和说明。
### @topic_config — 状态属性配置 ### @topic_config — 状态属性配置
```python ```python
@@ -105,13 +121,27 @@ import logging
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.registry.decorators import device, action, topic_config, not_action from unilabos.registry.decorators import action, device, not_action, topic_config
@device(id="my_device", category=["my_category"], description="设备描述") @device(
id="my_device",
category=["my_category"],
description="设备描述",
display_name="设备显示名",
)
class MyDevice: class MyDevice:
"""设备类说明。"""
_ros_node: BaseROS2DeviceNode _ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
"""
初始化设备。
Args:
device_id[设备ID]: 设备实例 ID默认使用 my_device。
config[设备配置]: 设备启动配置。
"""
self.device_id = device_id or "my_device" self.device_id = device_id or "my_device"
self.config = config or {} self.config = config or {}
self.logger = logging.getLogger(f"MyDevice.{self.device_id}") self.logger = logging.getLogger(f"MyDevice.{self.device_id}")
@@ -133,7 +163,13 @@ class MyDevice:
@action(description="执行操作") @action(description="执行操作")
def my_action(self, param: float = 0.0, name: str = "") -> Dict[str, Any]: def my_action(self, param: float = 0.0, name: str = "") -> Dict[str, Any]:
"""带 @action 装饰器 → 注册为 'my_action' 动作""" """
带 @action 装饰器 → 注册为 'my_action' 动作。
Args:
param[操作数值]: 操作使用的数值参数。
name[操作名称]: 操作名称或备注。
"""
return {"success": True} return {"success": True}
def get_info(self) -> Dict[str, Any]: def get_info(self) -> Dict[str, Any]:

View File

@@ -14,20 +14,30 @@ Virtual Workbench Device - 模拟工作台设备
import logging import logging
import time import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from threading import Lock, RLock from threading import Lock, RLock
from typing import Any, Dict, List, Optional, cast
from typing_extensions import TypedDict from typing_extensions import TypedDict
from unilabos.registry.decorators import ( from unilabos.registry.decorators import (
device, action, ActionInputHandle, ActionOutputHandle, DataSource, topic_config, not_action, NodeType ActionInputHandle,
ActionOutputHandle,
DataSource,
NodeType,
action,
device,
not_action,
topic_config,
) )
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, ResourceTreeSet from unilabos.resources.resource_tracker import (
SampleUUIDsType,
LabSample,
ResourceTreeSet,
)
# ============ TypedDict 返回类型定义 ============ # ============ TypedDict 返回类型定义 ============
@@ -112,6 +122,7 @@ class HeatingStation:
@device( @device(
id="virtual_workbench", id="virtual_workbench",
display_name="虚拟工作台",
category=["virtual_device"], category=["virtual_device"],
description="Virtual Workbench with 1 robotic arm and 3 heating stations for concurrent material processing", description="Virtual Workbench with 1 robotic arm and 3 heating stations for concurrent material processing",
) )
@@ -137,7 +148,19 @@ class VirtualWorkbench:
HEATING_TIME: float = 60.0 # 加热时间(秒) HEATING_TIME: float = 60.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量 NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): def __init__(
self,
device_id: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
**kwargs,
):
"""
初始化虚拟工作台。
Args:
device_id[设备ID]: 工作台设备实例 ID默认使用 virtual_workbench。
config[设备配置]: 可包含 arm_operation_time、heating_time、num_heating_stations。
"""
# 处理可能的不同调用方式 # 处理可能的不同调用方式
if device_id is None and "id" in kwargs: if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id") device_id = kwargs.pop("id")
@@ -151,9 +174,13 @@ class VirtualWorkbench:
self.data: Dict[str, Any] = {} self.data: Dict[str, Any] = {}
# 从config中获取可配置参数 # 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)) self.ARM_OPERATION_TIME = float(
self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)
)
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME)) self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)) self.NUM_HEATING_STATIONS = int(
self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)
)
# 机械臂状态和锁 # 机械臂状态和锁
self._arm_lock = Lock() self._arm_lock = Lock()
@@ -162,7 +189,8 @@ class VirtualWorkbench:
# 加热台状态 # 加热台状态
self._heating_stations: Dict[int, HeatingStation] = { self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1) i: HeatingStation(station_id=i)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
} }
self._stations_lock = RLock() self._stations_lock = RLock()
@@ -292,45 +320,113 @@ class VirtualWorkbench:
self.logger.info(f"机械臂已释放 (完成: {task})") self.logger.info(f"机械臂已释放 (完成: {task})")
@action( @action(
always_free=True, node_type=NodeType.MANUAL_CONFIRM, placeholder_keys={ always_free=True,
"assignee_user_ids": "unilabos_manual_confirm" node_type=NodeType.MANUAL_CONFIRM,
}, goal_default={ placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
"timeout_seconds": 3600, goal_default={"timeout_seconds": 3600, "assignee_user_ids": []},
"assignee_user_ids": [] feedback_interval=300,
}, feedback_interval=300,
handles=[ handles=[
ActionInputHandle(key="target_device", data_type="device_id", ActionInputHandle(
label="目标设备", data_key="target_device", data_source=DataSource.HANDLE), key="target_device",
ActionInputHandle(key="resource", data_type="resource", data_type="device_id",
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE), label="目标设备",
ActionInputHandle(key="mount_resource", data_type="resource", data_key="target_device",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE), data_source=DataSource.HANDLE,
),
ActionInputHandle(key="collector_mass", data_type="collector_mass", ActionInputHandle(
label="极流体质量", data_key="collector_mass", data_source=DataSource.HANDLE), key="resource",
ActionInputHandle(key="active_material", data_type="active_material", data_type="resource",
label="活性物质含量", data_key="active_material", data_source=DataSource.HANDLE), label="待转移资源",
ActionInputHandle(key="capacity", data_type="capacity", data_key="resource",
label="克容量", data_key="capacity", data_source=DataSource.HANDLE), data_source=DataSource.HANDLE,
ActionInputHandle(key="battery_system", data_type="battery_system", ),
label="电池体系", data_key="battery_system", data_source=DataSource.HANDLE), ActionInputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="collector_mass",
data_type="collector_mass",
label="极流体质量",
data_key="collector_mass",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.HANDLE,
),
# transfer使用 # transfer使用
ActionOutputHandle(key="target_device", data_type="device_id", ActionOutputHandle(
label="目标设备", data_key="target_device", data_source=DataSource.EXECUTOR), key="target_device",
ActionOutputHandle(key="resource", data_type="resource", data_type="device_id",
label="待转移资源", data_key="resource.@flatten", data_source=DataSource.EXECUTOR), label="目标设备",
ActionOutputHandle(key="mount_resource", data_type="resource", data_key="target_device",
label="目标孔位", data_key="mount_resource.@flatten", data_source=DataSource.EXECUTOR), data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource.@flatten",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource.@flatten",
data_source=DataSource.EXECUTOR,
),
# test使用 # test使用
ActionOutputHandle(key="collector_mass", data_type="collector_mass", ActionOutputHandle(
label="极流体质量", data_key="collector_mass", data_source=DataSource.EXECUTOR), key="collector_mass",
ActionOutputHandle(key="active_material", data_type="active_material", data_type="collector_mass",
label="活性物质含量", data_key="active_material", data_source=DataSource.EXECUTOR), label="极流体质量",
ActionOutputHandle(key="capacity", data_type="capacity", data_key="collector_mass",
label="克容量", data_key="capacity", data_source=DataSource.EXECUTOR), data_source=DataSource.EXECUTOR,
ActionOutputHandle(key="battery_system", data_type="battery_system", ),
label="电池体系", data_key="battery_system", data_source=DataSource.EXECUTOR), ActionOutputHandle(
] key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.EXECUTOR,
),
],
) )
def manual_confirm( def manual_confirm(
self, self,
@@ -343,67 +439,156 @@ class VirtualWorkbench:
battery_system: List[str], battery_system: List[str],
timeout_seconds: int, timeout_seconds: int,
assignee_user_ids: list[str], assignee_user_ids: list[str],
**kwargs **kwargs,
) -> dict: ) -> dict:
""" """
timeout_seconds: 超时时间默认3600秒 人工确认资源转移和扣电测试参数。
collector_mass: 极流体质量
active_material: 活性物质含量 Args:
capacity: 克容量mAh/g resource[待转移资源]: 需要人工确认的资源列表。
battery_system: 电池体系 target_device[目标设备]: 资源要转移到的目标设备 ID。
修改的结果无效,是只读的 mount_resource[目标孔位]: 资源要挂载到的目标孔位列表。
collector_mass[极流体质量]: 每个样品对应的极流体质量。
active_material[活性物质含量]: 每个样品对应的活性物质含量。
capacity[克容量]: 每个样品对应的克容量,单位 mAh/g。
battery_system[电池体系]: 每个样品对应的电池体系名称。
timeout_seconds[超时时间]: 人工确认超时时间,单位秒。
assignee_user_ids[确认人]: 指定处理人工确认任务的用户 ID 列表。
Note:
修改的结果无效,是只读的。
""" """
resource = ResourceTreeSet.from_plr_resources(resource).dump() resource_tree = ResourceTreeSet.from_plr_resources(cast(Any, resource)).dump()
mount_resource = ResourceTreeSet.from_plr_resources(mount_resource).dump() mount_resource_tree = ResourceTreeSet.from_plr_resources(cast(Any, mount_resource)).dump()
kwargs.update(locals()) kwargs.update(locals())
kwargs.pop("kwargs") kwargs.pop("kwargs")
kwargs.pop("self") kwargs.pop("self")
kwargs["resource"] = resource_tree
kwargs["mount_resource"] = mount_resource_tree
kwargs.pop("resource_tree")
kwargs.pop("mount_resource_tree")
return kwargs return kwargs
@action( @action(
description="转移物料", description="转移物料",
handles=[ handles=[
ActionInputHandle(key="target_device", data_type="device_id", ActionInputHandle(
label="目标设备", data_key="target_device", data_source=DataSource.HANDLE), key="target_device",
ActionInputHandle(key="resource", data_type="resource", data_type="device_id",
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE), label="目标设备",
ActionInputHandle(key="mount_resource", data_type="resource", data_key="target_device",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE), data_source=DataSource.HANDLE,
] ),
ActionInputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource",
data_source=DataSource.HANDLE,
),
],
) )
async def transfer(self, resource: List[ResourceSlot], target_device: DeviceSlot, mount_resource: List[ResourceSlot]): async def transfer(
future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, self,
resource: List[ResourceSlot],
target_device: DeviceSlot,
mount_resource: List[ResourceSlot],
):
"""
转移资源到目标设备。
Args:
resource[待转移资源]: 待转移的资源列表。
target_device[目标设备]: 接收资源的目标设备 ID。
mount_resource[目标孔位]: 目标设备上的挂载孔位列表。
"""
future = ROS2DeviceNode.run_async_func(
self._ros_node.transfer_resource_to_another,
True,
**{ **{
"plr_resources": resource, "plr_resources": resource,
"target_device_id": target_device, "target_device_id": target_device,
"target_resources": mount_resource, "target_resources": mount_resource,
"sites": [None] * len(mount_resource), "sites": [None] * len(mount_resource),
}) },
)
result = await future result = await future
return result return result
@action( @action(
description="扣电测试启动", description="扣电测试启动",
handles=[ handles=[
ActionInputHandle(key="resource", data_type="resource", ActionInputHandle(
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE), key="resource",
ActionInputHandle(key="mount_resource", data_type="resource", data_type="resource",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE), label="待转移资源",
data_key="resource",
ActionInputHandle(key="collector_mass", data_type="collector_mass", data_source=DataSource.HANDLE,
label="极流体质量", data_key="collector_mass", data_source=DataSource.HANDLE), ),
ActionInputHandle(key="active_material", data_type="active_material", ActionInputHandle(
label="活性物质含量", data_key="active_material", data_source=DataSource.HANDLE), key="mount_resource",
ActionInputHandle(key="capacity", data_type="capacity", data_type="resource",
label="克容量", data_key="capacity", data_source=DataSource.HANDLE), label="目标孔位",
ActionInputHandle(key="battery_system", data_type="battery_system", data_key="mount_resource",
label="电池体系", data_key="battery_system", data_source=DataSource.HANDLE), data_source=DataSource.HANDLE,
] ),
ActionInputHandle(
key="collector_mass",
data_type="collector_mass",
label="极流体质量",
data_key="collector_mass",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.HANDLE,
),
],
) )
async def test( async def test(
self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], collector_mass: List[float], active_material: List[float], capacity: List[float], battery_system: list[str] self,
resource: List[ResourceSlot],
mount_resource: List[ResourceSlot],
collector_mass: List[float],
active_material: List[float],
capacity: List[float],
battery_system: list[str],
): ):
"""
启动扣电测试。
Args:
resource[待测试资源]: 需要进行扣电测试的资源列表。
mount_resource[测试孔位]: 扣电测试使用的目标孔位列表。
collector_mass[极流体质量]: 每个样品对应的极流体质量。
active_material[活性物质含量]: 每个样品对应的活性物质含量。
capacity[克容量]: 每个样品对应的克容量,单位 mAh/g。
battery_system[电池体系]: 每个样品对应的电池体系名称。
"""
print(resource) print(resource)
print(mount_resource) print(mount_resource)
print(collector_mass) print(collector_mass)
@@ -415,16 +600,11 @@ class VirtualWorkbench:
auto_prefix=True, auto_prefix=True,
description="批量准备物料 - 虚拟起始节点, 生成A1-A5物料, 输出5个handle供后续节点使用", description="批量准备物料 - 虚拟起始节点, 生成A1-A5物料, 输出5个handle供后续节点使用",
handles=[ handles=[
ActionOutputHandle(key="channel_1", data_type="workbench_material", ActionOutputHandle(key="channel_1", data_type="workbench_material", label="实验1", data_key="material_1", data_source=DataSource.EXECUTOR), # noqa: E501
label="实验1", data_key="material_1", data_source=DataSource.EXECUTOR), ActionOutputHandle(key="channel_2", data_type="workbench_material", label="实验2", data_key="material_2", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_2", data_type="workbench_material", ActionOutputHandle(key="channel_3", data_type="workbench_material", label="实验3", data_key="material_3", data_source=DataSource.EXECUTOR), # noqa: E501
label="实验2", data_key="material_2", data_source=DataSource.EXECUTOR), ActionOutputHandle(key="channel_4", data_type="workbench_material", label="实验4", data_key="material_4", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_3", data_type="workbench_material", ActionOutputHandle(key="channel_5", data_type="workbench_material", label="实验5", data_key="material_5", data_source=DataSource.EXECUTOR), # noqa: E501
label="实验3", data_key="material_3", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_4", data_type="workbench_material",
label="实验4", data_key="material_4", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_5", data_type="workbench_material",
label="实验5", data_key="material_5", data_source=DataSource.EXECUTOR),
], ],
) )
def prepare_materials( def prepare_materials(
@@ -437,6 +617,9 @@ class VirtualWorkbench:
作为工作流的起始节点, 生成指定数量的物料编号供后续节点使用。 作为工作流的起始节点, 生成指定数量的物料编号供后续节点使用。
输出5个handle (material_1 ~ material_5), 分别对应实验1~5。 输出5个handle (material_1 ~ material_5), 分别对应实验1~5。
Args:
count[物料数量]: 要生成的物料数量,默认生成 5 个。
""" """
materials = [i for i in range(1, count + 1)] materials = [i for i in range(1, count + 1)]
@@ -457,7 +640,11 @@ class VirtualWorkbench:
LabSample( LabSample(
sample_uuid=sample_uuid, sample_uuid=sample_uuid,
oss_path="", oss_path="",
extra={"material_uuid": content} if isinstance(content, str) else (content.serialize() if content else {}), extra=(
{"material_uuid": content}
if isinstance(content, str)
else (content.serialize() if content else {})
),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
], ],
@@ -467,12 +654,27 @@ class VirtualWorkbench:
auto_prefix=True, auto_prefix=True,
description="将物料从An位置移动到空闲加热台, 返回分配的加热台ID", description="将物料从An位置移动到空闲加热台, 返回分配的加热台ID",
handles=[ handles=[
ActionInputHandle(key="material_input", data_type="workbench_material", ActionInputHandle(
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE), key="material_input",
ActionOutputHandle(key="heating_station_output", data_type="workbench_station", data_type="workbench_material",
label="加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR), label="物料编号",
ActionOutputHandle(key="material_number_output", data_type="workbench_material", data_key="material_number",
label="物料编号", data_key="material_number", data_source=DataSource.EXECUTOR), data_source=DataSource.HANDLE,
),
ActionOutputHandle(
key="heating_station_output",
data_type="workbench_station",
label="加热台ID",
data_key="station_id",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="material_number_output",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.EXECUTOR,
),
], ],
) )
def move_to_heating_station( def move_to_heating_station(
@@ -484,6 +686,9 @@ class VirtualWorkbench:
将物料从An位置移动到加热台 将物料从An位置移动到加热台
多线程并发调用时, 会竞争机械臂使用权, 并自动查找空闲加热台 多线程并发调用时, 会竞争机械臂使用权, 并自动查找空闲加热台
Args:
material_number[物料编号]: 要移动的物料编号,对应 A1、A2 等起始位置。
""" """
material_id = f"A{material_number}" material_id = f"A{material_number}"
task_desc = f"移动{material_id}到加热台" task_desc = f"移动{material_id}到加热台"
@@ -546,7 +751,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -569,7 +775,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -581,14 +788,34 @@ class VirtualWorkbench:
always_free=True, always_free=True,
description="启动指定加热台的加热程序", description="启动指定加热台的加热程序",
handles=[ handles=[
ActionInputHandle(key="station_id_input", data_type="workbench_station", ActionInputHandle(
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE), key="station_id_input",
ActionInputHandle(key="material_number_input", data_type="workbench_material", data_type="workbench_station",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE), label="加热台ID",
ActionOutputHandle(key="heating_done_station", data_type="workbench_station", data_key="station_id",
label="加热完成-加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR), data_source=DataSource.HANDLE,
ActionOutputHandle(key="heating_done_material", data_type="workbench_material", ),
label="加热完成-物料编号", data_key="material_number", data_source=DataSource.EXECUTOR), ActionInputHandle(
key="material_number_input",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.HANDLE,
),
ActionOutputHandle(
key="heating_done_station",
data_type="workbench_station",
label="加热完成-加热台ID",
data_key="station_id",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="heating_done_material",
data_type="workbench_material",
label="加热完成-物料编号",
data_key="material_number",
data_source=DataSource.EXECUTOR,
),
], ],
) )
def start_heating( def start_heating(
@@ -599,6 +826,10 @@ class VirtualWorkbench:
) -> StartHeatingResult: ) -> StartHeatingResult:
""" """
启动指定加热台的加热程序 启动指定加热台的加热程序
Args:
station_id[加热台ID]: 要启动加热的加热台编号。
material_number[物料编号]: 当前加热台上的物料编号。
""" """
self.logger.info(f"[加热台{station_id}] 开始加热") self.logger.info(f"[加热台{station_id}] 开始加热")
@@ -615,7 +846,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -638,7 +870,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -658,7 +891,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -698,7 +932,9 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%") self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
if time.time() - last_countdown_log >= 5.0: if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s") self.logger.info(
f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s"
)
last_countdown_log = time.time() last_countdown_log = time.time()
if elapsed >= self.HEATING_TIME: if elapsed >= self.HEATING_TIME:
@@ -715,7 +951,9 @@ class VirtualWorkbench:
self._active_tasks[material_id]["status"] = "heating_completed" self._active_tasks[material_id]["status"] = "heating_completed"
self._update_data_status(f"加热台{station_id}加热完成") self._update_data_status(f"加热台{station_id}加热完成")
self.logger.info(f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)") self.logger.info(
f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)"
)
return { return {
"success": True, "success": True,
@@ -729,7 +967,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -740,10 +979,20 @@ class VirtualWorkbench:
auto_prefix=True, auto_prefix=True,
description="将物料从加热台移动到输出位置Cn", description="将物料从加热台移动到输出位置Cn",
handles=[ handles=[
ActionInputHandle(key="output_station_input", data_type="workbench_station", ActionInputHandle(
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE), key="output_station_input",
ActionInputHandle(key="output_material_input", data_type="workbench_material", data_type="workbench_station",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE), label="加热台ID",
data_key="station_id",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="output_material_input",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.HANDLE,
),
], ],
) )
def move_to_output( def move_to_output(
@@ -754,6 +1003,10 @@ class VirtualWorkbench:
) -> MoveToOutputResult: ) -> MoveToOutputResult:
""" """
将物料从加热台移动到输出位置Cn 将物料从加热台移动到输出位置Cn
Args:
station_id[加热台ID]: 已完成加热的加热台编号。
material_number[物料编号]: 要移动到输出位置的物料编号,对应 Cn。
""" """
output_number = material_number output_number = material_number
@@ -770,7 +1023,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -794,7 +1048,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -814,7 +1069,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()
@@ -896,7 +1152,8 @@ class VirtualWorkbench:
oss_path="", oss_path="",
extra=( extra=(
{"material_uuid": content} {"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {}) if isinstance(content, str)
else (content.serialize() if content else {})
), ),
) )
for sample_uuid, content in sample_uuids.items() for sample_uuid, content in sample_uuids.items()

View File

@@ -32,7 +32,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
MAX_SCAN_DEPTH = 10 # 最大目录递归深度 MAX_SCAN_DEPTH = 10 # 最大目录递归深度
MAX_SCAN_FILES = 1000 # 最大扫描文件数量 MAX_SCAN_FILES = 1000 # 最大扫描文件数量
_CACHE_VERSION = 1 # 缓存格式版本号,格式变更时递增 _CACHE_VERSION = 2 # 缓存格式版本号,格式变更时递增
# 合法的装饰器来源模块 # 合法的装饰器来源模块
_REGISTRY_DECORATOR_MODULE = "unilabos.registry.decorators" _REGISTRY_DECORATOR_MODULE = "unilabos.registry.decorators"
@@ -258,8 +258,6 @@ def scan_directory(
} }
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# File-level parsing # File-level parsing
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -361,6 +359,7 @@ def _parse_file(
"actions": class_body.get("actions", {}), "actions": class_body.get("actions", {}),
"status_properties": class_body.get("status_properties", {}), "status_properties": class_body.get("status_properties", {}),
"init_params": class_body.get("init_params", []), "init_params": class_body.get("init_params", []),
"init_docstring": class_body.get("init_docstring"),
"auto_methods": class_body.get("auto_methods", {}), "auto_methods": class_body.get("auto_methods", {}),
"import_map": import_map, "import_map": import_map,
} }
@@ -497,7 +496,6 @@ def _collect_imports(tree: ast.Module, module_path: str = "") -> Dict[str, str]:
return import_map return import_map
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Decorator finding & argument extraction # Decorator finding & argument extraction
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -768,6 +766,7 @@ def _extract_class_body(
"actions": {}, # method_name -> action_info "actions": {}, # method_name -> action_info
"status_properties": {}, # prop_name -> status_info "status_properties": {}, # prop_name -> status_info
"init_params": [], # [{"name": ..., "type": ..., "default": ...}, ...] "init_params": [], # [{"name": ..., "type": ..., "default": ...}, ...]
"init_docstring": None,
"auto_methods": {}, # method_name -> method_info (no @action decorator) "auto_methods": {}, # method_name -> method_info (no @action decorator)
} }
@@ -780,6 +779,7 @@ def _extract_class_body(
# --- __init__ --- # --- __init__ ---
if method_name == "__init__": if method_name == "__init__":
result["init_params"] = _extract_method_params(item, import_map) result["init_params"] = _extract_method_params(item, import_map)
result["init_docstring"] = ast.get_docstring(item)
continue continue
# --- Skip private/dunder --- # --- Skip private/dunder ---

View File

@@ -271,6 +271,7 @@ class Registry:
registry_cache.pkl 一个文件中,删除即可完全重置。 registry_cache.pkl 一个文件中,删除即可完全重置。
""" """
import time as _time import time as _time
from unilabos.registry.ast_registry_scanner import _CACHE_VERSION as AST_SCAN_CACHE_VERSION
from unilabos.registry.ast_registry_scanner import scan_directory from unilabos.registry.ast_registry_scanner import scan_directory
scan_t0 = _time.perf_counter() scan_t0 = _time.perf_counter()
@@ -286,6 +287,10 @@ class Registry:
# ---- 统一缓存:一个 pkl 包含所有数据 ---- # ---- 统一缓存:一个 pkl 包含所有数据 ----
unified_cache = self._load_config_cache() unified_cache = self._load_config_cache()
ast_cache = unified_cache.setdefault("_ast_scan", {"files": {}}) ast_cache = unified_cache.setdefault("_ast_scan", {"files": {}})
if ast_cache.get("version") != AST_SCAN_CACHE_VERSION:
ast_cache = {"version": AST_SCAN_CACHE_VERSION, "files": {}}
unified_cache["_ast_scan"] = ast_cache
unified_cache.pop("_build_results", None)
# 默认:扫描 unilabos 包所在的父目录 # 默认:扫描 unilabos 包所在的父目录
pkg_root = Path(__file__).resolve().parent.parent # .../unilabos pkg_root = Path(__file__).resolve().parent.parent # .../unilabos
@@ -561,13 +566,38 @@ class Registry:
return prop_schema return prop_schema
@staticmethod
def _apply_docstring_param_metadata(
schema: Dict[str, Any],
doc_info: Dict[str, Any],
field_to_param: Optional[Dict[str, str]] = None,
) -> None:
"""Apply parsed docstring display names and descriptions to schema properties."""
if not schema or not doc_info:
return
props = schema.get("properties", {})
if not isinstance(props, dict):
return
param_descs = doc_info.get("params", {}) or {}
param_display_names = doc_info.get("param_display_names", {}) or {}
for field_name, prop_schema in props.items():
if not isinstance(prop_schema, dict):
continue
param_name = field_to_param.get(field_name, field_name) if field_to_param else field_name
if not isinstance(param_name, str):
continue
param_name = param_name.removesuffix("[]")
prop_schema["title"] = param_display_names.get(param_name, prop_schema.get("title") or field_name)
prop_schema["description"] = param_descs.get(param_name, prop_schema.get("description") or "")
def _generate_unilab_json_command_schema( def _generate_unilab_json_command_schema(
self, method_args: list, docstring: Optional[str] = None, self, method_args: list, docstring: Optional[str] = None,
import_map: Optional[Dict[str, str]] = None, import_map: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""根据方法参数和 docstring 生成 UniLabJsonCommand schema""" """根据方法参数和 docstring 生成 UniLabJsonCommand schema"""
doc_info = parse_docstring(docstring) doc_info = parse_docstring(docstring)
param_descs = doc_info.get("params", {})
schema = { schema = {
"type": "object", "type": "object",
@@ -598,12 +628,10 @@ class Registry:
param_name, param_type, param_default, import_map=import_map param_name, param_type, param_default, import_map=import_map
) )
if param_name in param_descs:
schema["properties"][param_name]["description"] = param_descs[param_name]
if param_required: if param_required:
schema["required"].append(param_name) schema["required"].append(param_name)
self._apply_docstring_param_metadata(schema, doc_info)
return schema return schema
def _generate_status_types_schema(self, status_methods: Dict[str, Any]) -> Dict[str, Any]: def _generate_status_types_schema(self, status_methods: Dict[str, Any]) -> Dict[str, Any]:
@@ -799,6 +827,7 @@ class Registry:
type_str = "UniLabJsonCommandAsync" if is_async else "UniLabJsonCommand" type_str = "UniLabJsonCommandAsync" if is_async else "UniLabJsonCommand"
params = method_info.get("params", []) params = method_info.get("params", [])
method_doc = method_info.get("docstring") method_doc = method_info.get("docstring")
method_doc_info = parse_docstring(method_doc)
goal_schema = self._generate_schema_from_ast_params(params, method_name, method_doc, imap) goal_schema = self._generate_schema_from_ast_params(params, method_name, method_doc, imap)
if action_args is not None: if action_args is not None:
@@ -828,7 +857,11 @@ class Registry:
# action handles: 从 @action(handles=[...]) 提取并转换为标准格式 # action handles: 从 @action(handles=[...]) 提取并转换为标准格式
raw_handles = (action_args or {}).get("handles") raw_handles = (action_args or {}).get("handles")
handles = normalize_ast_action_handles(raw_handles) if isinstance(raw_handles, list) else (raw_handles or {}) handles = (
normalize_ast_action_handles(raw_handles)
if isinstance(raw_handles, list)
else (raw_handles or {})
)
# placeholder_keys: 先从参数类型自动检测,再用装饰器显式配置覆盖/补充 # placeholder_keys: 先从参数类型自动检测,再用装饰器显式配置覆盖/补充
pk = detect_placeholder_keys(params) pk = detect_placeholder_keys(params)
@@ -847,7 +880,12 @@ class Registry:
"goal": goal, "goal": goal,
"feedback": (action_args or {}).get("feedback") or {}, "feedback": (action_args or {}).get("feedback") or {},
"result": (action_args or {}).get("result") or {}, "result": (action_args or {}).get("result") or {},
"schema": wrap_action_schema(goal_schema, action_name, result_schema=result_schema), "schema": wrap_action_schema(
goal_schema,
action_name,
description=(action_args or {}).get("description") or method_doc_info.get("description", ""),
result_schema=result_schema,
),
"goal_default": goal_default, "goal_default": goal_default,
"handles": handles, "handles": handles,
"placeholder_keys": pk, "placeholder_keys": pk,
@@ -886,7 +924,11 @@ class Registry:
action_name = f"auto-{action_name}" action_name = f"auto-{action_name}"
raw_handles = action_args.get("handles") raw_handles = action_args.get("handles")
handles = normalize_ast_action_handles(raw_handles) if isinstance(raw_handles, list) else (raw_handles or {}) handles = (
normalize_ast_action_handles(raw_handles)
if isinstance(raw_handles, list)
else (raw_handles or {})
)
method_params = method_info.get("params", []) method_params = method_info.get("params", [])
@@ -979,7 +1021,10 @@ class Registry:
"schema": schema, "schema": schema,
"goal_default": goal_default, "goal_default": goal_default,
"handles": handles, "handles": handles,
"placeholder_keys": {**detect_placeholder_keys(method_params), **(action_args.get("placeholder_keys") or {})}, "placeholder_keys": {
**detect_placeholder_keys(method_params),
**(action_args.get("placeholder_keys") or {}),
},
} }
if action_args.get("always_free") or method_info.get("always_free"): if action_args.get("always_free") or method_info.get("always_free"):
action_entry["always_free"] = True action_entry["always_free"] = True
@@ -988,13 +1033,21 @@ class Registry:
nt = normalize_enum_value(action_args.get("node_type"), NodeType) nt = normalize_enum_value(action_args.get("node_type"), NodeType)
if nt: if nt:
action_entry["node_type"] = nt action_entry["node_type"] = nt
goal_schema_for_docs = action_entry.get("schema", {}).get("properties", {}).get("goal", {})
self._apply_docstring_param_metadata(
goal_schema_for_docs,
parse_docstring(method_info.get("docstring")),
goal,
)
action_value_mappings[action_name] = action_entry action_value_mappings[action_name] = action_entry
action_value_mappings = dict(sorted(action_value_mappings.items())) action_value_mappings = dict(sorted(action_value_mappings.items()))
# --- init_param_schema = { config: <init_params>, data: <status_types> } --- # --- init_param_schema = { config: <init_params>, data: <status_types> } ---
init_params = ast_meta.get("init_params", []) init_params = ast_meta.get("init_params", [])
config_schema = self._generate_schema_from_ast_params(init_params, "__init__", import_map=imap) config_schema = self._generate_schema_from_ast_params(
init_params, "__init__", ast_meta.get("init_docstring"), import_map=imap
)
data_schema = self._generate_status_schema_from_ast( data_schema = self._generate_status_schema_from_ast(
ast_meta.get("status_properties", {}), imap ast_meta.get("status_properties", {}), imap
) )
@@ -1042,7 +1095,6 @@ class Registry:
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Generate JSON Schema from AST-extracted parameter list.""" """Generate JSON Schema from AST-extracted parameter list."""
doc_info = parse_docstring(docstring) doc_info = parse_docstring(docstring)
param_descs = doc_info.get("params", {})
schema: Dict[str, Any] = { schema: Dict[str, Any] = {
"type": "object", "type": "object",
@@ -1072,12 +1124,10 @@ class Registry:
pname, ptype, pdefault, import_map pname, ptype, pdefault, import_map
) )
if pname in param_descs:
schema["properties"][pname]["description"] = param_descs[pname]
if prequired: if prequired:
schema["required"].append(pname) schema["required"].append(pname)
self._apply_docstring_param_metadata(schema, doc_info)
return schema return schema
def _generate_status_schema_from_ast( def _generate_status_schema_from_ast(
@@ -1807,7 +1857,7 @@ class Registry:
else: else:
action_key = f"auto-{k}" action_key = f"auto-{k}"
goal_schema = self._generate_unilab_json_command_schema( goal_schema = self._generate_unilab_json_command_schema(
v["args"], import_map=enhanced_import_map v["args"], docstring=v.get("docstring"), import_map=enhanced_import_map
) )
ret_type = v.get("return_type", "") ret_type = v.get("return_type", "")
result_schema = None result_schema = None
@@ -1816,7 +1866,13 @@ class Registry:
"result", ret_type, None, import_map=enhanced_import_map "result", ret_type, None, import_map=enhanced_import_map
) )
old_cfg = old_action_configs.get(action_key) or old_action_configs.get(f"auto-{k}", {}) old_cfg = old_action_configs.get(action_key) or old_action_configs.get(f"auto-{k}", {})
new_schema = wrap_action_schema(goal_schema, action_key, result_schema=result_schema) doc_info = parse_docstring(v.get("docstring"))
new_schema = wrap_action_schema(
goal_schema,
action_key,
description=doc_info.get("description", ""),
result_schema=result_schema,
)
old_schema = old_cfg.get("schema", {}) old_schema = old_cfg.get("schema", {})
if old_schema: if old_schema:
preserve_field_descriptions(new_schema, old_schema) preserve_field_descriptions(new_schema, old_schema)
@@ -1882,6 +1938,12 @@ class Registry:
merged_pk = dict(old_cfg.get("placeholder_keys", {})) merged_pk = dict(old_cfg.get("placeholder_keys", {}))
merged_pk.update(detect_placeholder_keys(v["args"])) merged_pk.update(detect_placeholder_keys(v["args"]))
goal_schema_for_docs = (
entry_schema.get("properties", {}).get("goal", {})
if isinstance(entry_schema, dict)
else {}
)
self._apply_docstring_param_metadata(goal_schema_for_docs, doc_info, entry_goal)
entry = { entry = {
"type": entry_type, "type": entry_type,
@@ -1902,7 +1964,8 @@ class Registry:
device_config["init_param_schema"] = {} device_config["init_param_schema"] = {}
init_schema = self._generate_unilab_json_command_schema( init_schema = self._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__", enhanced_info["init_params"],
docstring=enhanced_info.get("init_docstring"),
import_map=enhanced_import_map, import_map=enhanced_import_map,
) )
device_config["init_param_schema"]["config"] = init_schema device_config["init_param_schema"]["config"] = init_schema
@@ -1949,7 +2012,9 @@ class Registry:
action_str_type_mapping[action_type_str] = target_type action_str_type_mapping[action_type_str] = target_type
if target_type is not None: if target_type is not None:
try: try:
action_config["goal_default"] = ROS2MessageInstance(target_type.Goal()).get_python_dict() action_config["goal_default"] = ROS2MessageInstance(
target_type.Goal()
).get_python_dict()
except Exception: except Exception:
action_config["goal_default"] = {} action_config["goal_default"] = {}
prev_schema = action_config.get("schema", {}) prev_schema = action_config.get("schema", {})
@@ -2141,10 +2206,15 @@ class Registry:
"unilabos_device_id": { "unilabos_device_id": {
"type": "string", "type": "string",
"default": "", "default": "",
"title": "设备ID",
"description": "UniLabOS设备ID用于指定执行动作的具体设备实例", "description": "UniLabOS设备ID用于指定执行动作的具体设备实例",
}, },
**schema["properties"]["goal"]["properties"], **schema["properties"]["goal"]["properties"],
} }
for field_name, field_schema in schema["properties"]["goal"]["properties"].items():
if isinstance(field_schema, dict):
field_schema.setdefault("title", field_name)
field_schema.setdefault("description", "")
# 将 placeholder_keys 信息添加到 schema 中 # 将 placeholder_keys 信息添加到 schema 中
if "placeholder_keys" in action_config and action_config.get("schema", {}).get( if "placeholder_keys" in action_config and action_config.get("schema", {}).get(
"properties", {} "properties", {}
@@ -2212,7 +2282,14 @@ class Registry:
lab_registry = Registry() lab_registry = Registry()
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False, external_only=False): def build_registry(
registry_paths=None,
devices_dirs=None,
upload_registry=False,
check_mode=False,
complete_registry=False,
external_only=False,
):
""" """
构建或获取Registry单例实例 构建或获取Registry单例实例
""" """
@@ -2226,7 +2303,12 @@ def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False
if path not in current_paths: if path not in current_paths:
lab_registry.registry_paths.append(path) lab_registry.registry_paths.append(path)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry, external_only=external_only) lab_registry.setup(
devices_dirs=devices_dirs,
upload_registry=upload_registry,
complete_registry=complete_registry,
external_only=external_only,
)
# 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块) # 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块)
lab_registry.resolve_all_types() lab_registry.resolve_all_types()

View File

@@ -36,16 +36,40 @@ class ROSMsgNotFound(Exception):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_SECTION_RE = re.compile(r"^(\w[\w\s]*):\s*$") _SECTION_RE = re.compile(r"^(\w[\w\s]*):\s*$")
_PARAM_HEADER_RE = re.compile(
r"^\s*(?P<name>\w[\w]*)\s*(?:\[(?P<display_name>[^\]]+)\])?(?:\s*\([^)]*\))?\s*$"
)
def _parse_docstring_param_header(param_part: str) -> Tuple[str, Optional[str]]:
"""Parse ``name[display_name]`` or Google-style ``name (type)``."""
match = _PARAM_HEADER_RE.match(param_part.strip())
if not match:
return param_part.strip().split("(")[0].strip(), None
display_name = match.group("display_name")
if display_name is not None:
display_name = display_name.strip() or None
return match.group("name").strip(), display_name
def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]: def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
""" """
解析 Google-style docstring提取描述和参数说明。 解析 docstring提取描述和参数说明。
支持:
- Google-style ``Args:`` / ``Parameters:`` 小节
- 直接参数行 ``field: desc``
- 带显示名参数行 ``field[Display Name]: desc``
Returns: Returns:
{"description": "短描述", "params": {"param1": "参数1描述", ...}} {
"description": "短描述",
"params": {"param1": "参数1描述", ...},
"param_display_names": {"param1": "显示名", ...},
}
""" """
result: Dict[str, Any] = {"description": "", "params": {}} result: Dict[str, Any] = {"description": "", "params": {}, "param_display_names": {}}
if not docstring: if not docstring:
return result return result
@@ -53,33 +77,53 @@ def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
if not lines: if not lines:
return result return result
result["description"] = lines[0].strip()
in_args = False in_args = False
current_section: Optional[str] = None
current_param: Optional[str] = None current_param: Optional[str] = None
current_display_name: Optional[str] = None
current_desc_parts: list = [] current_desc_parts: list = []
for line in lines[1:]: def flush_current_param() -> None:
nonlocal current_param, current_display_name, current_desc_parts
if current_param is None:
return
result["params"][current_param] = "\n".join(current_desc_parts).strip()
if current_display_name:
result["param_display_names"][current_param] = current_display_name
current_param = None
current_display_name = None
current_desc_parts = []
first_line = lines[0].strip()
start_index = 0
if not _SECTION_RE.match(first_line) and ":" not in first_line:
result["description"] = first_line
start_index = 1
for line in lines[start_index:]:
stripped = line.strip() stripped = line.strip()
if not stripped:
if current_param is not None:
current_desc_parts.append("")
continue
section_match = _SECTION_RE.match(stripped) section_match = _SECTION_RE.match(stripped)
if section_match: if section_match:
if current_param is not None: flush_current_param()
result["params"][current_param] = "\n".join(current_desc_parts).strip() current_section = section_match.group(1).lower()
current_param = None in_args = current_section in ("args", "arguments", "parameters", "params")
current_desc_parts = []
section_name = section_match.group(1).lower()
in_args = section_name in ("args", "arguments", "parameters", "params")
continue continue
if not in_args: parse_as_param = in_args or current_section is None
if not parse_as_param:
continue continue
if ":" in stripped and not stripped.startswith(" "): if ":" in stripped:
if current_param is not None: flush_current_param()
result["params"][current_param] = "\n".join(current_desc_parts).strip()
param_part, _, desc_part = stripped.partition(":") param_part, _, desc_part = stripped.partition(":")
param_name = param_part.strip().split("(")[0].strip() param_name, display_name = _parse_docstring_param_header(param_part)
current_param = param_name current_param = param_name
current_display_name = display_name
current_desc_parts = [desc_part.strip()] current_desc_parts = [desc_part.strip()]
elif current_param is not None: elif current_param is not None:
aline = line aline = line
@@ -89,8 +133,7 @@ def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
aline = aline[1:] aline = aline[1:]
current_desc_parts.append(aline.strip()) current_desc_parts.append(aline.strip())
if current_param is not None: flush_current_param()
result["params"][current_param] = "\n".join(current_desc_parts).strip()
return result return result