Wire bioyond/coin-cell/neware param passing and add manual-confirm CSV export

- coin_cell_assembly: align battery_info to 9 fields (Time/open_circuit_voltage/pole_weight/assembly_time/assembly_pressure/electrolyte_volume/data_coin_type/electrolyte_code/coin_cell_code); expose assembly_data single array; rename CSV column coin_num -> data_coin_type
- coin_cell_workstation.yaml: add assembly_data_output handle for auto-func_sendbottle_allpack_multi
- neware manual_confirm: accept formulations + assembly_data + csv_export_dir, unpack to parallel lists, export merged CSV to {csv_export_dir}/{date}/date_{date}.csv, output pole_weight for downstream
- neware transfer -> battery_transfer_confirm with manual_confirm node_type, timeout_seconds, assignee_user_ids
- neware test -> submit_auto_export_excel, accept pole_weight input; relabel battery_system as xml工步

Made-with: Cursor
This commit is contained in:
Xie Qiming
2026-04-21 20:01:49 +08:00
parent 52b460466d
commit d1713fcca1
4 changed files with 307 additions and 88 deletions

View File

@@ -16,10 +16,12 @@
import os
import sys
import socket
import csv
import xml.etree.ElementTree as ET
import json
import time
import inspect
from datetime import datetime
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
@@ -1941,34 +1943,191 @@ class NewareBatteryTestSystem:
active_material: List[float],
capacity: List[float],
battery_system: List[str],
timeout_seconds: int,
assignee_user_ids: list[str],
**kwargs
formulations: List[Dict] = None,
assembly_data: List[Dict] = None,
csv_export_dir: str = "D:\\2604Agentic_test",
timeout_seconds: int = 3600,
assignee_user_ids: list[str] = None,
**kwargs,
) -> dict:
"""
timeout_seconds: 超时时间默认3600秒
collector_mass: 极流体质量
active_material: 活性物质含量
capacity: 克容量mAh/g
battery_system: 电池体系
修改的结果无效,是只读的
人工确认节点:
- 上游接收 bioyond 配方formulations+ 扣电组装数据assembly_data 单数组)
- 人工在前端填入 collector_mass / active_material / capacity / battery_system(xml工步)
并选择 target_device 与 mount_resource通道
- 内部把 assembly_data 解包为 9 个并行数组,把 pole_weight 透传给下游 submit_auto_export_excel
- 把所有数据整合后写入 {csv_export_dir}/{YYYYMMDD}/date_{YYYYMMDD}.csv
Args:
timeout_seconds: 超时时间(秒),默认 3600
collector_mass: 极流体质量 (mg)
active_material: 活性物质含量 (0.97 或 "97%")
capacity: 克容量 (mAh/g)
battery_system: xml 工步标识(如 "811_LI_002"
formulations: 配方信息列表(来自 bioyond mass_ratios
assembly_data: 扣电组装数据列表(每颗电池一个 dict
csv_export_dir: 整合 CSV 导出根目录
"""
resource = ResourceTreeSet.from_plr_resources(resource).dump()
mount_resource = ResourceTreeSet.from_plr_resources(mount_resource).dump()
kwargs.update(locals())
kwargs.pop("kwargs")
kwargs.pop("self")
return kwargs
resource_dump = ResourceTreeSet.from_plr_resources(resource).dump()
mount_resource_dump = ResourceTreeSet.from_plr_resources(mount_resource).dump()
assembly_data = assembly_data or []
formulations = formulations or []
Time = [b.get("Time", "") for b in assembly_data]
open_circuit_voltage = [b.get("open_circuit_voltage", 0.0) for b in assembly_data]
pole_weight = [b.get("pole_weight", 0.0) for b in assembly_data]
assembly_time = [b.get("assembly_time", 0) for b in assembly_data]
assembly_pressure = [b.get("assembly_pressure", 0) for b in assembly_data]
electrolyte_volume = [b.get("electrolyte_volume", 0) for b in assembly_data]
data_coin_type = [b.get("data_coin_type", 0) for b in assembly_data]
electrolyte_code = [b.get("electrolyte_code", "") for b in assembly_data]
coin_cell_code = [b.get("coin_cell_code", "") for b in assembly_data]
try:
self._export_manual_confirm_csv(
csv_export_dir=csv_export_dir,
mount_resource=mount_resource,
formulations=formulations,
assembly_rows={
"Time": Time,
"open_circuit_voltage": open_circuit_voltage,
"pole_weight": pole_weight,
"assembly_time": assembly_time,
"assembly_pressure": assembly_pressure,
"electrolyte_volume": electrolyte_volume,
"data_coin_type": data_coin_type,
"electrolyte_code": electrolyte_code,
"coin_cell_code": coin_cell_code,
},
collector_mass=collector_mass,
active_material=active_material,
capacity=capacity,
battery_system=battery_system,
)
except Exception as e:
if self._ros_node:
self._ros_node.lab_logger().warning(f"[manual_confirm] 整合 CSV 导出失败: {e}")
else:
print(f"[manual_confirm] 整合 CSV 导出失败: {e}")
return {
"resource": resource_dump,
"target_device": target_device,
"mount_resource": mount_resource_dump,
"collector_mass": collector_mass,
"active_material": active_material,
"capacity": capacity,
"battery_system": battery_system,
"formulations": formulations,
"assembly_data": assembly_data,
"pole_weight": pole_weight,
}
def _export_manual_confirm_csv(
self,
csv_export_dir: str,
mount_resource: List[ResourceSlot],
formulations: List[Dict],
assembly_rows: Dict[str, List[Any]],
collector_mass: List[float],
active_material: List[float],
capacity: List[float],
battery_system: List[str],
) -> Optional[str]:
"""把 manual_confirm 收集到的全部参数整合写入 CSV。路径{csv_export_dir}/{YYYYMMDD}/date_{YYYYMMDD}.csv"""
n_assembly = len(assembly_rows.get("Time", []))
n_channel = len(mount_resource) if mount_resource else 0
n = max(n_assembly, n_channel, len(collector_mass or []), len(active_material or []),
len(capacity or []), len(battery_system or []))
if n == 0:
return None
date_str = datetime.now().strftime("%Y%m%d")
out_dir = os.path.join(csv_export_dir, date_str)
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, f"date_{date_str}.csv")
header = [
"Time", "open_circuit_voltage", "pole_weight",
"assembly_time", "assembly_pressure", "electrolyte_volume",
"data_coin_type", "electrolyte_code", "coin_cell_code",
"orderName", "prep_bottle_barcode", "vial_bottle_barcodes",
"target_mass_ratio", "real_mass_ratio",
"collector_mass", "active_material", "capacity", "battery_system",
"channel_name",
]
file_exists = os.path.exists(out_path)
with open(out_path, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(header)
def safe_get(lst, i, default=""):
try:
return lst[i] if lst and i < len(lst) else default
except Exception:
return default
for i in range(n):
form = formulations[i] if formulations and i < len(formulations) else {}
target_ratio = form.get("target_mass_ratio", {}) if isinstance(form, dict) else {}
real_ratio = form.get("real_mass_ratio", {}) if isinstance(form, dict) else {}
ch_name = self._extract_channel_name(mount_resource[i]) if mount_resource and i < len(mount_resource) else ""
writer.writerow([
safe_get(assembly_rows["Time"], i),
safe_get(assembly_rows["open_circuit_voltage"], i, 0.0),
safe_get(assembly_rows["pole_weight"], i, 0.0),
safe_get(assembly_rows["assembly_time"], i, 0),
safe_get(assembly_rows["assembly_pressure"], i, 0),
safe_get(assembly_rows["electrolyte_volume"], i, 0),
safe_get(assembly_rows["data_coin_type"], i, 0),
safe_get(assembly_rows["electrolyte_code"], i),
safe_get(assembly_rows["coin_cell_code"], i),
form.get("orderName", "") if isinstance(form, dict) else "",
form.get("prep_bottle_barcode", "") if isinstance(form, dict) else "",
form.get("vial_bottle_barcodes", "") if isinstance(form, dict) else "",
json.dumps(target_ratio, ensure_ascii=False) if target_ratio else "",
json.dumps(real_ratio, ensure_ascii=False) if real_ratio else "",
safe_get(collector_mass, i, ""),
safe_get(active_material, i, ""),
safe_get(capacity, i, ""),
safe_get(battery_system, i, ""),
ch_name or "",
])
f.flush()
if self._ros_node:
self._ros_node.lab_logger().info(f"[manual_confirm] 整合 CSV 已写入 {out_path}{n} 行)")
return out_path
async def transfer(self, resource: List[ResourceSlot], target_device: DeviceSlot, mount_resource: List[ResourceSlot]):
future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True,
async def battery_transfer_confirm(
self,
resource: List[ResourceSlot],
target_device: DeviceSlot,
mount_resource: List[ResourceSlot],
timeout_seconds: int = 3600,
assignee_user_ids: list[str] = None,
**kwargs,
):
"""
电池装夹人工确认 + TCP 转运。
- 该节点通过 yaml 的 node_type: manual_confirm 机制阻塞等待人工确认。
- 人工在前端确认通道与电池对应关系(装夹就位)后,方法体才会被框架调用。
- 方法体执行真正的 TCP 资源转运。
"""
future = ROS2DeviceNode.run_async_func(
self._ros_node.transfer_resource_to_another, True,
**{
"plr_resources": resource,
"target_device_id": target_device,
"target_resources": mount_resource,
"sites": [None] * len(mount_resource),
})
},
)
result = await future
return result
# ──────────────────────────────────────────────
@@ -2043,7 +2202,7 @@ class NewareBatteryTestSystem:
# test 动作:下发测试
# ──────────────────────────────────────────────
async def test(
async def submit_auto_export_excel(
self,
resource: List[ResourceSlot],
mount_resource: List[ResourceSlot],
@@ -2051,17 +2210,19 @@ class NewareBatteryTestSystem:
active_material: List[float],
capacity: List[float],
battery_system: List[str],
pole_weight: List[float] = None,
) -> dict:
"""
对每颗电池计算测试参数、生成 XML 工步文件并通过 TCP 下发给新威测试仪。
Args:
resource: 成品电池资源列表(含 pole_weight 状态)
resource: 成品电池资源列表(含 pole_weight 状态,仅当 pole_weight 入参为空时作为回退
mount_resource: 目标通道资源列表(含 Channel_Name = devid-subdevid-chlid
collector_mass: 各电池集流体质量 (mg)
active_material: 各电池活性物质比例0.97 或 "97%"
capacity: 各电池克容量 (mAh/g)
battery_system: 各电池体系名称(如 "811_LI_002"
battery_system: xml 工步标识(如 "811_LI_002"
pole_weight: 各电池极片质量 (mg),来自上游 manual_confirm 的透传;为 None 时回退到从 resource 状态提取
"""
import importlib
gen_mod = importlib.import_module(
@@ -2096,7 +2257,10 @@ class NewareBatteryTestSystem:
or (res.get("name") if isinstance(res, dict) else None)
or f"battery_{i}"
)
pw = self._extract_pole_weight(res)
if pole_weight and i < len(pole_weight):
pw = float(pole_weight[i])
else:
pw = self._extract_pole_weight(res)
# 3. 计算活性物质质量与容量
cm = float(collector_mass[i])

View File

@@ -1650,6 +1650,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
time_date = datetime.now().strftime("%Y%m%d")
#秒级时间戳用于标记每一行电池数据
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self._last_assembly_timestamp = timestamp
#生成输出文件的变量
self.csv_export_file = os.path.join(file_path, f"date_{time_date}.csv")
#将数据存入csv文件
@@ -1660,7 +1661,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
writer.writerow([
'Time', 'open_circuit_voltage', 'pole_weight',
'assembly_time', 'assembly_pressure', 'electrolyte_volume',
'coin_num', 'electrolyte_code', 'coin_cell_code',
'data_coin_type', 'electrolyte_code', 'coin_cell_code',
'orderName', 'prep_bottle_barcode', 'vial_bottle_barcodes',
'target_mass_ratio', 'real_mass_ratio'
])
@@ -1877,17 +1878,18 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
pole_weight = 0.0
battery_info = {
"battery_index": coin_num_N + 1,
"battery_barcode": battery_qr_code,
"electrolyte_barcode": electrolyte_qr_code,
"Time": getattr(self, "_last_assembly_timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")),
"open_circuit_voltage": open_circuit_voltage,
"pole_weight": pole_weight,
"assembly_time": self.data_assembly_time,
"assembly_pressure": self.data_assembly_pressure,
"electrolyte_volume": self.data_electrolyte_volume
"electrolyte_volume": self.data_electrolyte_volume,
"data_coin_type": getattr(self, "data_coin_type", 0),
"electrolyte_code": electrolyte_qr_code,
"coin_cell_code": battery_qr_code,
}
battery_data_list.append(battery_info)
print(f"已收集第 {coin_num_N + 1} 个电池数据: 电池码={battery_info['battery_barcode']}, 电解液码={battery_info['electrolyte_barcode']}")
print(f"已收集第 {coin_num_N + 1} 个电池数据: 电池码={battery_info['coin_cell_code']}, 电解液码={battery_info['electrolyte_code']}")
time.sleep(1)
# TODO:读完再将电池数加一还是进入循环就将电池数加一需要考虑
@@ -1916,6 +1918,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
"success": True,
"total_batteries": len(battery_data_list),
"batteries": battery_data_list,
"assembly_data": battery_data_list,
"summary": {
"electrolyte_bottles_used": elec_num,
"batteries_per_bottle": elec_use_num,
@@ -2130,17 +2133,18 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
pole_weight = 0.0
battery_info = {
"battery_index": coin_num_N + 1,
"battery_barcode": battery_qr_code,
"electrolyte_barcode": electrolyte_qr_code,
"Time": getattr(self, "_last_assembly_timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")),
"open_circuit_voltage": open_circuit_voltage,
"pole_weight": pole_weight,
"assembly_time": self.data_assembly_time,
"assembly_pressure": self.data_assembly_pressure,
"electrolyte_volume": self.data_electrolyte_volume
"electrolyte_volume": self.data_electrolyte_volume,
"data_coin_type": getattr(self, "data_coin_type", 0),
"electrolyte_code": electrolyte_qr_code,
"coin_cell_code": battery_qr_code,
}
battery_data_list.append(battery_info)
print(f"已收集第 {coin_num_N + 1} 个电池数据: 电池码={battery_info['battery_barcode']}, 电解液码={battery_info['electrolyte_barcode']}")
print(f"已收集第 {coin_num_N + 1} 个电池数据: 电池码={battery_info['coin_cell_code']}, 电解液码={battery_info['electrolyte_code']}")
time.sleep(1)
@@ -2167,6 +2171,7 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
"success": True,
"total_batteries": len(battery_data_list),
"batteries": battery_data_list,
"assembly_data": battery_data_list,
"summary": {
"electrolyte_bottles_used": elec_num,
"batteries_per_bottle": elec_use_num,