From 38bf95b13c40086e4c5383687152cdd9e7627281 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:12:44 +0800 Subject: [PATCH 1/6] re signal host ready event --- unilabos/app/web/api.py | 4 +- unilabos/app/ws_client.py | 92 +++++++++++++++++---------------- unilabos/config/config.py | 2 +- unilabos/resources/container.py | 4 +- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/unilabos/app/web/api.py b/unilabos/app/web/api.py index 0f6077c8..a67d09d2 100644 --- a/unilabos/app/web/api.py +++ b/unilabos/app/web/api.py @@ -1340,5 +1340,5 @@ def setup_api_routes(app): # 启动广播任务 @app.on_event("startup") async def startup_event(): - asyncio.create_task(broadcast_device_status()) - asyncio.create_task(broadcast_status_page_data()) + asyncio.create_task(broadcast_device_status(), name="web-api-startup-device") + asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status") diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 35b4766a..faaa3075 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -469,6 +469,7 @@ class MessageProcessor: open_timeout=20, ping_interval=WSConfig.ping_interval, ping_timeout=10, + close_timeout=5, additional_headers={ "Authorization": f"Lab {BasicConfig.auth_secret()}", "EdgeSession": f"{self.session_id}", @@ -479,42 +480,45 @@ class MessageProcessor: self.connected = True self.reconnect_count = 0 - logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") + logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}") # 启动发送协程 - send_task = asyncio.create_task(self._send_handler()) + send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task") + + # 每次连接(含重连)后重新向服务端注册, + # 否则服务端不知道客户端已上线,不会推送消息。 + if self.websocket_client: + self.websocket_client.publish_host_ready() try: # 接收消息循环 await self._message_handler() finally: + # 必须在 async with __aexit__ 之前停止 send_task, + # 否则 send_task 会在关闭握手期间继续发送数据, + # 干扰 websockets 库的内部清理,导致 task 泄漏。 + self.connected = False send_task.cancel() try: await send_task except asyncio.CancelledError: pass - self.connected = False except websockets.exceptions.ConnectionClosed: - logger.warning("[MessageProcessor] Connection closed") - self.connected = False + logger.warning("[MessageProcessor] 与服务端连接中断") except TimeoutError: logger.warning( - f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), " - f"server may be temporarily unavailable" + f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况" ) - self.connected = False except websockets.exceptions.InvalidStatus as e: logger.warning( - f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, " - f"WebSocket endpoint may not be ready yet" + f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出" ) - self.connected = False except Exception as e: - logger.error(f"[MessageProcessor] Connection error: {str(e)}") logger.error(traceback.format_exc()) - self.connected = False + logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}") finally: + self.connected = False self.websocket = None # 重连逻辑 @@ -522,10 +526,9 @@ class MessageProcessor: break if self.reconnect_count < WSConfig.max_reconnect_attempts: self.reconnect_count += 1 - backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60) + backoff = WSConfig.reconnect_interval logger.info( - f"[MessageProcessor] Reconnecting in {backoff}s " - f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" ) await asyncio.sleep(backoff) else: @@ -533,40 +536,38 @@ class MessageProcessor: break async def _message_handler(self): - """处理接收到的消息""" + """处理接收到的消息。 + + ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler, + 以便 async with websockets.connect() 的 __aexit__ 能感知连接已断, + 正确清理内部 task,避免 task 泄漏。 + """ if not self.websocket: logger.error("[MessageProcessor] WebSocket connection is None") return - try: - async for message in self.websocket: - try: - data = json.loads(message) - message_type = data.get("action", "") - message_data = data.get("data") - if self.session_id and self.session_id == data.get("edge_session"): - await self._process_message(message_type, message_data) + async for message in self.websocket: + try: + data = json.loads(message) + message_type = data.get("action", "") + message_data = data.get("data") + if self.session_id and self.session_id == data.get("edge_session"): + await self._process_message(message_type, message_data) + else: + if message_type.endswith("_material"): + logger.trace( + f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" + ) + logger.debug( + f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" + ) else: - if message_type.endswith("_material"): - logger.trace( - f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" - ) - logger.debug( - f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" - ) - else: - await self._process_message(message_type, message_data) - except json.JSONDecodeError: - logger.error(f"[MessageProcessor] Invalid JSON received: {message}") - except Exception as e: - logger.error(f"[MessageProcessor] Error processing message: {str(e)}") - logger.error(traceback.format_exc()) - - except websockets.exceptions.ConnectionClosed: - logger.info("[MessageProcessor] Message handler stopped - connection closed") - except Exception as e: - logger.error(f"[MessageProcessor] Message handler error: {str(e)}") - logger.error(traceback.format_exc()) + await self._process_message(message_type, message_data) + except json.JSONDecodeError: + logger.error(f"[MessageProcessor] Invalid JSON received: {message}") + except Exception as e: + logger.error(f"[MessageProcessor] Error processing message: {str(e)}") + logger.error(traceback.format_exc()) async def _send_handler(self): """处理发送队列中的消息""" @@ -615,6 +616,7 @@ class MessageProcessor: except asyncio.CancelledError: logger.debug("[MessageProcessor] Send handler cancelled") + raise except Exception as e: logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}") logger.error(traceback.format_exc()) diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 4b7d91a4..d66b399d 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -40,7 +40,7 @@ class BasicConfig: class WSConfig: reconnect_interval = 5 # 重连间隔(秒) max_reconnect_attempts = 999 # 最大重连次数 - ping_interval = 30 # ping间隔(秒) + ping_interval = 20 # ping间隔(秒) # HTTP配置 diff --git a/unilabos/resources/container.py b/unilabos/resources/container.py index ed3871d3..08d40af0 100644 --- a/unilabos/resources/container.py +++ b/unilabos/resources/container.py @@ -12,9 +12,11 @@ class RegularContainer(Container): kwargs["size_y"] = 0 if "size_z" not in kwargs: kwargs["size_z"] = 0 + if "category" not in kwargs: + kwargs["category"] = "container" self.kwargs = kwargs - super().__init__(*args, category="container", **kwargs) + super().__init__(*args, **kwargs) def load_state(self, state: Dict[str, Any]): super().load_state(state) From 95d34562149a2caf629f16d5a952416d9239fa7f Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 15:27:39 +0800 Subject: [PATCH 2/6] add create_resource schema --- unilabos/registry/registry.py | 16 +++++++++++++--- unilabos/ros/nodes/presets/host_node.py | 8 +++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 02d80cca..2a277664 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -97,6 +97,18 @@ class Registry: ) test_resource_schema["description"] = "用于测试物料、设备和样本。" + create_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("create_resource", {}) + create_resource_schema = self._generate_unilab_json_command_schema( + create_resource_method_info.get("args", []), + "create_resource", + create_resource_method_info.get("return_annotation"), + ) + create_resource_schema["description"] = "用于创建物料" + raw_create_resource_schema = ros_action_to_json_schema( + self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。" + ) + raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"] + self.device_type_registry.update( { "host_node": { @@ -140,9 +152,7 @@ class Registry: }, "feedback": {}, "result": {"success": "success"}, - "schema": ros_action_to_json_schema( - self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。" - ), + "schema": raw_create_resource_schema, "goal_default": yaml.safe_load( io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) ), diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 4a868523..dd10bfd1 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -68,6 +68,12 @@ class TestResourceReturn(TypedDict): unilabos_samples: List[LabSample] +class CreateResourceReturn(TypedDict): + created_resource_tree: List[List[ResourceDict]] + liquid_input_resource_tree: List[Dict[str, Any]] + unilabos_samples: List[LabSample] + + class TestLatencyReturn(TypedDict): """test_latency方法的返回值类型""" @@ -556,7 +562,7 @@ class HostNode(BaseROS2DeviceNode): liquid_type: list[str] = [], liquid_volume: list[int] = [], slot_on_deck: str = "", - ): + ) -> CreateResourceReturn: # 暂不支持多对同名父子同时存在 res_creation_input = { "id": res_id.split("/")[-1], From 4e82f62327dfc546fe79f21acbe3bc59e10b23b4 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 15:57:27 +0800 Subject: [PATCH 3/6] fix prcxi check --- unilabos/devices/liquid_handling/prcxi/prcxi.py | 8 ++++---- unilabos/ros/nodes/presets/host_node.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index f34583fe..47b213ad 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -634,7 +634,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): def __init__( self, - deck: Deck, + deck: PRCXI9300Deck, host: str, port: int, timeout: float, @@ -648,11 +648,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract): is_9320=False, ): tablets_info = [] - count = 0 - for child in deck.children: + for site_id in range(len(deck.sites)): + child = deck._get_site_resource(site_id) # 如果放其他类型的物料,是不可以的 if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state: - number = int(child.name.replace("T", "")) + number = site_id + 1 tablets_info.append( WorkTablets( Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"] diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index dd10bfd1..8ab0a624 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -65,13 +65,13 @@ class DeviceActionStatus: class TestResourceReturn(TypedDict): resources: List[List[ResourceDict]] devices: List[Dict[str, Any]] - unilabos_samples: List[LabSample] + # unilabos_samples: List[LabSample] class CreateResourceReturn(TypedDict): created_resource_tree: List[List[ResourceDict]] liquid_input_resource_tree: List[Dict[str, Any]] - unilabos_samples: List[LabSample] + # unilabos_samples: List[LabSample] class TestLatencyReturn(TypedDict): From e5e30a1c7d6735412e184f64806b98a01be8d4ce Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:00:24 +0800 Subject: [PATCH 4/6] ret info fix --- unilabos/registry/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 2a277664..1310c5b7 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -107,7 +107,7 @@ class Registry: raw_create_resource_schema = ros_action_to_json_schema( self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。" ) - raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"] + raw_create_resource_schema["properties"]["result"]["properties"]["return_info"] = create_resource_schema["properties"]["result"] self.device_type_registry.update( { From 3155b2f97e3ba7df9c6a8c7848c21b6954af5bc3 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:04:27 +0800 Subject: [PATCH 5/6] ret info fix revert --- unilabos/registry/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 1310c5b7..2a277664 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -107,7 +107,7 @@ class Registry: raw_create_resource_schema = ros_action_to_json_schema( self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。" ) - raw_create_resource_schema["properties"]["result"]["properties"]["return_info"] = create_resource_schema["properties"]["result"] + raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"] self.device_type_registry.update( { From 6d319d91ffc2629876df5ccd014a076126cf51b3 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:26:37 +0800 Subject: [PATCH 6/6] correct raise create resource error --- unilabos/ros/nodes/base_device_node.py | 2 ++ unilabos/ros/nodes/presets/host_node.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 6ff8cc57..772e667b 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -569,9 +569,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): future.add_done_callback(done_cb) except ImportError: self.lab_logger().error("Host请求添加物料时,本环境并不存在pylabrobot") + res.response = get_result_info_str(traceback.format_exc(), False, {}) except Exception as e: self.lab_logger().error("Host请求添加物料时出错") self.lab_logger().error(traceback.format_exc()) + res.response = get_result_info_str(traceback.format_exc(), False, {}) return res # noinspection PyTypeChecker diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 8ab0a624..aa8b813f 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -615,6 +615,8 @@ class HostNode(BaseROS2DeviceNode): assert len(response) == 1, "Create Resource应当只返回一个结果" for i in response: res = json.loads(i) + if "suc" in res: + raise ValueError(res.get("error")) return res except Exception as ex: pass