diff --git a/unilabos/devices/neware_battery_test_system/neware_battery_test_system.py b/unilabos/devices/neware_battery_test_system/neware_battery_test_system.py index 7dc48f4c..015e08c8 100644 --- a/unilabos/devices/neware_battery_test_system/neware_battery_test_system.py +++ b/unilabos/devices/neware_battery_test_system/neware_battery_test_system.py @@ -1934,6 +1934,55 @@ class NewareBatteryTestSystem: return result + def mock_assembly_data(self) -> dict: + """ + 模拟扣电组装站 auto-func_sendbottle_allpack_multi 的输出,返回固定的 2 颗电池 assembly_data。 + 用于在没有真实扣电组装站的情况下,测试 + mock_assembly_data → manual_confirm → battery_transfer_confirm → submit_auto_export_excel + 的完整参数传递与 TCP 下发链路。 + + Returns: + dict: { + "assembly_data": list[dict], # 9 字段 × 2 颗电池 + "success": bool, + "return_info": str, + } + """ + assembly_data = [ + { + "Time": "20260421_143022", + "open_circuit_voltage": 3.721, + "pole_weight": 98.43, + "assembly_time": 120, + "assembly_pressure": 5.2, + "electrolyte_volume": 80.0, + "data_coin_type": 2, + "electrolyte_code": "EL-2026042101", + "coin_cell_code": "CC-2026042101", + }, + { + "Time": "20260421_143255", + "open_circuit_voltage": 3.698, + "pole_weight": 97.85, + "assembly_time": 118, + "assembly_pressure": 5.1, + "electrolyte_volume": 79.5, + "data_coin_type": 2, + "electrolyte_code": "EL-2026042102", + "coin_cell_code": "CC-2026042102", + }, + ] + info = f"mock_assembly_data 返回 {len(assembly_data)} 颗电池的模拟组装数据" + if self._ros_node: + self._ros_node.lab_logger().info(f"[mock_assembly_data] {info}") + else: + print(f"[mock_assembly_data] {info}") + return { + "assembly_data": assembly_data, + "success": True, + "return_info": info, + } + def manual_confirm( self, resource: List[ResourceSlot], @@ -2013,6 +2062,7 @@ class NewareBatteryTestSystem: return { "resource": resource_dump, + "coin_cell_code": coin_cell_code, "target_device": target_device, "mount_resource": mount_resource_dump, "collector_mass": collector_mass, @@ -2202,25 +2252,29 @@ class NewareBatteryTestSystem: async def submit_auto_export_excel( self, - resource: List[ResourceSlot], mount_resource: List[ResourceSlot], collector_mass: List[float], active_material: List[float], capacity: List[float], battery_system: List[str], pole_weight: List[float] = None, + coin_cell_code: List[str] = None, + resource: List[ResourceSlot] = None, ) -> dict: """ 对每颗电池计算测试参数、生成 XML 工步文件并通过 TCP 下发给新威测试仪。 + 循环长度由 mount_resource 驱动(真正要下发的通道数量)。 + Args: - resource: 成品电池资源列表(含 pole_weight 状态,仅当 pole_weight 入参为空时作为回退) - mount_resource: 目标通道资源列表(含 Channel_Name = devid-subdevid-chlid) + mount_resource: 目标通道资源列表(含 Channel_Name = devid-subdevid-chlid),循环长度来源 collector_mass: 各电池集流体质量 (mg) active_material: 各电池活性物质比例(0.97 或 "97%") capacity: 各电池克容量 (mAh/g) battery_system: xml 工步标识(如 "811_LI_002") - pole_weight: 各电池极片质量 (mg),来自上游 manual_confirm 的透传;为 None 时回退到从 resource 状态提取 + pole_weight: 各电池极片质量 (mg),来自上游 manual_confirm 的透传;为空时回退到从 resource 状态提取 + coin_cell_code: 各电池条码(来自上游 manual_confirm 从 assembly_data 解包);作为 Neware 备份文件的 CoinID/barcode + resource: 成品电池资源列表(可选);仅在 coin_cell_code 与 pole_weight 均未提供时作为回退 """ import importlib gen_mod = importlib.import_module( @@ -2228,10 +2282,26 @@ class NewareBatteryTestSystem: ) from .neware_driver import start_test as _start_test - n = len(resource) + resource = resource or [] + pole_weight = pole_weight or [] + coin_cell_code = coin_cell_code or [] + + n = len(mount_resource) if mount_resource else 0 results = [] submitted = 0 + if n == 0: + msg = "mount_resource 为空,没有通道可下发" + if self._ros_node: + self._ros_node.lab_logger().warning(f"[test] {msg}") + return { + "return_info": f"共 0 颗电池,成功下发 0 颗({msg})", + "success": False, + "submitted_count": 0, + "total_count": 0, + "results": [], + } + xml_dir = os.path.join(os.path.dirname(__file__), "xml_recipes") os.makedirs(xml_dir, exist_ok=True) backup_dir = self._last_backup_dir or os.path.join(os.path.dirname(__file__), "backup") @@ -2248,17 +2318,20 @@ class NewareBatteryTestSystem: raise ValueError(f"Channel_Name 格式错误,期望 devid-subdevid-chlid,实际: {ch_name}") devid, subdevid, chlid = int(parts[0]), int(parts[1]), int(parts[2]) - # 2. 获取电池标识与极片重量 - res = resource[i] + # 2. 获取电池标识与极片重量(按优先级 coin_cell_code > resource > 兜底) + res = resource[i] if i < len(resource) else None coin_id = ( - getattr(res, "name", None) + (coin_cell_code[i] if i < len(coin_cell_code) and coin_cell_code[i] else None) + or (getattr(res, "name", None) if res is not None else None) or (res.get("name") if isinstance(res, dict) else None) or f"battery_{i}" ) if pole_weight and i < len(pole_weight): pw = float(pole_weight[i]) - else: + elif res is not None: pw = self._extract_pole_weight(res) + else: + raise ValueError(f"无法获取 pole_weight:pole_weight 列表长度不足 且 resource 为空") # 3. 计算活性物质质量与容量 cm = float(collector_mass[i]) diff --git a/unilabos/devices/neware_battery_test_system/neware_driver.py b/unilabos/devices/neware_battery_test_system/neware_driver.py index f7038e20..a491bc71 100644 --- a/unilabos/devices/neware_battery_test_system/neware_driver.py +++ b/unilabos/devices/neware_battery_test_system/neware_driver.py @@ -5,14 +5,18 @@ def build_start_command(devid, subdevid, chlid, CoinID, ip_in_xml="127.0.0.1", devtype:int=27, recipe_path:str=f"D:\\HHM_test\\A001.xml", - backup_dir:str=f"D:\\HHM_test\\backup") -> str: + backup_dir:str=f"D:\\HHM_test\\backup", + filetype:int=1) -> str: + """ + filetype: 备份文件类型。0=NDA(新威原生),1=Excel。默认 1。 + """ lines = [ '', '', ' start', ' ', f' {recipe_path}', - f' ', + f' ', ' ', '', ] @@ -36,8 +40,11 @@ def recv_until_marks(sock: socket.socket, timeout=60): return bytes(buf) return bytes(buf) -def start_test(ip="127.0.0.1", port=502, devid=3, subdevid=2, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup"): - xml_cmd = build_start_command(devid=devid, subdevid=subdevid, chlid=chlid, CoinID=CoinID, recipe_path=recipe_path, backup_dir=backup_dir) +def start_test(ip="127.0.0.1", port=502, devid=3, subdevid=2, chlid=1, CoinID="A001", recipe_path=f"D:\\HHM_test\\A001.xml", backup_dir=f"D:\\HHM_test\\backup", filetype:int=1): + """ + filetype: 备份文件类型,0=NDA,1=Excel。默认 1。 + """ + xml_cmd = build_start_command(devid=devid, subdevid=subdevid, chlid=chlid, CoinID=CoinID, recipe_path=recipe_path, backup_dir=backup_dir, filetype=filetype) #print(xml_cmd) with socket.create_connection((ip, port), timeout=60) as s: s.sendall(xml_cmd.encode("utf-8")) diff --git a/unilabos/registry/devices/neware_battery_test_system.yaml b/unilabos/registry/devices/neware_battery_test_system.yaml index 6fbbc8aa..e56dae03 100644 --- a/unilabos/registry/devices/neware_battery_test_system.yaml +++ b/unilabos/registry/devices/neware_battery_test_system.yaml @@ -354,6 +354,42 @@ neware_battery_test_system: title: upload_backup_to_oss参数 type: object type: UniLabJsonCommand + mock_assembly_data: + type: UniLabJsonCommand + goal: {} + feedback: {} + result: + assembly_data: assembly_data + success: success + return_info: return_info + schema: + title: mock_assembly_data参数 + description: 模拟扣电组装站 auto-func_sendbottle_allpack_multi 输出固定的 assembly_data(用于测试 neware 完整链路) + type: object + properties: + goal: + type: object + properties: + unilabos_device_id: + type: string + default: '' + description: UniLabOS设备ID,用于指定执行动作的具体设备实例 + required: [] + feedback: {} + result: + type: object + required: + - goal + goal_default: {} + handles: + input: [] + output: + - handler_key: assembly_data_output + data_type: array + label: 扣电组装数据列表(模拟) + data_key: assembly_data + data_source: executor + placeholder_keys: {} manual_confirm: type: UniLabJsonCommand goal: @@ -372,6 +408,7 @@ neware_battery_test_system: feedback: {} result: resource: resource + coin_cell_code: coin_cell_code target_device: target_device mount_resource: mount_resource collector_mass: collector_mass @@ -711,6 +748,11 @@ neware_battery_test_system: label: 极片质量 data_key: pole_weight data_source: executor + - handler_key: coin_cell_code + data_type: array + label: 电池条码列表 + data_key: coin_cell_code + data_source: executor placeholder_keys: resource: unilabos_resources target_device: unilabos_devices @@ -729,6 +771,7 @@ neware_battery_test_system: capacity: capacity battery_system: battery_system pole_weight: pole_weight + coin_cell_code: coin_cell_code feedback: {} result: return_info: return_info @@ -932,8 +975,11 @@ neware_battery_test_system: type: array items: type: number + coin_cell_code: + type: array + items: + type: string required: - - resource - mount_resource - collector_mass - active_material @@ -954,6 +1000,7 @@ neware_battery_test_system: capacity: [] battery_system: [] pole_weight: [] + coin_cell_code: [] handles: input: - handler_key: resource @@ -998,6 +1045,12 @@ neware_battery_test_system: data_key: pole_weight data_source: handle io_type: source + - handler_key: coin_cell_code + data_type: array + label: 电池条码列表 + data_key: coin_cell_code + data_source: handle + io_type: source output: [] placeholder_keys: resource: unilabos_resources