修改移液部分代码,如果没有10ul枪头,则不使用10ul移液枪

This commit is contained in:
q434343
2026-04-14 15:35:04 +08:00
parent 7df67ea9f3
commit f22c3f4c42
7 changed files with 339 additions and 246 deletions

View File

@@ -1196,6 +1196,27 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
none_keys=none_keys,
)
@staticmethod
def _tip_rack_is_10ul_range(rack: TipRack) -> bool:
"""判断 tip 盒是否为 10µL 量程(对应右头);优先用孔位上 prototype tip 的 maximal_volume。"""
children = getattr(rack, "children", None) or []
if children:
spot = children[0]
tr = getattr(spot, "tracker", None)
tip = None
if tr is not None:
tip = getattr(tr, "_tip", None) or getattr(tr, "tip", None)
if tip is None:
tip = getattr(spot, "tip", None)
mv = getattr(tip, "maximal_volume", None) if tip is not None else None
if mv is not None:
try:
return float(mv) <= 10.0
except (TypeError, ValueError):
pass
ident = f"{getattr(rack, 'model', '') or ''} {type(rack).__name__}".lower()
return "10ul" in ident
async def transfer_liquid(
self,
sources: Sequence[Container],
@@ -1230,19 +1251,20 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
_asp_list = asp_vols if isinstance(asp_vols, list) else [asp_vols]
_dis_list = dis_vols if isinstance(dis_vols, list) else [dis_vols]
if all(v <= 10.0 for v in _asp_list) and all(v <= 10.0 for v in _dis_list):
use_channels = [1]
mix_vol = max(min(mix_vol,10),0) if mix_vol is not None else None
sources = await self._resolve_to_plr_resources(sources)
targets = await self._resolve_to_plr_resources(targets)
tip_racks = list(await self._resolve_to_plr_resources(tip_racks))
change_slots = []
change_slots.append(sources[0].parent)
change_slots.append(targets[0].parent)
if isinstance(tip_racks[0], TipRack):
tip_rack = tip_racks[0]
else:
tip_rack = tip_racks[0].parent
small_vols = all(v <= 10.0 for v in _asp_list) and all(v <= 10.0 for v in _dis_list)
if small_vols and self._tip_rack_is_10ul_range(tip_rack):
use_channels = [1]
mix_vol = max(min(mix_vol, 10), 0) if mix_vol is not None else None
change_slots = []
change_slots.append(sources[0].parent)
change_slots.append(targets[0].parent)
change_slots.append(tip_rack)

View File

@@ -2,6 +2,7 @@ import json
import time
from copy import deepcopy
from pathlib import Path
from typing import Optional, Sequence
from moveit_msgs.msg import JointConstraint, Constraints
from rclpy.action import ActionClient
@@ -171,173 +172,160 @@ class MoveitInterface:
return True
def pick_and_place(self, command: str):
def pick_and_place(
self,
option: str,
move_group: str,
status: str,
resource: Optional[str] = None,
x_distance: Optional[float] = None,
y_distance: Optional[float] = None,
lift_height: Optional[float] = None,
retry: Optional[int] = None,
speed: Optional[float] = None,
target: Optional[str] = None,
constraints: Optional[Sequence[float]] = None,
) -> None:
"""
Using MoveIt to make the robotic arm pick or place materials to a target point.
使用 MoveIt 完成抓取/放置等序列pick/place/side_pick/side_place
Args:
command: A JSON-formatted string that includes option, target, speed, lift_height, mt_height
*option (string) : Action type: pick/place/side_pick/side_place
*move_group (string): The move group moveit will plan
*status(string) : Target pose
resource(string) : The target resource
x_distance (float) : The distance to the target in x direction(meters)
y_distance (float) : The distance to the target in y direction(meters)
lift_height (float) : The height at which the material should be lifted(meters)
retry (float) : Retry times when moveit plan fails
speed (float) : The speed of the movement, speed > 0
Returns:
None
必选option, move_group, status。
可选resource, x_distance, y_distance, lift_height, retry, speed, target, constraints。
无返回值;失败时提前 return 或打印异常。
"""
result = SendCmd.Result()
try:
cmd_str = str(command).replace("'", '"')
cmd_dict = json.loads(cmd_str)
if option not in self.move_option:
raise ValueError(f"Invalid option: {option}")
if cmd_dict["option"] in self.move_option:
option_index = self.move_option.index(cmd_dict["option"])
place_flag = option_index % 2
option_index = self.move_option.index(option)
place_flag = option_index % 2
config = {}
function_list = []
config: dict = {"move_group": move_group}
if speed is not None:
config["speed"] = speed
if retry is not None:
config["retry"] = retry
status = cmd_dict["status"]
joint_positions_ = self.joint_poses[cmd_dict["move_group"]][status]
function_list = []
joint_positions_ = self.joint_poses[move_group][status]
config.update({k: cmd_dict[k] for k in ["speed", "retry", "move_group"] if k in cmd_dict})
# 夹取 / 放置:绑定 resource 与 parent
if not place_flag:
if target is not None:
function_list.append(lambda r=resource, t=target: self.resource_manager(r, t))
else:
ee = self.moveit2[move_group].end_effector_name
function_list.append(lambda r=resource: self.resource_manager(r, ee))
else:
function_list.append(lambda r=resource: self.resource_manager(r, "world"))
# 夹取
if not place_flag:
if "target" in cmd_dict.keys():
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], cmd_dict["target"]))
else:
function_list.append(
lambda: self.resource_manager(
cmd_dict["resource"], self.moveit2[cmd_dict["move_group"]].end_effector_name
joint_constraint_msgs: list = []
if constraints is not None:
for i, c in enumerate(constraints):
v = float(c)
if v > 0:
joint_constraint_msgs.append(
JointConstraint(
joint_name=self.moveit2[move_group].joint_names[i],
position=joint_positions_[i],
tolerance_above=v,
tolerance_below=v,
weight=1.0,
)
)
else:
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], "world"))
constraints = []
if "constraints" in cmd_dict.keys():
if lift_height is not None:
retval = None
attempts = config.get("retry", 10)
while retval is None and attempts > 0:
retval = self.moveit2[move_group].compute_fk(joint_positions_)
time.sleep(0.1)
attempts -= 1
if retval is None:
raise ValueError("Failed to compute forward kinematics")
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
quaternion = [
retval.pose.orientation.x,
retval.pose.orientation.y,
retval.pose.orientation.z,
retval.pose.orientation.w,
]
for i in range(len(cmd_dict["constraints"])):
v = float(cmd_dict["constraints"][i])
if v > 0:
constraints.append(
JointConstraint(
joint_name=self.moveit2[cmd_dict["move_group"]].joint_names[i],
position=joint_positions_[i],
tolerance_above=v,
tolerance_below=v,
weight=1.0,
)
)
function_list = [
lambda: self.moveit_task(
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
quaternion=quaternion,
**config,
cartesian=self.cartesian_flag,
)
] + function_list
if "lift_height" in cmd_dict.keys():
retval = None
retry = config.get("retry", 10)
while retval is None and retry > 0:
retval = self.moveit2[cmd_dict["move_group"]].compute_fk(joint_positions_)
time.sleep(0.1)
retry -= 1
if retval is None:
result.success = False
return result
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
quaternion = [
retval.pose.orientation.x,
retval.pose.orientation.y,
retval.pose.orientation.z,
retval.pose.orientation.w,
]
pose[2] += float(lift_height)
function_list.append(
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
)
end_pose = list(pose)
if x_distance is not None or y_distance is not None:
if x_distance is not None:
deep_pose = deepcopy(pose)
deep_pose[0] += float(x_distance)
elif y_distance is not None:
deep_pose = deepcopy(pose)
deep_pose[1] += float(y_distance)
function_list = [
lambda: self.moveit_task(
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
quaternion=quaternion,
**config,
cartesian=self.cartesian_flag,
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
] + function_list
pose[2] += float(cmd_dict["lift_height"])
function_list.append(
lambda: self.moveit_task(
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
lambda dp=deep_pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=dp, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
)
end_pose = pose
end_pose = list(deep_pose)
if "x_distance" in cmd_dict.keys() or "y_distance" in cmd_dict.keys():
if "x_distance" in cmd_dict.keys():
deep_pose = deepcopy(pose)
deep_pose[0] += float(cmd_dict["x_distance"])
elif "y_distance" in cmd_dict.keys():
deep_pose = deepcopy(pose)
deep_pose[1] += float(cmd_dict["y_distance"])
retval_ik = None
attempts_ik = config.get("retry", 10)
while retval_ik is None and attempts_ik > 0:
retval_ik = self.moveit2[move_group].compute_ik(
position=end_pose,
quat_xyzw=quaternion,
constraints=Constraints(joint_constraints=joint_constraint_msgs),
)
time.sleep(0.1)
attempts_ik -= 1
if retval_ik is None:
raise ValueError("Failed to compute inverse kinematics")
position_ = [
retval_ik.position[retval_ik.name.index(i)] for i in self.moveit2[move_group].joint_names
]
jn = self.moveit2[move_group].joint_names
function_list = [
lambda pos=position_, names=jn, cfg=config: self.moveit_joint_task(
joint_positions=pos, joint_names=names, **cfg
)
] + function_list
else:
function_list = [lambda cfg=config, jp=joint_positions_: self.moveit_joint_task(**cfg, joint_positions=jp)] + function_list
function_list = [
lambda: self.moveit_task(
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
)
] + function_list
function_list.append(
lambda: self.moveit_task(
position=deep_pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
)
)
end_pose = deep_pose
retval_ik = None
retry = config.get("retry", 10)
while retval_ik is None and retry > 0:
retval_ik = self.moveit2[cmd_dict["move_group"]].compute_ik(
position=end_pose, quat_xyzw=quaternion, constraints=Constraints(joint_constraints=constraints)
)
time.sleep(0.1)
retry -= 1
if retval_ik is None:
result.success = False
return result
position_ = [
retval_ik.position[retval_ik.name.index(i)]
for i in self.moveit2[cmd_dict["move_group"]].joint_names
]
function_list = [
lambda: self.moveit_joint_task(
joint_positions=position_,
joint_names=self.moveit2[cmd_dict["move_group"]].joint_names,
**config,
)
] + function_list
for i in range(len(function_list)):
if i == 0:
self.cartesian_flag = False
else:
function_list = [
lambda: self.moveit_joint_task(**config, joint_positions=joint_positions_)
] + function_list
self.cartesian_flag = True
for i in range(len(function_list)):
if i == 0:
self.cartesian_flag = False
else:
self.cartesian_flag = True
re = function_list[i]()
if not re:
print(i, re)
result.success = False
return result
result.success = True
re = function_list[i]()
if not re:
print(i, re)
raise ValueError(f"Failed to execute moveit task: {i}")
except Exception as e:
print(e)
self.cartesian_flag = False
result.success = False
return result
raise e
def set_status(self, command: str):
"""

View File

@@ -173,48 +173,64 @@ robotic_arm.SCARA_with_slider.moveit.virtual:
type: object
type: UniLabJsonCommand
pick_and_place:
feedback:
status: status
goal:
command: command
feedback: {}
goal: {}
goal_default:
command: ''
constraints: null
lift_height: null
move_group: null
option: null
resource: null
retry: null
speed: null
status: null
target: null
x_distance: null
y_distance: null
handles: {}
placeholder_keys: {}
result:
return_info: return_info
success: success
result: {}
schema:
description: ''
description: pick_and_place 显式参数UniLabJsonCommand
properties:
feedback:
additionalProperties: false
feedback: {}
goal:
properties:
constraints:
items:
type: number
type: array
lift_height:
type: string
move_group:
type: string
option:
type: string
resource:
type: string
retry:
type: string
speed:
type: string
status:
type: string
title: SendCmd_Feedback
type: object
goal:
additionalProperties: false
properties:
command:
target:
type: string
title: SendCmd_Goal
type: object
result:
additionalProperties: false
properties:
return_info:
x_distance:
type: string
success:
type: boolean
title: SendCmd_Result
y_distance:
type: string
required:
- option
- move_group
- status
type: object
result: {}
required:
- goal
title: SendCmd
title: pick_and_place参数
type: object
type: SendCmd
type: UniLabJsonCommand
set_position:
feedback:
status: status

View File

@@ -684,48 +684,64 @@ linear_motion.toyo_xyz.sim:
type: object
type: UniLabJsonCommand
pick_and_place:
feedback:
status: status
goal:
command: command
feedback: {}
goal: {}
goal_default:
command: ''
constraints: null
lift_height: null
move_group: null
option: null
resource: null
retry: null
speed: null
status: null
target: null
x_distance: null
y_distance: null
handles: {}
placeholder_keys: {}
result:
return_info: return_info
success: success
result: {}
schema:
description: ''
description: pick_and_place 显式参数UniLabJsonCommand
properties:
feedback:
additionalProperties: false
feedback: {}
goal:
properties:
constraints:
items:
type: number
type: array
lift_height:
type: string
move_group:
type: string
option:
type: string
resource:
type: string
retry:
type: string
speed:
type: string
status:
type: string
title: SendCmd_Feedback
type: object
goal:
additionalProperties: false
properties:
command:
target:
type: string
title: SendCmd_Goal
type: object
result:
additionalProperties: false
properties:
return_info:
x_distance:
type: string
success:
type: boolean
title: SendCmd_Result
y_distance:
type: string
required:
- option
- move_group
- status
type: object
result: {}
required:
- goal
title: SendCmd
title: pick_and_place参数
type: object
type: SendCmd
type: UniLabJsonCommand
set_position:
feedback:
status: status

View File

@@ -543,7 +543,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}
)
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
_raw_resp = tree_response.response if tree_response else ""
if _raw_resp:
uuid_maps = json.loads(_raw_resp)
else:
uuid_maps = {}
self._lab_logger.warning("Resource tree add 返回空响应,跳过 UUID 映射")
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
# 这里created_resources不包含parent_resource
@@ -1729,10 +1734,25 @@ class BaseROS2DeviceNode(Node, Generic[T]):
try:
function_args[arg_name] = self._convert_resources_sync(resource_data["uuid"])[0]
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
# UUID 在资源树中不存在,尝试从传入的完整 dict 直接构建 PLR 资源
self.lab_logger().warning(
f"UUID查询 {arg_name} 失败,尝试从传入数据直接构建: {e}"
)
raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}")
try:
fallback_tree = ResourceTreeSet.from_raw_dict_list([resource_data])
if len(fallback_tree.trees) == 0:
raise
plr_list = fallback_tree.to_plr_resources()
if not plr_list:
raise
plr_res = plr_list[0]
figured = self.resource_tracker.figure_resource(plr_res, try_mode=True)
function_args[arg_name] = figured[0] if figured else plr_res
except Exception:
self.lab_logger().error(
f"转换ResourceSlot参数 {arg_name} 失败(含回退): {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}")
# 处理 ResourceSlot 列表
elif isinstance(arg_type, tuple) and len(arg_type) == 2:
@@ -1744,10 +1764,25 @@ class BaseROS2DeviceNode(Node, Generic[T]):
uuids = [r["uuid"] for r in resource_list if isinstance(r, dict) and "id" in r]
function_args[arg_name] = self._convert_resources_sync(*uuids) if uuids else []
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
self.lab_logger().warning(
f"UUID查询列表 {arg_name} 失败,尝试从传入数据直接构建: {e}"
)
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
try:
dict_items = [r for r in resource_list if isinstance(r, dict) and "id" in r]
fallback_tree = ResourceTreeSet.from_raw_dict_list(dict_items)
if len(fallback_tree.trees) == 0:
raise
plr_list = fallback_tree.to_plr_resources()
resolved = []
for plr_res in plr_list:
figured = self.resource_tracker.figure_resource(plr_res, try_mode=True)
resolved.append(figured[0] if figured else plr_res)
function_args[arg_name] = resolved
except Exception:
self.lab_logger().error(
f"转换ResourceSlot列表参数 {arg_name} 失败(含回退): {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
# todo: 默认反报送
return function(**function_args)

View File

@@ -28,7 +28,7 @@
"matrix_id": "",
"simulator": false,
"channel_num": 2,
"step_mode": true,
"step_mode": false,
"calibration_points": {
"line_1": [[452.07,21.19], [313.88,21.19], [176.87,21.19], [39.08,21.19]],
"line_2": [[451.37,116.68], [313.28,116.88], [176.58,116.69], [38.58,117.18]],