From 0895252bc1afd7edf3f268119632afbe1f7f37b5 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sat, 11 Apr 2026 06:22:53 +0800 Subject: [PATCH] change to leap-lab backend. Support feedback interval. Reduce cocurrent lags. --- unilabos/app/main.py | 8 +- unilabos/app/web/client.py | 49 ++--- unilabos/app/ws_client.py | 8 +- unilabos/config/config.py | 2 +- unilabos/registry/ast_registry_scanner.py | 1 + unilabos/registry/decorators.py | 3 + unilabos/registry/registry.py | 5 + unilabos/ros/nodes/base_device_node.py | 210 ++++++++++++++++++---- unilabos/ros/nodes/presets/host_node.py | 44 +++-- unilabos/utils/tools.py | 16 ++ 10 files changed, 267 insertions(+), 79 deletions(-) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 546e9594..71cc0cdf 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -233,7 +233,7 @@ def parse_args(): parser.add_argument( "--addr", type=str, - default="https://uni-lab.bohrium.com/api/v1", + default="https://leap-lab.bohrium.com/api/v1", help="Laboratory backend address", ) parser.add_argument( @@ -438,10 +438,10 @@ def main(): if args.addr != parser.get_default("addr"): if args.addr == "test": print_status("使用测试环境地址", "info") - HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1" + HTTPConfig.remote_addr = "https://leap-lab.test.bohrium.com/api/v1" elif args.addr == "uat": print_status("使用uat环境地址", "info") - HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1" + HTTPConfig.remote_addr = "https://leap-lab.uat.bohrium.com/api/v1" elif args.addr == "local": print_status("使用本地环境地址", "info") HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1" @@ -553,7 +553,7 @@ def main(): os._exit(0) if not BasicConfig.ak or not BasicConfig.sk: - print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") + print_status("后续运行必须拥有一个实验室,请前往 https://leap-lab.bohrium.com 注册实验室!", "warning") os._exit(1) graph: nx.Graph resource_tree_set: ResourceTreeSet diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 1dd056ae..7f0c4866 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -36,6 +36,9 @@ class HTTPClient: auth_secret = BasicConfig.auth_secret() self.auth = auth_secret info(f"正在使用ak sk作为授权信息:[{auth_secret}]") + # 复用 TCP/TLS 连接,避免每次请求重新握手 + self._session = requests.Session() + self._session.headers.update({"Authorization": f"Lab {self.auth}"}) info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}") def resource_edge_add(self, resources: List[Dict[str, Any]]) -> requests.Response: @@ -48,7 +51,7 @@ class HTTPClient: Returns: Response: API响应对象 """ - response = requests.post( + response = self._session.post( f"{self.remote_addr}/edge/material/edge", json={ "edges": resources, @@ -75,26 +78,28 @@ class HTTPClient: Returns: Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} """ - with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f: - payload = {"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid} - f.write(json.dumps(payload, indent=4)) - # 从序列化数据中提取所有节点的UUID(保存旧UUID) - old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} + # dump() 只调用一次,复用给文件保存和 HTTP 请求 nodes_info = [x for xs in resources.dump() for x in xs] + old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} + payload = {"nodes": nodes_info, "mount_uuid": mount_uuid} + body_bytes = _fast_dumps(payload) + with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "wb") as f: + f.write(_fast_dumps_pretty(payload)) + http_headers = {"Content-Type": "application/json"} if not self.initialized or first_add: self.initialized = True info(f"首次添加资源,当前远程地址: {self.remote_addr}") - response = requests.post( + response = self._session.post( f"{self.remote_addr}/edge/material", - json={"nodes": nodes_info, "mount_uuid": mount_uuid}, - headers={"Authorization": f"Lab {self.auth}"}, + data=body_bytes, + headers=http_headers, timeout=60, ) else: - response = requests.put( + response = self._session.put( f"{self.remote_addr}/edge/material", - json={"nodes": nodes_info, "mount_uuid": mount_uuid}, - headers={"Authorization": f"Lab {self.auth}"}, + data=body_bytes, + headers=http_headers, timeout=10, ) @@ -133,7 +138,7 @@ class HTTPClient: """ with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_get.json"), "w", encoding="utf-8") as f: f.write(json.dumps({"uuids": uuid_list, "with_children": with_children}, indent=4)) - response = requests.post( + response = self._session.post( f"{self.remote_addr}/edge/material/query", json={"uuids": uuid_list, "with_children": with_children}, headers={"Authorization": f"Lab {self.auth}"}, @@ -164,14 +169,14 @@ class HTTPClient: if not self.initialized: self.initialized = True info(f"首次添加资源,当前远程地址: {self.remote_addr}") - response = requests.post( + response = self._session.post( f"{self.remote_addr}/lab/material", json={"nodes": resources}, headers={"Authorization": f"Lab {self.auth}"}, timeout=100, ) else: - response = requests.put( + response = self._session.put( f"{self.remote_addr}/lab/material", json={"nodes": resources}, headers={"Authorization": f"Lab {self.auth}"}, @@ -198,7 +203,7 @@ class HTTPClient: """ with open(os.path.join(BasicConfig.working_dir, "req_resource_get.json"), "w", encoding="utf-8") as f: f.write(json.dumps({"id": id, "with_children": with_children}, indent=4)) - response = requests.get( + response = self._session.get( f"{self.remote_addr}/lab/material", params={"id": id, "with_children": with_children}, headers={"Authorization": f"Lab {self.auth}"}, @@ -239,14 +244,14 @@ class HTTPClient: if not self.initialized: self.initialized = True info(f"首次添加资源,当前远程地址: {self.remote_addr}") - response = requests.post( + response = self._session.post( f"{self.remote_addr}/lab/material", json={"nodes": resources}, headers={"Authorization": f"Lab {self.auth}"}, timeout=100, ) else: - response = requests.put( + response = self._session.put( f"{self.remote_addr}/lab/material", json={"nodes": resources}, headers={"Authorization": f"Lab {self.auth}"}, @@ -276,7 +281,7 @@ class HTTPClient: with open(file_path, "rb") as file: files = {"files": file} logger.info(f"上传文件: {file_path} 到 {scene}") - response = requests.post( + response = self._session.post( f"{self.remote_addr}/api/account/file_upload/{scene}", files=files, headers={"Authorization": f"Lab {self.auth}"}, @@ -316,7 +321,7 @@ class HTTPClient: "Content-Type": "application/json", "Content-Encoding": "gzip", } - response = requests.post( + response = self._session.post( f"{self.remote_addr}/lab/resource", data=compressed_body, headers=headers, @@ -350,7 +355,7 @@ class HTTPClient: Returns: Response: API响应对象 """ - response = requests.get( + response = self._session.get( f"{self.remote_addr}/edge/material/download", headers={"Authorization": f"Lab {self.auth}"}, timeout=(3, 30), @@ -411,7 +416,7 @@ class HTTPClient: with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f: f.write(json.dumps(payload, indent=4, ensure_ascii=False)) - response = requests.post( + response = self._session.post( f"{self.remote_addr}/lab/workflow/owner/import", json=payload, headers={"Authorization": f"Lab {self.auth}"}, diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 851ae320..4823a232 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -1269,7 +1269,13 @@ class QueueProcessor: if not queued_jobs: return - logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs") + queue_summary = {} + for j in queued_jobs: + key = f"{j.device_id}/{j.action_name}" + queue_summary[key] = queue_summary.get(key, 0) + 1 + logger.debug( + f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs: {queue_summary}" + ) for job_info in queued_jobs: # 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY, diff --git a/unilabos/config/config.py b/unilabos/config/config.py index b80d3b60..d8d000e2 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -46,7 +46,7 @@ class WSConfig: # HTTP配置 class HTTPConfig: - remote_addr = "https://uni-lab.bohrium.com/api/v1" + remote_addr = "https://leap-lab.bohrium.com/api/v1" # ROS配置 diff --git a/unilabos/registry/ast_registry_scanner.py b/unilabos/registry/ast_registry_scanner.py index 80aba3e2..62cd2dbe 100644 --- a/unilabos/registry/ast_registry_scanner.py +++ b/unilabos/registry/ast_registry_scanner.py @@ -825,6 +825,7 @@ def _extract_class_body( action_args.setdefault("placeholder_keys", {}) action_args.setdefault("always_free", False) action_args.setdefault("is_protocol", False) + action_args.setdefault("feedback_interval", 1.0) action_args.setdefault("description", "") action_args.setdefault("auto_prefix", False) action_args.setdefault("parent", False) diff --git a/unilabos/registry/decorators.py b/unilabos/registry/decorators.py index 25a2e57f..606a6a8c 100644 --- a/unilabos/registry/decorators.py +++ b/unilabos/registry/decorators.py @@ -343,6 +343,7 @@ def action( auto_prefix: bool = False, parent: bool = False, node_type: Optional["NodeType"] = None, + feedback_interval: Optional[float] = None, ): """ 动作方法装饰器 @@ -399,6 +400,8 @@ def action( "auto_prefix": auto_prefix, "parent": parent, } + if feedback_interval is not None: + meta["feedback_interval"] = feedback_interval if node_type is not None: meta["node_type"] = node_type.value if isinstance(node_type, NodeType) else str(node_type) wrapper._action_registry_meta = meta # type: ignore[attr-defined] diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 15b1b537..8e8145f7 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -238,6 +238,7 @@ class Registry: "class_name": "unilabos_class", }, "always_free": True, + "feedback_interval": 300.0, }, "test_latency": test_latency_action, "auto-test_resource": test_resource_action, @@ -852,6 +853,8 @@ class Registry: } if (action_args or {}).get("always_free") or method_info.get("always_free"): entry["always_free"] = True + _fb_iv = (action_args or {}).get("feedback_interval", method_info.get("feedback_interval", 1.0)) + entry["feedback_interval"] = _fb_iv nt = normalize_enum_value((action_args or {}).get("node_type"), NodeType) if nt: entry["node_type"] = nt @@ -979,6 +982,8 @@ class Registry: } if action_args.get("always_free") or method_info.get("always_free"): action_entry["always_free"] = True + _fb_iv = action_args.get("feedback_interval", method_info.get("feedback_interval", 1.0)) + action_entry["feedback_interval"] = _fb_iv nt = normalize_enum_value(action_args.get("node_type"), NodeType) if nt: action_entry["node_type"] = nt diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index e249bc0f..b21cbf2c 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -4,6 +4,8 @@ import json import threading import time import traceback + +from unilabos.utils.tools import fast_dumps_str as _fast_dumps_str, fast_loads as _fast_loads from typing import ( get_type_hints, TypeVar, @@ -78,6 +80,67 @@ if TYPE_CHECKING: T = TypeVar("T") +class RclpyAsyncMutex: + """rclpy executor 兼容的异步互斥锁 + + 通过 executor.create_task 唤醒等待者,避免 timer 的 InvalidHandle 问题。 + """ + + def __init__(self, name: str = ""): + self._lock = threading.Lock() + self._acquired = False + self._queue: List[Future] = [] + self._name = name + self._holder: Optional[str] = None + + async def acquire(self, node: "BaseROS2DeviceNode", tag: str = ""): + """获取锁。如果已被占用,则异步等待直到锁释放。""" + # t0 = time.time() + with self._lock: + # qlen = len(self._queue) + if not self._acquired: + self._acquired = True + self._holder = tag + # node.lab_logger().debug( + # f"[Mutex:{self._name}] 获取锁 tag={tag} (无等待, queue=0)" + # ) + return + waiter = Future() + self._queue.append(waiter) + # node.lab_logger().info( + # f"[Mutex:{self._name}] 等待锁 tag={tag} " + # f"(holder={self._holder}, queue={qlen + 1})" + # ) + await waiter + # wait_ms = (time.time() - t0) * 1000 + self._holder = tag + # node.lab_logger().info( + # f"[Mutex:{self._name}] 获取锁 tag={tag} (等了 {wait_ms:.0f}ms)" + # ) + + def release(self, node: "BaseROS2DeviceNode"): + """释放锁,通过 executor task 唤醒下一个等待者。""" + with self._lock: + # old_holder = self._holder + if self._queue: + next_waiter = self._queue.pop(0) + # node.lab_logger().debug( + # f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 唤醒下一个 (剩余 queue={len(self._queue)})" + # ) + + async def _wake(): + if not next_waiter.done(): + next_waiter.set_result(None) + + rclpy.get_global_executor().create_task(_wake()) + else: + self._acquired = False + self._holder = None + # node.lab_logger().debug( + # f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 空闲" + # ) + + # 在线设备注册表 registered_devices: Dict[str, "DeviceInfoType"] = {} @@ -355,6 +418,8 @@ class BaseROS2DeviceNode(Node, Generic[T]): max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}" ) + self._append_resource_lock = RclpyAsyncMutex(name=f"AR:{device_id}") + # 创建资源管理客户端 self._resource_clients: Dict[str, Client] = { "resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group), @@ -378,15 +443,40 @@ class BaseROS2DeviceNode(Node, Generic[T]): return res async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): + _cmd = _fast_loads(req.command) + _res_name = _cmd.get("resource", [{}]) + _res_name = (_res_name[0].get("id", "?") if isinstance(_res_name, list) and _res_name + else _res_name.get("id", "?") if isinstance(_res_name, dict) else "?") + _ar_tag = f"{_res_name}" + # _t_enter = time.time() + # self.lab_logger().info(f"[AR:{_ar_tag}] 进入 append_resource") + await self._append_resource_lock.acquire(self, tag=_ar_tag) + # _t_locked = time.time() + try: + return await _append_resource_inner(req, res, _ar_tag) + # _t_done = time.time() + # self.lab_logger().info( + # f"[AR:{_ar_tag}] 完成 " + # f"等锁={(_t_locked - _t_enter) * 1000:.0f}ms " + # f"执行={(_t_done - _t_locked) * 1000:.0f}ms " + # f"总计={(_t_done - _t_enter) * 1000:.0f}ms" + # ) + except Exception as _ex: + self.lab_logger().error(f"[AR:{_ar_tag}] 异常: {_ex}") + raise + finally: + self._append_resource_lock.release(self) + + async def _append_resource_inner(req: SerialCommand_Request, res: SerialCommand_Response, _ar_tag: str = ""): from pylabrobot.resources.deck import Deck from pylabrobot.resources import Coordinate from pylabrobot.resources import Plate - # 物料传输到对应的node节点 + # _t0 = time.time() client = self._resource_clients["c2s_update_resource_tree"] request = SerialCommand.Request() request2 = SerialCommand.Request() - command_json = json.loads(req.command) + command_json = _fast_loads(req.command) namespace = command_json["namespace"] bind_parent_id = command_json["bind_parent_id"] edge_device_id = command_json["edge_device_id"] @@ -439,7 +529,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) # noinspection PyUnresolvedReferences - request.command = json.dumps( + # _t1 = time.time() + # self.lab_logger().debug( + # f"[AR:{_ar_tag}] 准备完成 PLR转换+序列化 {((_t1 - _t0) * 1000):.0f}ms, 发送首次上传..." + # ) + request.command = _fast_dumps_str( { "action": "add", "data": { @@ -450,7 +544,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): } ) tree_response: SerialCommand.Response = await client.call_async(request) - uuid_maps = json.loads(tree_response.response) + # _t2 = time.time() + # self.lab_logger().debug( + # f"[AR:{_ar_tag}] 首次上传完成 {((_t2 - _t1) * 1000):.0f}ms" + # ) + uuid_maps = _fast_loads(tree_response.response) plr_instances = rts.to_plr_resources() for plr_instance in plr_instances: self.resource_tracker.loop_update_uuid(plr_instance, uuid_maps) @@ -527,12 +625,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): Coordinate(location["x"], location["y"], location["z"]), **other_calling_param, ) - # 调整了液体以及Deck之后要重新Assign # noinspection PyUnresolvedReferences + # _t3 = time.time() rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource]) + # _n_parent = len(rts_with_parent.all_nodes) if rts_with_parent.root_nodes[0].res_content.uuid_parent is None: rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid - request.command = json.dumps( + request.command = _fast_dumps_str( { "action": "add", "data": { @@ -542,11 +641,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): }, } ) + # _t4 = time.time() + # self.lab_logger().debug( + # f"[AR:{_ar_tag}] 二次上传序列化 {_n_parent}节点 {((_t4 - _t3) * 1000):.0f}ms, 发送中..." + # ) tree_response: SerialCommand.Response = await client.call_async(request) - uuid_maps = json.loads(tree_response.response) + # _t5 = time.time() + uuid_maps = _fast_loads(tree_response.response) self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) - self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes") - # 这里created_resources不包含parent_resource + # self._lab_logger.info( + # f"[AR:{_ar_tag}] 二次上传完成 HTTP={(_t5 - _t4) * 1000:.0f}ms " + # f"UUID映射={len(uuid_maps)}节点 总执行={(_t5 - _t0) * 1000:.0f}ms" + # ) # 发送给ResourceMeshManager action_client = ActionClient( self, @@ -1567,37 +1673,69 @@ class BaseROS2DeviceNode(Node, Generic[T]): feedback_msg_types = action_type.Feedback.get_fields_and_field_types() result_msg_types = action_type.Result.get_fields_and_field_types() - while future is not None and not future.done(): - if goal_handle.is_cancel_requested: - self.lab_logger().info(f"取消动作: {action_name}") - future.cancel() # 尝试取消线程池中的任务 - goal_handle.canceled() - return action_type.Result() + # 低频 feedback timer(10s),不阻塞完成检测 + _feedback_timer = None - self._time_spent = time.time() - time_start - self._time_remaining = time_overall - self._time_spent + def _publish_feedback(): + if future is not None and not future.done(): + self._time_spent = time.time() - time_start + self._time_remaining = time_overall - self._time_spent + feedback_values = {} + for msg_name, attr_name in action_value_mapping["feedback"].items(): + if hasattr(self.driver_instance, f"get_{attr_name}"): + method = getattr(self.driver_instance, f"get_{attr_name}") + if not asyncio.iscoroutinefunction(method): + feedback_values[msg_name] = method() + elif hasattr(self.driver_instance, attr_name): + feedback_values[msg_name] = getattr(self.driver_instance, attr_name) + if self._print_publish: + self.lab_logger().info(f"反馈: {feedback_values}") + feedback_msg = convert_to_ros_msg_with_mapping( + ros_msg_type=action_type.Feedback(), + obj=feedback_values, + value_mapping=action_value_mapping["feedback"], + ) + goal_handle.publish_feedback(feedback_msg) - # 发布反馈 - feedback_values = {} - for msg_name, attr_name in action_value_mapping["feedback"].items(): - if hasattr(self.driver_instance, f"get_{attr_name}"): - method = getattr(self.driver_instance, f"get_{attr_name}") - if not asyncio.iscoroutinefunction(method): - feedback_values[msg_name] = method() - elif hasattr(self.driver_instance, attr_name): - feedback_values[msg_name] = getattr(self.driver_instance, attr_name) - - if self._print_publish: - self.lab_logger().info(f"反馈: {feedback_values}") - - feedback_msg = convert_to_ros_msg_with_mapping( - ros_msg_type=action_type.Feedback(), - obj=feedback_values, - value_mapping=action_value_mapping["feedback"], + if action_value_mapping.get("feedback"): + _fb_interval = action_value_mapping.get("feedback_interval", 0.5) + _feedback_timer = self.create_timer( + _fb_interval, _publish_feedback, callback_group=self.callback_group ) - goal_handle.publish_feedback(feedback_msg) - time.sleep(0.5) + # 等待 action 完成 + if future is not None: + if isinstance(future, Task): + # rclpy Task:直接 await,完成瞬间唤醒 + _raw_result = await future + else: + # concurrent.futures.Future(同步 action):用 rclpy 兼容的轮询 + _poll_future = Future() + + def _on_sync_done(fut): + if not _poll_future.done(): + _poll_future.set_result(None) + + future.add_done_callback(_on_sync_done) + await _poll_future + _raw_result = future.result() + + # 确保 execution_error/success 被正确设置(不依赖 done callback 时序) + if isinstance(_raw_result, BaseException): + if not execution_error: + execution_error = traceback.format_exception( + type(_raw_result), _raw_result, _raw_result.__traceback__ + ) + execution_error = "".join(execution_error) + execution_success = False + action_return_value = _raw_result + elif not execution_error: + execution_success = True + action_return_value = _raw_result + + # 清理 feedback timer + if _feedback_timer is not None: + _feedback_timer.cancel() if future is not None and future.cancelled(): self.lab_logger().info(f"动作 {action_name} 已取消") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index e5e212b1..e436a00d 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -4,6 +4,8 @@ import threading import time import traceback import uuid + +from unilabos.utils.tools import fast_dumps_str as _fast_dumps_str, fast_loads as _fast_loads from dataclasses import dataclass, field from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union @@ -618,22 +620,17 @@ class HostNode(BaseROS2DeviceNode): } ) ] - response: List[str] = await self.create_resource_detailed( resources, device_ids, bind_parent_id, bind_location, other_calling_param ) - try: - assert len(response) == 1, "Create Resource应当只返回一个结果" - for i in response: - res = json.loads(i) - if "suc" in res: - raise ValueError(res.get("error")) - return res - except Exception as ex: - pass - _n = "\n" - raise ValueError(f"创建资源时失败!\n{_n.join(response)}") + assert len(response) == 1, "Create Resource应当只返回一个结果" + for i in response: + res = json.loads(i) + if "suc" in res and not res["suc"]: + raise ValueError(res.get("error", "未知错误")) + return res + raise ValueError(f"创建资源时失败!响应为空") def initialize_device(self, device_id: str, device_config: ResourceDictInstance) -> None: """ @@ -1168,7 +1165,7 @@ class HostNode(BaseROS2DeviceNode): else: physical_setup_graph.nodes[resource_dict["id"]]["data"].update(resource_dict.get("data", {})) - response.response = json.dumps(uuid_mapping) if success else "FAILED" + response.response = _fast_dumps_str(uuid_mapping) if success else "FAILED" self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK @@ -1230,9 +1227,26 @@ class HostNode(BaseROS2DeviceNode): """ try: # 解析请求数据 - data = json.loads(request.command) + data = _fast_loads(request.command) action = data["action"] - self.lab_logger().info(f"[Host Node-Resource] Resource tree {action} request received") + inner = data.get("data", {}) + if action == "add": + mount_uuid = inner.get("mount_uuid", "?")[:8] if isinstance(inner, dict) else "?" + tree_data = inner.get("data", []) if isinstance(inner, dict) else inner + node_count = len(tree_data) if isinstance(tree_data, list) else "?" + source = f"mount={mount_uuid}.. nodes≈{node_count}" + elif action in ("get", "remove"): + uid_list = inner.get("data", inner) if isinstance(inner, dict) else inner + source = f"uuids={len(uid_list) if isinstance(uid_list, list) else '?'}" + elif action == "update": + tree_data = inner.get("data", []) if isinstance(inner, dict) else inner + node_count = len(tree_data) if isinstance(tree_data, list) else "?" + source = f"nodes≈{node_count}" + else: + source = "" + self.lab_logger().info( + f"[Host Node-Resource] Resource tree {action} request received ({source})" + ) data = data["data"] if action == "add": await self._resource_tree_action_add_callback(data, response) diff --git a/unilabos/utils/tools.py b/unilabos/utils/tools.py index 3c7b742e..e6719208 100644 --- a/unilabos/utils/tools.py +++ b/unilabos/utils/tools.py @@ -17,6 +17,14 @@ try: default=json_default, ) + def fast_loads(data) -> dict: + """JSON 反序列化,优先使用 orjson。接受 str / bytes。""" + return orjson.loads(data) + + def fast_dumps_str(obj, **kwargs) -> str: + """JSON 序列化为 str,优先使用 orjson。用于需要 str 而非 bytes 的场景(如 ROS msg)。""" + return orjson.dumps(obj, option=orjson.OPT_NON_STR_KEYS, default=json_default).decode("utf-8") + def normalize_json(info: dict) -> dict: """经 JSON 序列化/反序列化一轮来清理非标准类型。""" return orjson.loads(orjson.dumps(info, default=json_default)) @@ -29,6 +37,14 @@ except ImportError: def fast_dumps_pretty(obj, **kwargs) -> bytes: # type: ignore[misc] return json.dumps(obj, indent=2, ensure_ascii=False, cls=TypeEncoder).encode("utf-8") + def fast_loads(data) -> dict: # type: ignore[misc] + if isinstance(data, bytes): + data = data.decode("utf-8") + return json.loads(data) + + def fast_dumps_str(obj, **kwargs) -> str: # type: ignore[misc] + return json.dumps(obj, ensure_ascii=False, cls=TypeEncoder) + def normalize_json(info: dict) -> dict: # type: ignore[misc] return json.loads(json.dumps(info, ensure_ascii=False, cls=TypeEncoder))