Files
Uni-Lab-OS/unilabos/devices/neware_battery_test_system/neware_driver.py
Xie Qiming 79c0815b70 fix(neware): 修复 submit_auto_export_excel 因 resource=[] 导致 0 下发 + filetype kwarg
问题:
- 日志中 submit_auto_export_excel 收到 resource=[](工作流本身不传成品电池资源,
  电池由人工搬运),原代码 n = len(resource) = 0 → 整个循环跳过 →
  "共 0 颗电池,成功下发 0 颗"。
- neware_driver.start_test 原来不接收 filetype kwarg,导致 TypeError 阻塞下发。

修复:
1. submit_auto_export_excel 改为由 mount_resource 驱动循环长度:
   - 新签名以 mount_resource 为主,resource/pole_weight/coin_cell_code 均可选
   - 新增 coin_cell_code 入参,coin_id 优先级 coin_cell_code > resource.name > fallback
   - n==0 时提前返回并给出明确错误信息
2. manual_confirm 的返回值与 YAML handles/output 新增 coin_cell_code
   (从已解包的 assembly_data 直接取)
3. submit_auto_export_excel YAML goal/schema/goal_default/handles.input
   新增 coin_cell_code;required 中移除 resource(不再强制)
4. neware_driver.build_start_command / start_test 增加 filetype:int=1 参数,
   动态嵌入 XML backup 配置,消除 TypeError

Made-with: Cursor
2026-04-22 16:24:35 +08:00

57 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import socket
END_MARKS = [b"\r\n#\r\n", b"</bts>"] # 读到任一标志即可判定完整响应
def build_start_command(devid, subdevid, chlid, CoinID,
ip_in_xml="127.0.0.1",
devtype:int=27,
recipe_path:str=f"D:\\HHM_test\\A001.xml",
backup_dir:str=f"D:\\HHM_test\\backup",
filetype:int=1) -> str:
"""
filetype: 备份文件类型。0=NDA新威原生1=Excel。默认 1。
"""
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<bts version="1.0">',
' <cmd>start</cmd>',
' <list count="1">',
f' <start ip="{ip_in_xml}" devtype="{devtype}" devid="{devid}" subdevid="{subdevid}" chlid="{chlid}" barcode="{CoinID}">{recipe_path}</start>',
f' <backup backupdir="{backup_dir}" remotedir="" filenametype="1" customfilename="" createdirbydate="0" filetype="{int(filetype)}" backupontime="1" backupontimeinterval="1" backupfree="0" />',
' </list>',
'</bts>',
]
# TCP 模式:请求必须以 #\r\n 结束(协议要求)
return "\r\n".join(lines) + "\r\n#\r\n"
def recv_until_marks(sock: socket.socket, timeout=60):
sock.settimeout(timeout) # 上限给足,协议允许到 30s:contentReference[oaicite:2]{index=2}
buf = bytearray()
while True:
chunk = sock.recv(8192)
if not chunk:
break
buf += chunk
# 读到结束标志就停,避免等对端断开
for m in END_MARKS:
if m in buf:
return bytes(buf)
# 保险:读到完整 XML 结束标签也停
if b"</bts>" in buf:
return bytes(buf)
return bytes(buf)
def start_test(ip="127.0.0.1", port=502, devid=3, subdevid=2, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup", filetype:int=1):
"""
filetype: 备份文件类型0=NDA1=Excel。默认 1。
"""
xml_cmd = build_start_command(devid=devid, subdevid=subdevid, chlid=chlid, CoinID=CoinID, recipe_path=recipe_path, backup_dir=backup_dir, filetype=filetype)
#print(xml_cmd)
with socket.create_connection((ip, port), timeout=60) as s:
s.sendall(xml_cmd.encode("utf-8"))
data = recv_until_marks(s, timeout=60)
return data.decode("utf-8", errors="replace")
if __name__ == "__main__":
resp = start_test(ip="127.0.0.1", port=502, devid=4, subdevid=10, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup")
print(resp)