Mirror of https://github.com/deepmodeling/Uni-Lab-OS (synced 2026-04-29 09:52:10 +00:00)
Merge origin/dev into backup/local-0.10.18-20260324
This commit is contained in:
@@ -80,19 +80,20 @@ class HTTPClient:
|
||||
f.write(json.dumps(payload, indent=4))
|
||||
# 从序列化数据中提取所有节点的UUID(保存旧UUID)
|
||||
old_uuids = {n.res_content.uuid: n for n in resources.all_nodes}
|
||||
nodes_info = [x for xs in resources.dump() for x in xs]
|
||||
if not self.initialized or first_add:
|
||||
self.initialized = True
|
||||
info(f"首次添加资源,当前远程地址: {self.remote_addr}")
|
||||
response = requests.post(
|
||||
f"{self.remote_addr}/edge/material",
|
||||
json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid},
|
||||
json={"nodes": nodes_info, "mount_uuid": mount_uuid},
|
||||
headers={"Authorization": f"Lab {self.auth}"},
|
||||
timeout=60,
|
||||
)
|
||||
else:
|
||||
response = requests.put(
|
||||
f"{self.remote_addr}/edge/material",
|
||||
json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid},
|
||||
json={"nodes": nodes_info, "mount_uuid": mount_uuid},
|
||||
headers={"Authorization": f"Lab {self.auth}"},
|
||||
timeout=10,
|
||||
)
|
||||
@@ -111,6 +112,7 @@ class HTTPClient:
|
||||
uuid_mapping[i["uuid"]] = i["cloud_uuid"]
|
||||
else:
|
||||
logger.error(f"添加物料失败: {response.text}")
|
||||
logger.trace(f"添加物料失败: {nodes_info}")
|
||||
for u, n in old_uuids.items():
|
||||
if u in uuid_mapping:
|
||||
n.res_content.uuid = uuid_mapping[u]
|
||||
|
||||
@@ -754,6 +754,32 @@ class MessageProcessor:
|
||||
req = JobAddReq(**data)
|
||||
|
||||
job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action)
|
||||
|
||||
# 服务端对always_free动作可能跳过query_action_state直接发job_start,
|
||||
# 此时job尚未注册,需要自动补注册
|
||||
existing_job = self.device_manager.get_job_info(req.job_id)
|
||||
if not existing_job:
|
||||
action_name = req.action
|
||||
device_action_key = f"/devices/{req.device_id}/{action_name}"
|
||||
action_always_free = self._check_action_always_free(req.device_id, action_name)
|
||||
|
||||
if action_always_free:
|
||||
job_info = JobInfo(
|
||||
job_id=req.job_id,
|
||||
task_id=req.task_id,
|
||||
device_id=req.device_id,
|
||||
action_name=action_name,
|
||||
device_action_key=device_action_key,
|
||||
status=JobStatus.QUEUE,
|
||||
start_time=time.time(),
|
||||
always_free=True,
|
||||
)
|
||||
self.device_manager.add_queue_request(job_info)
|
||||
logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start")
|
||||
else:
|
||||
logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)")
|
||||
return
|
||||
|
||||
success = self.device_manager.start_job(req.job_id)
|
||||
if not success:
|
||||
logger.error(f"[MessageProcessor] Failed to start job {job_log}")
|
||||
@@ -1087,7 +1113,7 @@ class MessageProcessor:
|
||||
"task_id": task_id,
|
||||
"job_id": job_id,
|
||||
"free": free,
|
||||
"need_more": need_more,
|
||||
"need_more": need_more + 1,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1227,7 +1253,7 @@ class QueueProcessor:
|
||||
"task_id": job_info.task_id,
|
||||
"job_id": job_info.job_id,
|
||||
"free": False,
|
||||
"need_more": 10,
|
||||
"need_more": 10 + 1,
|
||||
},
|
||||
}
|
||||
self.message_processor.send_message(message)
|
||||
@@ -1260,7 +1286,7 @@ class QueueProcessor:
|
||||
"task_id": job_info.task_id,
|
||||
"job_id": job_info.job_id,
|
||||
"free": False,
|
||||
"need_more": 10,
|
||||
"need_more": 10 + 1,
|
||||
},
|
||||
}
|
||||
success = self.message_processor.send_message(message)
|
||||
@@ -1343,6 +1369,10 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
self.message_processor = MessageProcessor(self.websocket_url, self.send_queue, self.device_manager)
|
||||
self.queue_processor = QueueProcessor(self.device_manager, self.message_processor)
|
||||
|
||||
# running状态debounce缓存: {job_id: (last_send_timestamp, last_feedback_data)}
|
||||
self._job_running_last_sent: Dict[str, tuple] = {}
|
||||
self._job_running_debounce_interval: float = 10.0 # 秒
|
||||
|
||||
# 设置相互引用
|
||||
self.message_processor.set_queue_processor(self.queue_processor)
|
||||
self.message_processor.set_websocket_client(self)
|
||||
@@ -1442,22 +1472,32 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
logger.debug(f"[WebSocketClient] Not connected, cannot publish job status for job_id: {item.job_id}")
|
||||
return
|
||||
|
||||
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
|
||||
|
||||
# 拦截最终结果状态,与原版本逻辑一致
|
||||
if status in ["success", "failed"]:
|
||||
self._job_running_last_sent.pop(item.job_id, None)
|
||||
|
||||
host_node = HostNode.get_instance(0)
|
||||
if host_node:
|
||||
# 从HostNode的device_action_status中移除job_id
|
||||
try:
|
||||
host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id, None)
|
||||
except (KeyError, AttributeError):
|
||||
logger.warning(f"[WebSocketClient] Failed to remove job {item.job_id} from HostNode status")
|
||||
|
||||
# logger.debug(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}")
|
||||
|
||||
# 通知队列处理器job完成(包括timeout的job)
|
||||
self.queue_processor.handle_job_completed(item.job_id, status)
|
||||
|
||||
# 发送job状态消息
|
||||
# running状态按job_id做debounce,内容变化时仍然上报
|
||||
if status == "running":
|
||||
now = time.time()
|
||||
cached = self._job_running_last_sent.get(item.job_id)
|
||||
if cached is not None:
|
||||
last_ts, last_data = cached
|
||||
if now - last_ts < self._job_running_debounce_interval and last_data == feedback_data:
|
||||
logger.trace(f"[WebSocketClient] Job status debounced (skip): {job_log} - {status}")
|
||||
return
|
||||
self._job_running_last_sent[item.job_id] = (now, feedback_data)
|
||||
|
||||
message = {
|
||||
"action": "job_status",
|
||||
"data": {
|
||||
@@ -1473,7 +1513,6 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
}
|
||||
self.message_processor.send_message(message)
|
||||
|
||||
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
|
||||
logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}")
|
||||
|
||||
def send_ping(self, ping_id: str, timestamp: float) -> None:
|
||||
|
||||
Reference in New Issue
Block a user