mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-04-25 13:49:57 +00:00
change to leap-lab backend. Support feedback interval. Reduce cocurrent lags.
This commit is contained in:
@@ -233,7 +233,7 @@ def parse_args():
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--addr",
|
"--addr",
|
||||||
type=str,
|
type=str,
|
||||||
default="https://uni-lab.bohrium.com/api/v1",
|
default="https://leap-lab.bohrium.com/api/v1",
|
||||||
help="Laboratory backend address",
|
help="Laboratory backend address",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -438,10 +438,10 @@ def main():
|
|||||||
if args.addr != parser.get_default("addr"):
|
if args.addr != parser.get_default("addr"):
|
||||||
if args.addr == "test":
|
if args.addr == "test":
|
||||||
print_status("使用测试环境地址", "info")
|
print_status("使用测试环境地址", "info")
|
||||||
HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1"
|
HTTPConfig.remote_addr = "https://leap-lab.test.bohrium.com/api/v1"
|
||||||
elif args.addr == "uat":
|
elif args.addr == "uat":
|
||||||
print_status("使用uat环境地址", "info")
|
print_status("使用uat环境地址", "info")
|
||||||
HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1"
|
HTTPConfig.remote_addr = "https://leap-lab.uat.bohrium.com/api/v1"
|
||||||
elif args.addr == "local":
|
elif args.addr == "local":
|
||||||
print_status("使用本地环境地址", "info")
|
print_status("使用本地环境地址", "info")
|
||||||
HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
|
HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
|
||||||
@@ -553,7 +553,7 @@ def main():
|
|||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
if not BasicConfig.ak or not BasicConfig.sk:
|
if not BasicConfig.ak or not BasicConfig.sk:
|
||||||
print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning")
|
print_status("后续运行必须拥有一个实验室,请前往 https://leap-lab.bohrium.com 注册实验室!", "warning")
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
graph: nx.Graph
|
graph: nx.Graph
|
||||||
resource_tree_set: ResourceTreeSet
|
resource_tree_set: ResourceTreeSet
|
||||||
|
|||||||
@@ -36,6 +36,9 @@ class HTTPClient:
|
|||||||
auth_secret = BasicConfig.auth_secret()
|
auth_secret = BasicConfig.auth_secret()
|
||||||
self.auth = auth_secret
|
self.auth = auth_secret
|
||||||
info(f"正在使用ak sk作为授权信息:[{auth_secret}]")
|
info(f"正在使用ak sk作为授权信息:[{auth_secret}]")
|
||||||
|
# 复用 TCP/TLS 连接,避免每次请求重新握手
|
||||||
|
self._session = requests.Session()
|
||||||
|
self._session.headers.update({"Authorization": f"Lab {self.auth}"})
|
||||||
info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}")
|
info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}")
|
||||||
|
|
||||||
def resource_edge_add(self, resources: List[Dict[str, Any]]) -> requests.Response:
|
def resource_edge_add(self, resources: List[Dict[str, Any]]) -> requests.Response:
|
||||||
@@ -48,7 +51,7 @@ class HTTPClient:
|
|||||||
Returns:
|
Returns:
|
||||||
Response: API响应对象
|
Response: API响应对象
|
||||||
"""
|
"""
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/edge/material/edge",
|
f"{self.remote_addr}/edge/material/edge",
|
||||||
json={
|
json={
|
||||||
"edges": resources,
|
"edges": resources,
|
||||||
@@ -75,26 +78,28 @@ class HTTPClient:
|
|||||||
Returns:
|
Returns:
|
||||||
Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid}
|
Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid}
|
||||||
"""
|
"""
|
||||||
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f:
|
# dump() 只调用一次,复用给文件保存和 HTTP 请求
|
||||||
payload = {"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}
|
|
||||||
f.write(json.dumps(payload, indent=4))
|
|
||||||
# 从序列化数据中提取所有节点的UUID(保存旧UUID)
|
|
||||||
old_uuids = {n.res_content.uuid: n for n in resources.all_nodes}
|
|
||||||
nodes_info = [x for xs in resources.dump() for x in xs]
|
nodes_info = [x for xs in resources.dump() for x in xs]
|
||||||
|
old_uuids = {n.res_content.uuid: n for n in resources.all_nodes}
|
||||||
|
payload = {"nodes": nodes_info, "mount_uuid": mount_uuid}
|
||||||
|
body_bytes = _fast_dumps(payload)
|
||||||
|
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "wb") as f:
|
||||||
|
f.write(_fast_dumps_pretty(payload))
|
||||||
|
http_headers = {"Content-Type": "application/json"}
|
||||||
if not self.initialized or first_add:
|
if not self.initialized or first_add:
|
||||||
self.initialized = True
|
self.initialized = True
|
||||||
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/edge/material",
|
f"{self.remote_addr}/edge/material",
|
||||||
json={"nodes": nodes_info, "mount_uuid": mount_uuid},
|
data=body_bytes,
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers=http_headers,
|
||||||
timeout=60,
|
timeout=60,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
response = requests.put(
|
response = self._session.put(
|
||||||
f"{self.remote_addr}/edge/material",
|
f"{self.remote_addr}/edge/material",
|
||||||
json={"nodes": nodes_info, "mount_uuid": mount_uuid},
|
data=body_bytes,
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers=http_headers,
|
||||||
timeout=10,
|
timeout=10,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -133,7 +138,7 @@ class HTTPClient:
|
|||||||
"""
|
"""
|
||||||
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_get.json"), "w", encoding="utf-8") as f:
|
with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_get.json"), "w", encoding="utf-8") as f:
|
||||||
f.write(json.dumps({"uuids": uuid_list, "with_children": with_children}, indent=4))
|
f.write(json.dumps({"uuids": uuid_list, "with_children": with_children}, indent=4))
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/edge/material/query",
|
f"{self.remote_addr}/edge/material/query",
|
||||||
json={"uuids": uuid_list, "with_children": with_children},
|
json={"uuids": uuid_list, "with_children": with_children},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
@@ -164,14 +169,14 @@ class HTTPClient:
|
|||||||
if not self.initialized:
|
if not self.initialized:
|
||||||
self.initialized = True
|
self.initialized = True
|
||||||
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/lab/material",
|
f"{self.remote_addr}/lab/material",
|
||||||
json={"nodes": resources},
|
json={"nodes": resources},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
timeout=100,
|
timeout=100,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
response = requests.put(
|
response = self._session.put(
|
||||||
f"{self.remote_addr}/lab/material",
|
f"{self.remote_addr}/lab/material",
|
||||||
json={"nodes": resources},
|
json={"nodes": resources},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
@@ -198,7 +203,7 @@ class HTTPClient:
|
|||||||
"""
|
"""
|
||||||
with open(os.path.join(BasicConfig.working_dir, "req_resource_get.json"), "w", encoding="utf-8") as f:
|
with open(os.path.join(BasicConfig.working_dir, "req_resource_get.json"), "w", encoding="utf-8") as f:
|
||||||
f.write(json.dumps({"id": id, "with_children": with_children}, indent=4))
|
f.write(json.dumps({"id": id, "with_children": with_children}, indent=4))
|
||||||
response = requests.get(
|
response = self._session.get(
|
||||||
f"{self.remote_addr}/lab/material",
|
f"{self.remote_addr}/lab/material",
|
||||||
params={"id": id, "with_children": with_children},
|
params={"id": id, "with_children": with_children},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
@@ -239,14 +244,14 @@ class HTTPClient:
|
|||||||
if not self.initialized:
|
if not self.initialized:
|
||||||
self.initialized = True
|
self.initialized = True
|
||||||
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/lab/material",
|
f"{self.remote_addr}/lab/material",
|
||||||
json={"nodes": resources},
|
json={"nodes": resources},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
timeout=100,
|
timeout=100,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
response = requests.put(
|
response = self._session.put(
|
||||||
f"{self.remote_addr}/lab/material",
|
f"{self.remote_addr}/lab/material",
|
||||||
json={"nodes": resources},
|
json={"nodes": resources},
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
@@ -276,7 +281,7 @@ class HTTPClient:
|
|||||||
with open(file_path, "rb") as file:
|
with open(file_path, "rb") as file:
|
||||||
files = {"files": file}
|
files = {"files": file}
|
||||||
logger.info(f"上传文件: {file_path} 到 {scene}")
|
logger.info(f"上传文件: {file_path} 到 {scene}")
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/api/account/file_upload/{scene}",
|
f"{self.remote_addr}/api/account/file_upload/{scene}",
|
||||||
files=files,
|
files=files,
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
@@ -316,7 +321,7 @@ class HTTPClient:
|
|||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
"Content-Encoding": "gzip",
|
"Content-Encoding": "gzip",
|
||||||
}
|
}
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/lab/resource",
|
f"{self.remote_addr}/lab/resource",
|
||||||
data=compressed_body,
|
data=compressed_body,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
@@ -350,7 +355,7 @@ class HTTPClient:
|
|||||||
Returns:
|
Returns:
|
||||||
Response: API响应对象
|
Response: API响应对象
|
||||||
"""
|
"""
|
||||||
response = requests.get(
|
response = self._session.get(
|
||||||
f"{self.remote_addr}/edge/material/download",
|
f"{self.remote_addr}/edge/material/download",
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
timeout=(3, 30),
|
timeout=(3, 30),
|
||||||
@@ -411,7 +416,7 @@ class HTTPClient:
|
|||||||
with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f:
|
with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f:
|
||||||
f.write(json.dumps(payload, indent=4, ensure_ascii=False))
|
f.write(json.dumps(payload, indent=4, ensure_ascii=False))
|
||||||
|
|
||||||
response = requests.post(
|
response = self._session.post(
|
||||||
f"{self.remote_addr}/lab/workflow/owner/import",
|
f"{self.remote_addr}/lab/workflow/owner/import",
|
||||||
json=payload,
|
json=payload,
|
||||||
headers={"Authorization": f"Lab {self.auth}"},
|
headers={"Authorization": f"Lab {self.auth}"},
|
||||||
|
|||||||
@@ -1269,7 +1269,13 @@ class QueueProcessor:
|
|||||||
if not queued_jobs:
|
if not queued_jobs:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs")
|
queue_summary = {}
|
||||||
|
for j in queued_jobs:
|
||||||
|
key = f"{j.device_id}/{j.action_name}"
|
||||||
|
queue_summary[key] = queue_summary.get(key, 0) + 1
|
||||||
|
logger.debug(
|
||||||
|
f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs: {queue_summary}"
|
||||||
|
)
|
||||||
|
|
||||||
for job_info in queued_jobs:
|
for job_info in queued_jobs:
|
||||||
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY,
|
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY,
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ class WSConfig:
|
|||||||
|
|
||||||
# HTTP配置
|
# HTTP配置
|
||||||
class HTTPConfig:
|
class HTTPConfig:
|
||||||
remote_addr = "https://uni-lab.bohrium.com/api/v1"
|
remote_addr = "https://leap-lab.bohrium.com/api/v1"
|
||||||
|
|
||||||
|
|
||||||
# ROS配置
|
# ROS配置
|
||||||
|
|||||||
@@ -825,6 +825,7 @@ def _extract_class_body(
|
|||||||
action_args.setdefault("placeholder_keys", {})
|
action_args.setdefault("placeholder_keys", {})
|
||||||
action_args.setdefault("always_free", False)
|
action_args.setdefault("always_free", False)
|
||||||
action_args.setdefault("is_protocol", False)
|
action_args.setdefault("is_protocol", False)
|
||||||
|
action_args.setdefault("feedback_interval", 1.0)
|
||||||
action_args.setdefault("description", "")
|
action_args.setdefault("description", "")
|
||||||
action_args.setdefault("auto_prefix", False)
|
action_args.setdefault("auto_prefix", False)
|
||||||
action_args.setdefault("parent", False)
|
action_args.setdefault("parent", False)
|
||||||
|
|||||||
@@ -343,6 +343,7 @@ def action(
|
|||||||
auto_prefix: bool = False,
|
auto_prefix: bool = False,
|
||||||
parent: bool = False,
|
parent: bool = False,
|
||||||
node_type: Optional["NodeType"] = None,
|
node_type: Optional["NodeType"] = None,
|
||||||
|
feedback_interval: Optional[float] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
动作方法装饰器
|
动作方法装饰器
|
||||||
@@ -399,6 +400,8 @@ def action(
|
|||||||
"auto_prefix": auto_prefix,
|
"auto_prefix": auto_prefix,
|
||||||
"parent": parent,
|
"parent": parent,
|
||||||
}
|
}
|
||||||
|
if feedback_interval is not None:
|
||||||
|
meta["feedback_interval"] = feedback_interval
|
||||||
if node_type is not None:
|
if node_type is not None:
|
||||||
meta["node_type"] = node_type.value if isinstance(node_type, NodeType) else str(node_type)
|
meta["node_type"] = node_type.value if isinstance(node_type, NodeType) else str(node_type)
|
||||||
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
|
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
|
||||||
|
|||||||
@@ -238,6 +238,7 @@ class Registry:
|
|||||||
"class_name": "unilabos_class",
|
"class_name": "unilabos_class",
|
||||||
},
|
},
|
||||||
"always_free": True,
|
"always_free": True,
|
||||||
|
"feedback_interval": 300.0,
|
||||||
},
|
},
|
||||||
"test_latency": test_latency_action,
|
"test_latency": test_latency_action,
|
||||||
"auto-test_resource": test_resource_action,
|
"auto-test_resource": test_resource_action,
|
||||||
@@ -852,6 +853,8 @@ class Registry:
|
|||||||
}
|
}
|
||||||
if (action_args or {}).get("always_free") or method_info.get("always_free"):
|
if (action_args or {}).get("always_free") or method_info.get("always_free"):
|
||||||
entry["always_free"] = True
|
entry["always_free"] = True
|
||||||
|
_fb_iv = (action_args or {}).get("feedback_interval", method_info.get("feedback_interval", 1.0))
|
||||||
|
entry["feedback_interval"] = _fb_iv
|
||||||
nt = normalize_enum_value((action_args or {}).get("node_type"), NodeType)
|
nt = normalize_enum_value((action_args or {}).get("node_type"), NodeType)
|
||||||
if nt:
|
if nt:
|
||||||
entry["node_type"] = nt
|
entry["node_type"] = nt
|
||||||
@@ -979,6 +982,8 @@ class Registry:
|
|||||||
}
|
}
|
||||||
if action_args.get("always_free") or method_info.get("always_free"):
|
if action_args.get("always_free") or method_info.get("always_free"):
|
||||||
action_entry["always_free"] = True
|
action_entry["always_free"] = True
|
||||||
|
_fb_iv = action_args.get("feedback_interval", method_info.get("feedback_interval", 1.0))
|
||||||
|
action_entry["feedback_interval"] = _fb_iv
|
||||||
nt = normalize_enum_value(action_args.get("node_type"), NodeType)
|
nt = normalize_enum_value(action_args.get("node_type"), NodeType)
|
||||||
if nt:
|
if nt:
|
||||||
action_entry["node_type"] = nt
|
action_entry["node_type"] = nt
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ import json
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
from unilabos.utils.tools import fast_dumps_str as _fast_dumps_str, fast_loads as _fast_loads
|
||||||
from typing import (
|
from typing import (
|
||||||
get_type_hints,
|
get_type_hints,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
@@ -78,6 +80,67 @@ if TYPE_CHECKING:
|
|||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
class RclpyAsyncMutex:
|
||||||
|
"""rclpy executor 兼容的异步互斥锁
|
||||||
|
|
||||||
|
通过 executor.create_task 唤醒等待者,避免 timer 的 InvalidHandle 问题。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name: str = ""):
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._acquired = False
|
||||||
|
self._queue: List[Future] = []
|
||||||
|
self._name = name
|
||||||
|
self._holder: Optional[str] = None
|
||||||
|
|
||||||
|
async def acquire(self, node: "BaseROS2DeviceNode", tag: str = ""):
|
||||||
|
"""获取锁。如果已被占用,则异步等待直到锁释放。"""
|
||||||
|
# t0 = time.time()
|
||||||
|
with self._lock:
|
||||||
|
# qlen = len(self._queue)
|
||||||
|
if not self._acquired:
|
||||||
|
self._acquired = True
|
||||||
|
self._holder = tag
|
||||||
|
# node.lab_logger().debug(
|
||||||
|
# f"[Mutex:{self._name}] 获取锁 tag={tag} (无等待, queue=0)"
|
||||||
|
# )
|
||||||
|
return
|
||||||
|
waiter = Future()
|
||||||
|
self._queue.append(waiter)
|
||||||
|
# node.lab_logger().info(
|
||||||
|
# f"[Mutex:{self._name}] 等待锁 tag={tag} "
|
||||||
|
# f"(holder={self._holder}, queue={qlen + 1})"
|
||||||
|
# )
|
||||||
|
await waiter
|
||||||
|
# wait_ms = (time.time() - t0) * 1000
|
||||||
|
self._holder = tag
|
||||||
|
# node.lab_logger().info(
|
||||||
|
# f"[Mutex:{self._name}] 获取锁 tag={tag} (等了 {wait_ms:.0f}ms)"
|
||||||
|
# )
|
||||||
|
|
||||||
|
def release(self, node: "BaseROS2DeviceNode"):
|
||||||
|
"""释放锁,通过 executor task 唤醒下一个等待者。"""
|
||||||
|
with self._lock:
|
||||||
|
# old_holder = self._holder
|
||||||
|
if self._queue:
|
||||||
|
next_waiter = self._queue.pop(0)
|
||||||
|
# node.lab_logger().debug(
|
||||||
|
# f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 唤醒下一个 (剩余 queue={len(self._queue)})"
|
||||||
|
# )
|
||||||
|
|
||||||
|
async def _wake():
|
||||||
|
if not next_waiter.done():
|
||||||
|
next_waiter.set_result(None)
|
||||||
|
|
||||||
|
rclpy.get_global_executor().create_task(_wake())
|
||||||
|
else:
|
||||||
|
self._acquired = False
|
||||||
|
self._holder = None
|
||||||
|
# node.lab_logger().debug(
|
||||||
|
# f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 空闲"
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
# 在线设备注册表
|
# 在线设备注册表
|
||||||
registered_devices: Dict[str, "DeviceInfoType"] = {}
|
registered_devices: Dict[str, "DeviceInfoType"] = {}
|
||||||
|
|
||||||
@@ -355,6 +418,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
|
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._append_resource_lock = RclpyAsyncMutex(name=f"AR:{device_id}")
|
||||||
|
|
||||||
# 创建资源管理客户端
|
# 创建资源管理客户端
|
||||||
self._resource_clients: Dict[str, Client] = {
|
self._resource_clients: Dict[str, Client] = {
|
||||||
"resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group),
|
"resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group),
|
||||||
@@ -378,15 +443,40 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
|
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
|
||||||
|
_cmd = _fast_loads(req.command)
|
||||||
|
_res_name = _cmd.get("resource", [{}])
|
||||||
|
_res_name = (_res_name[0].get("id", "?") if isinstance(_res_name, list) and _res_name
|
||||||
|
else _res_name.get("id", "?") if isinstance(_res_name, dict) else "?")
|
||||||
|
_ar_tag = f"{_res_name}"
|
||||||
|
# _t_enter = time.time()
|
||||||
|
# self.lab_logger().info(f"[AR:{_ar_tag}] 进入 append_resource")
|
||||||
|
await self._append_resource_lock.acquire(self, tag=_ar_tag)
|
||||||
|
# _t_locked = time.time()
|
||||||
|
try:
|
||||||
|
return await _append_resource_inner(req, res, _ar_tag)
|
||||||
|
# _t_done = time.time()
|
||||||
|
# self.lab_logger().info(
|
||||||
|
# f"[AR:{_ar_tag}] 完成 "
|
||||||
|
# f"等锁={(_t_locked - _t_enter) * 1000:.0f}ms "
|
||||||
|
# f"执行={(_t_done - _t_locked) * 1000:.0f}ms "
|
||||||
|
# f"总计={(_t_done - _t_enter) * 1000:.0f}ms"
|
||||||
|
# )
|
||||||
|
except Exception as _ex:
|
||||||
|
self.lab_logger().error(f"[AR:{_ar_tag}] 异常: {_ex}")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._append_resource_lock.release(self)
|
||||||
|
|
||||||
|
async def _append_resource_inner(req: SerialCommand_Request, res: SerialCommand_Response, _ar_tag: str = ""):
|
||||||
from pylabrobot.resources.deck import Deck
|
from pylabrobot.resources.deck import Deck
|
||||||
from pylabrobot.resources import Coordinate
|
from pylabrobot.resources import Coordinate
|
||||||
from pylabrobot.resources import Plate
|
from pylabrobot.resources import Plate
|
||||||
|
|
||||||
# 物料传输到对应的node节点
|
# _t0 = time.time()
|
||||||
client = self._resource_clients["c2s_update_resource_tree"]
|
client = self._resource_clients["c2s_update_resource_tree"]
|
||||||
request = SerialCommand.Request()
|
request = SerialCommand.Request()
|
||||||
request2 = SerialCommand.Request()
|
request2 = SerialCommand.Request()
|
||||||
command_json = json.loads(req.command)
|
command_json = _fast_loads(req.command)
|
||||||
namespace = command_json["namespace"]
|
namespace = command_json["namespace"]
|
||||||
bind_parent_id = command_json["bind_parent_id"]
|
bind_parent_id = command_json["bind_parent_id"]
|
||||||
edge_device_id = command_json["edge_device_id"]
|
edge_device_id = command_json["edge_device_id"]
|
||||||
@@ -439,7 +529,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
|
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
|
||||||
)
|
)
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
request.command = json.dumps(
|
# _t1 = time.time()
|
||||||
|
# self.lab_logger().debug(
|
||||||
|
# f"[AR:{_ar_tag}] 准备完成 PLR转换+序列化 {((_t1 - _t0) * 1000):.0f}ms, 发送首次上传..."
|
||||||
|
# )
|
||||||
|
request.command = _fast_dumps_str(
|
||||||
{
|
{
|
||||||
"action": "add",
|
"action": "add",
|
||||||
"data": {
|
"data": {
|
||||||
@@ -450,7 +544,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||||
uuid_maps = json.loads(tree_response.response)
|
# _t2 = time.time()
|
||||||
|
# self.lab_logger().debug(
|
||||||
|
# f"[AR:{_ar_tag}] 首次上传完成 {((_t2 - _t1) * 1000):.0f}ms"
|
||||||
|
# )
|
||||||
|
uuid_maps = _fast_loads(tree_response.response)
|
||||||
plr_instances = rts.to_plr_resources()
|
plr_instances = rts.to_plr_resources()
|
||||||
for plr_instance in plr_instances:
|
for plr_instance in plr_instances:
|
||||||
self.resource_tracker.loop_update_uuid(plr_instance, uuid_maps)
|
self.resource_tracker.loop_update_uuid(plr_instance, uuid_maps)
|
||||||
@@ -527,12 +625,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
Coordinate(location["x"], location["y"], location["z"]),
|
Coordinate(location["x"], location["y"], location["z"]),
|
||||||
**other_calling_param,
|
**other_calling_param,
|
||||||
)
|
)
|
||||||
# 调整了液体以及Deck之后要重新Assign
|
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
|
# _t3 = time.time()
|
||||||
rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource])
|
rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource])
|
||||||
|
# _n_parent = len(rts_with_parent.all_nodes)
|
||||||
if rts_with_parent.root_nodes[0].res_content.uuid_parent is None:
|
if rts_with_parent.root_nodes[0].res_content.uuid_parent is None:
|
||||||
rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid
|
rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid
|
||||||
request.command = json.dumps(
|
request.command = _fast_dumps_str(
|
||||||
{
|
{
|
||||||
"action": "add",
|
"action": "add",
|
||||||
"data": {
|
"data": {
|
||||||
@@ -542,11 +641,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
# _t4 = time.time()
|
||||||
|
# self.lab_logger().debug(
|
||||||
|
# f"[AR:{_ar_tag}] 二次上传序列化 {_n_parent}节点 {((_t4 - _t3) * 1000):.0f}ms, 发送中..."
|
||||||
|
# )
|
||||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||||
uuid_maps = json.loads(tree_response.response)
|
# _t5 = time.time()
|
||||||
|
uuid_maps = _fast_loads(tree_response.response)
|
||||||
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
|
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
|
||||||
self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
|
# self._lab_logger.info(
|
||||||
# 这里created_resources不包含parent_resource
|
# f"[AR:{_ar_tag}] 二次上传完成 HTTP={(_t5 - _t4) * 1000:.0f}ms "
|
||||||
|
# f"UUID映射={len(uuid_maps)}节点 总执行={(_t5 - _t0) * 1000:.0f}ms"
|
||||||
|
# )
|
||||||
# 发送给ResourceMeshManager
|
# 发送给ResourceMeshManager
|
||||||
action_client = ActionClient(
|
action_client = ActionClient(
|
||||||
self,
|
self,
|
||||||
@@ -1567,37 +1673,69 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
feedback_msg_types = action_type.Feedback.get_fields_and_field_types()
|
feedback_msg_types = action_type.Feedback.get_fields_and_field_types()
|
||||||
result_msg_types = action_type.Result.get_fields_and_field_types()
|
result_msg_types = action_type.Result.get_fields_and_field_types()
|
||||||
|
|
||||||
while future is not None and not future.done():
|
# 低频 feedback timer(10s),不阻塞完成检测
|
||||||
if goal_handle.is_cancel_requested:
|
_feedback_timer = None
|
||||||
self.lab_logger().info(f"取消动作: {action_name}")
|
|
||||||
future.cancel() # 尝试取消线程池中的任务
|
|
||||||
goal_handle.canceled()
|
|
||||||
return action_type.Result()
|
|
||||||
|
|
||||||
self._time_spent = time.time() - time_start
|
def _publish_feedback():
|
||||||
self._time_remaining = time_overall - self._time_spent
|
if future is not None and not future.done():
|
||||||
|
self._time_spent = time.time() - time_start
|
||||||
|
self._time_remaining = time_overall - self._time_spent
|
||||||
|
feedback_values = {}
|
||||||
|
for msg_name, attr_name in action_value_mapping["feedback"].items():
|
||||||
|
if hasattr(self.driver_instance, f"get_{attr_name}"):
|
||||||
|
method = getattr(self.driver_instance, f"get_{attr_name}")
|
||||||
|
if not asyncio.iscoroutinefunction(method):
|
||||||
|
feedback_values[msg_name] = method()
|
||||||
|
elif hasattr(self.driver_instance, attr_name):
|
||||||
|
feedback_values[msg_name] = getattr(self.driver_instance, attr_name)
|
||||||
|
if self._print_publish:
|
||||||
|
self.lab_logger().info(f"反馈: {feedback_values}")
|
||||||
|
feedback_msg = convert_to_ros_msg_with_mapping(
|
||||||
|
ros_msg_type=action_type.Feedback(),
|
||||||
|
obj=feedback_values,
|
||||||
|
value_mapping=action_value_mapping["feedback"],
|
||||||
|
)
|
||||||
|
goal_handle.publish_feedback(feedback_msg)
|
||||||
|
|
||||||
# 发布反馈
|
if action_value_mapping.get("feedback"):
|
||||||
feedback_values = {}
|
_fb_interval = action_value_mapping.get("feedback_interval", 0.5)
|
||||||
for msg_name, attr_name in action_value_mapping["feedback"].items():
|
_feedback_timer = self.create_timer(
|
||||||
if hasattr(self.driver_instance, f"get_{attr_name}"):
|
_fb_interval, _publish_feedback, callback_group=self.callback_group
|
||||||
method = getattr(self.driver_instance, f"get_{attr_name}")
|
|
||||||
if not asyncio.iscoroutinefunction(method):
|
|
||||||
feedback_values[msg_name] = method()
|
|
||||||
elif hasattr(self.driver_instance, attr_name):
|
|
||||||
feedback_values[msg_name] = getattr(self.driver_instance, attr_name)
|
|
||||||
|
|
||||||
if self._print_publish:
|
|
||||||
self.lab_logger().info(f"反馈: {feedback_values}")
|
|
||||||
|
|
||||||
feedback_msg = convert_to_ros_msg_with_mapping(
|
|
||||||
ros_msg_type=action_type.Feedback(),
|
|
||||||
obj=feedback_values,
|
|
||||||
value_mapping=action_value_mapping["feedback"],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
goal_handle.publish_feedback(feedback_msg)
|
# 等待 action 完成
|
||||||
time.sleep(0.5)
|
if future is not None:
|
||||||
|
if isinstance(future, Task):
|
||||||
|
# rclpy Task:直接 await,完成瞬间唤醒
|
||||||
|
_raw_result = await future
|
||||||
|
else:
|
||||||
|
# concurrent.futures.Future(同步 action):用 rclpy 兼容的轮询
|
||||||
|
_poll_future = Future()
|
||||||
|
|
||||||
|
def _on_sync_done(fut):
|
||||||
|
if not _poll_future.done():
|
||||||
|
_poll_future.set_result(None)
|
||||||
|
|
||||||
|
future.add_done_callback(_on_sync_done)
|
||||||
|
await _poll_future
|
||||||
|
_raw_result = future.result()
|
||||||
|
|
||||||
|
# 确保 execution_error/success 被正确设置(不依赖 done callback 时序)
|
||||||
|
if isinstance(_raw_result, BaseException):
|
||||||
|
if not execution_error:
|
||||||
|
execution_error = traceback.format_exception(
|
||||||
|
type(_raw_result), _raw_result, _raw_result.__traceback__
|
||||||
|
)
|
||||||
|
execution_error = "".join(execution_error)
|
||||||
|
execution_success = False
|
||||||
|
action_return_value = _raw_result
|
||||||
|
elif not execution_error:
|
||||||
|
execution_success = True
|
||||||
|
action_return_value = _raw_result
|
||||||
|
|
||||||
|
# 清理 feedback timer
|
||||||
|
if _feedback_timer is not None:
|
||||||
|
_feedback_timer.cancel()
|
||||||
|
|
||||||
if future is not None and future.cancelled():
|
if future is not None and future.cancelled():
|
||||||
self.lab_logger().info(f"动作 {action_name} 已取消")
|
self.lab_logger().info(f"动作 {action_name} 已取消")
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ import threading
|
|||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from unilabos.utils.tools import fast_dumps_str as _fast_dumps_str, fast_loads as _fast_loads
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
|
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
|
||||||
|
|
||||||
@@ -618,22 +620,17 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
response: List[str] = await self.create_resource_detailed(
|
response: List[str] = await self.create_resource_detailed(
|
||||||
resources, device_ids, bind_parent_id, bind_location, other_calling_param
|
resources, device_ids, bind_parent_id, bind_location, other_calling_param
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
assert len(response) == 1, "Create Resource应当只返回一个结果"
|
||||||
assert len(response) == 1, "Create Resource应当只返回一个结果"
|
for i in response:
|
||||||
for i in response:
|
res = json.loads(i)
|
||||||
res = json.loads(i)
|
if "suc" in res and not res["suc"]:
|
||||||
if "suc" in res:
|
raise ValueError(res.get("error", "未知错误"))
|
||||||
raise ValueError(res.get("error"))
|
return res
|
||||||
return res
|
raise ValueError(f"创建资源时失败!响应为空")
|
||||||
except Exception as ex:
|
|
||||||
pass
|
|
||||||
_n = "\n"
|
|
||||||
raise ValueError(f"创建资源时失败!\n{_n.join(response)}")
|
|
||||||
|
|
||||||
def initialize_device(self, device_id: str, device_config: ResourceDictInstance) -> None:
|
def initialize_device(self, device_id: str, device_config: ResourceDictInstance) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -1168,7 +1165,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
else:
|
else:
|
||||||
physical_setup_graph.nodes[resource_dict["id"]]["data"].update(resource_dict.get("data", {}))
|
physical_setup_graph.nodes[resource_dict["id"]]["data"].update(resource_dict.get("data", {}))
|
||||||
|
|
||||||
response.response = json.dumps(uuid_mapping) if success else "FAILED"
|
response.response = _fast_dumps_str(uuid_mapping) if success else "FAILED"
|
||||||
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
|
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
|
||||||
|
|
||||||
async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK
|
async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK
|
||||||
@@ -1230,9 +1227,26 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# 解析请求数据
|
# 解析请求数据
|
||||||
data = json.loads(request.command)
|
data = _fast_loads(request.command)
|
||||||
action = data["action"]
|
action = data["action"]
|
||||||
self.lab_logger().info(f"[Host Node-Resource] Resource tree {action} request received")
|
inner = data.get("data", {})
|
||||||
|
if action == "add":
|
||||||
|
mount_uuid = inner.get("mount_uuid", "?")[:8] if isinstance(inner, dict) else "?"
|
||||||
|
tree_data = inner.get("data", []) if isinstance(inner, dict) else inner
|
||||||
|
node_count = len(tree_data) if isinstance(tree_data, list) else "?"
|
||||||
|
source = f"mount={mount_uuid}.. nodes≈{node_count}"
|
||||||
|
elif action in ("get", "remove"):
|
||||||
|
uid_list = inner.get("data", inner) if isinstance(inner, dict) else inner
|
||||||
|
source = f"uuids={len(uid_list) if isinstance(uid_list, list) else '?'}"
|
||||||
|
elif action == "update":
|
||||||
|
tree_data = inner.get("data", []) if isinstance(inner, dict) else inner
|
||||||
|
node_count = len(tree_data) if isinstance(tree_data, list) else "?"
|
||||||
|
source = f"nodes≈{node_count}"
|
||||||
|
else:
|
||||||
|
source = ""
|
||||||
|
self.lab_logger().info(
|
||||||
|
f"[Host Node-Resource] Resource tree {action} request received ({source})"
|
||||||
|
)
|
||||||
data = data["data"]
|
data = data["data"]
|
||||||
if action == "add":
|
if action == "add":
|
||||||
await self._resource_tree_action_add_callback(data, response)
|
await self._resource_tree_action_add_callback(data, response)
|
||||||
|
|||||||
@@ -17,6 +17,14 @@ try:
|
|||||||
default=json_default,
|
default=json_default,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def fast_loads(data) -> dict:
|
||||||
|
"""JSON 反序列化,优先使用 orjson。接受 str / bytes。"""
|
||||||
|
return orjson.loads(data)
|
||||||
|
|
||||||
|
def fast_dumps_str(obj, **kwargs) -> str:
|
||||||
|
"""JSON 序列化为 str,优先使用 orjson。用于需要 str 而非 bytes 的场景(如 ROS msg)。"""
|
||||||
|
return orjson.dumps(obj, option=orjson.OPT_NON_STR_KEYS, default=json_default).decode("utf-8")
|
||||||
|
|
||||||
def normalize_json(info: dict) -> dict:
|
def normalize_json(info: dict) -> dict:
|
||||||
"""经 JSON 序列化/反序列化一轮来清理非标准类型。"""
|
"""经 JSON 序列化/反序列化一轮来清理非标准类型。"""
|
||||||
return orjson.loads(orjson.dumps(info, default=json_default))
|
return orjson.loads(orjson.dumps(info, default=json_default))
|
||||||
@@ -29,6 +37,14 @@ except ImportError:
|
|||||||
def fast_dumps_pretty(obj, **kwargs) -> bytes: # type: ignore[misc]
|
def fast_dumps_pretty(obj, **kwargs) -> bytes: # type: ignore[misc]
|
||||||
return json.dumps(obj, indent=2, ensure_ascii=False, cls=TypeEncoder).encode("utf-8")
|
return json.dumps(obj, indent=2, ensure_ascii=False, cls=TypeEncoder).encode("utf-8")
|
||||||
|
|
||||||
|
def fast_loads(data) -> dict: # type: ignore[misc]
|
||||||
|
if isinstance(data, bytes):
|
||||||
|
data = data.decode("utf-8")
|
||||||
|
return json.loads(data)
|
||||||
|
|
||||||
|
def fast_dumps_str(obj, **kwargs) -> str: # type: ignore[misc]
|
||||||
|
return json.dumps(obj, ensure_ascii=False, cls=TypeEncoder)
|
||||||
|
|
||||||
def normalize_json(info: dict) -> dict: # type: ignore[misc]
|
def normalize_json(info: dict) -> dict: # type: ignore[misc]
|
||||||
return json.loads(json.dumps(info, ensure_ascii=False, cls=TypeEncoder))
|
return json.loads(json.dumps(info, ensure_ascii=False, cls=TypeEncoder))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user