mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-04-26 00:59:58 +00:00
修改角度读取方式
This commit is contained in:
266
unilabos/devices/ros_dev/moveit_interface.py
Normal file → Executable file
266
unilabos/devices/ros_dev/moveit_interface.py
Normal file → Executable file
@@ -2,6 +2,7 @@ import json
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from typing import Optional, Sequence
|
||||
|
||||
from moveit_msgs.msg import JointConstraint, Constraints
|
||||
from rclpy.action import ActionClient
|
||||
@@ -171,173 +172,160 @@ class MoveitInterface:
|
||||
|
||||
return True
|
||||
|
||||
def pick_and_place(self, command: str):
|
||||
def pick_and_place(
|
||||
self,
|
||||
option: str,
|
||||
move_group: str,
|
||||
status: str,
|
||||
resource: Optional[str] = None,
|
||||
x_distance: Optional[float] = None,
|
||||
y_distance: Optional[float] = None,
|
||||
lift_height: Optional[float] = None,
|
||||
retry: Optional[int] = None,
|
||||
speed: Optional[float] = None,
|
||||
target: Optional[str] = None,
|
||||
constraints: Optional[Sequence[float]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Using MoveIt to make the robotic arm pick or place materials to a target point.
|
||||
使用 MoveIt 完成抓取/放置等序列(pick/place/side_pick/side_place)。
|
||||
|
||||
Args:
|
||||
command: A JSON-formatted string that includes option, target, speed, lift_height, mt_height
|
||||
|
||||
*option (string) : Action type: pick/place/side_pick/side_place
|
||||
*move_group (string): The move group moveit will plan
|
||||
*status(string) : Target pose
|
||||
resource(string) : The target resource
|
||||
x_distance (float) : The distance to the target in x direction(meters)
|
||||
y_distance (float) : The distance to the target in y direction(meters)
|
||||
lift_height (float) : The height at which the material should be lifted(meters)
|
||||
retry (float) : Retry times when moveit plan fails
|
||||
speed (float) : The speed of the movement, speed > 0
|
||||
Returns:
|
||||
None
|
||||
必选:option, move_group, status。
|
||||
可选:resource, x_distance, y_distance, lift_height, retry, speed, target, constraints。
|
||||
无返回值;失败时提前 return 或打印异常。
|
||||
"""
|
||||
result = SendCmd.Result()
|
||||
|
||||
try:
|
||||
cmd_str = str(command).replace("'", '"')
|
||||
cmd_dict = json.loads(cmd_str)
|
||||
if option not in self.move_option:
|
||||
raise ValueError(f"Invalid option: {option}")
|
||||
|
||||
if cmd_dict["option"] in self.move_option:
|
||||
option_index = self.move_option.index(cmd_dict["option"])
|
||||
place_flag = option_index % 2
|
||||
option_index = self.move_option.index(option)
|
||||
place_flag = option_index % 2
|
||||
|
||||
config = {}
|
||||
function_list = []
|
||||
config: dict = {"move_group": move_group}
|
||||
if speed is not None:
|
||||
config["speed"] = speed
|
||||
if retry is not None:
|
||||
config["retry"] = retry
|
||||
|
||||
status = cmd_dict["status"]
|
||||
joint_positions_ = self.joint_poses[cmd_dict["move_group"]][status]
|
||||
function_list = []
|
||||
joint_positions_ = self.joint_poses[move_group][status]
|
||||
|
||||
config.update({k: cmd_dict[k] for k in ["speed", "retry", "move_group"] if k in cmd_dict})
|
||||
# 夹取 / 放置:绑定 resource 与 parent
|
||||
if not place_flag:
|
||||
if target is not None:
|
||||
function_list.append(lambda r=resource, t=target: self.resource_manager(r, t))
|
||||
else:
|
||||
ee = self.moveit2[move_group].end_effector_name
|
||||
function_list.append(lambda r=resource: self.resource_manager(r, ee))
|
||||
else:
|
||||
function_list.append(lambda r=resource: self.resource_manager(r, "world"))
|
||||
|
||||
# 夹取
|
||||
if not place_flag:
|
||||
if "target" in cmd_dict.keys():
|
||||
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], cmd_dict["target"]))
|
||||
else:
|
||||
function_list.append(
|
||||
lambda: self.resource_manager(
|
||||
cmd_dict["resource"], self.moveit2[cmd_dict["move_group"]].end_effector_name
|
||||
joint_constraint_msgs: list = []
|
||||
if constraints is not None:
|
||||
for i, c in enumerate(constraints):
|
||||
v = float(c)
|
||||
if v > 0:
|
||||
joint_constraint_msgs.append(
|
||||
JointConstraint(
|
||||
joint_name=self.moveit2[move_group].joint_names[i],
|
||||
position=joint_positions_[i],
|
||||
tolerance_above=v,
|
||||
tolerance_below=v,
|
||||
weight=1.0,
|
||||
)
|
||||
)
|
||||
else:
|
||||
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], "world"))
|
||||
|
||||
constraints = []
|
||||
if "constraints" in cmd_dict.keys():
|
||||
if lift_height is not None:
|
||||
retval = None
|
||||
attempts = config.get("retry", 10)
|
||||
while retval is None and attempts > 0:
|
||||
retval = self.moveit2[move_group].compute_fk(joint_positions_)
|
||||
time.sleep(0.1)
|
||||
attempts -= 1
|
||||
if retval is None:
|
||||
raise ValueError("Failed to compute forward kinematics")
|
||||
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
|
||||
quaternion = [
|
||||
retval.pose.orientation.x,
|
||||
retval.pose.orientation.y,
|
||||
retval.pose.orientation.z,
|
||||
retval.pose.orientation.w,
|
||||
]
|
||||
|
||||
for i in range(len(cmd_dict["constraints"])):
|
||||
v = float(cmd_dict["constraints"][i])
|
||||
if v > 0:
|
||||
constraints.append(
|
||||
JointConstraint(
|
||||
joint_name=self.moveit2[cmd_dict["move_group"]].joint_names[i],
|
||||
position=joint_positions_[i],
|
||||
tolerance_above=v,
|
||||
tolerance_below=v,
|
||||
weight=1.0,
|
||||
)
|
||||
)
|
||||
function_list = [
|
||||
lambda: self.moveit_task(
|
||||
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
|
||||
quaternion=quaternion,
|
||||
**config,
|
||||
cartesian=self.cartesian_flag,
|
||||
)
|
||||
] + function_list
|
||||
|
||||
if "lift_height" in cmd_dict.keys():
|
||||
retval = None
|
||||
retry = config.get("retry", 10)
|
||||
while retval is None and retry > 0:
|
||||
retval = self.moveit2[cmd_dict["move_group"]].compute_fk(joint_positions_)
|
||||
time.sleep(0.1)
|
||||
retry -= 1
|
||||
if retval is None:
|
||||
result.success = False
|
||||
return result
|
||||
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
|
||||
quaternion = [
|
||||
retval.pose.orientation.x,
|
||||
retval.pose.orientation.y,
|
||||
retval.pose.orientation.z,
|
||||
retval.pose.orientation.w,
|
||||
]
|
||||
pose[2] += float(lift_height)
|
||||
function_list.append(
|
||||
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
|
||||
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
|
||||
)
|
||||
)
|
||||
end_pose = list(pose)
|
||||
|
||||
if x_distance is not None or y_distance is not None:
|
||||
if x_distance is not None:
|
||||
deep_pose = deepcopy(pose)
|
||||
deep_pose[0] += float(x_distance)
|
||||
elif y_distance is not None:
|
||||
deep_pose = deepcopy(pose)
|
||||
deep_pose[1] += float(y_distance)
|
||||
|
||||
function_list = [
|
||||
lambda: self.moveit_task(
|
||||
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
|
||||
quaternion=quaternion,
|
||||
**config,
|
||||
cartesian=self.cartesian_flag,
|
||||
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
|
||||
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
|
||||
)
|
||||
] + function_list
|
||||
|
||||
pose[2] += float(cmd_dict["lift_height"])
|
||||
function_list.append(
|
||||
lambda: self.moveit_task(
|
||||
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
|
||||
lambda dp=deep_pose.copy(), q=quaternion, cfg=config: self.moveit_task(
|
||||
position=dp, quaternion=q, **cfg, cartesian=self.cartesian_flag
|
||||
)
|
||||
)
|
||||
end_pose = pose
|
||||
end_pose = list(deep_pose)
|
||||
|
||||
if "x_distance" in cmd_dict.keys() or "y_distance" in cmd_dict.keys():
|
||||
if "x_distance" in cmd_dict.keys():
|
||||
deep_pose = deepcopy(pose)
|
||||
deep_pose[0] += float(cmd_dict["x_distance"])
|
||||
elif "y_distance" in cmd_dict.keys():
|
||||
deep_pose = deepcopy(pose)
|
||||
deep_pose[1] += float(cmd_dict["y_distance"])
|
||||
retval_ik = None
|
||||
attempts_ik = config.get("retry", 10)
|
||||
while retval_ik is None and attempts_ik > 0:
|
||||
retval_ik = self.moveit2[move_group].compute_ik(
|
||||
position=end_pose,
|
||||
quat_xyzw=quaternion,
|
||||
constraints=Constraints(joint_constraints=joint_constraint_msgs),
|
||||
)
|
||||
time.sleep(0.1)
|
||||
attempts_ik -= 1
|
||||
if retval_ik is None:
|
||||
raise ValueError("Failed to compute inverse kinematics")
|
||||
position_ = [
|
||||
retval_ik.position[retval_ik.name.index(i)] for i in self.moveit2[move_group].joint_names
|
||||
]
|
||||
jn = self.moveit2[move_group].joint_names
|
||||
function_list = [
|
||||
lambda pos=position_, names=jn, cfg=config: self.moveit_joint_task(
|
||||
joint_positions=pos, joint_names=names, **cfg
|
||||
)
|
||||
] + function_list
|
||||
else:
|
||||
function_list = [lambda cfg=config, jp=joint_positions_: self.moveit_joint_task(**cfg, joint_positions=jp)] + function_list
|
||||
|
||||
function_list = [
|
||||
lambda: self.moveit_task(
|
||||
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
|
||||
)
|
||||
] + function_list
|
||||
function_list.append(
|
||||
lambda: self.moveit_task(
|
||||
position=deep_pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
|
||||
)
|
||||
)
|
||||
end_pose = deep_pose
|
||||
|
||||
retval_ik = None
|
||||
retry = config.get("retry", 10)
|
||||
while retval_ik is None and retry > 0:
|
||||
retval_ik = self.moveit2[cmd_dict["move_group"]].compute_ik(
|
||||
position=end_pose, quat_xyzw=quaternion, constraints=Constraints(joint_constraints=constraints)
|
||||
)
|
||||
time.sleep(0.1)
|
||||
retry -= 1
|
||||
if retval_ik is None:
|
||||
result.success = False
|
||||
return result
|
||||
position_ = [
|
||||
retval_ik.position[retval_ik.name.index(i)]
|
||||
for i in self.moveit2[cmd_dict["move_group"]].joint_names
|
||||
]
|
||||
function_list = [
|
||||
lambda: self.moveit_joint_task(
|
||||
joint_positions=position_,
|
||||
joint_names=self.moveit2[cmd_dict["move_group"]].joint_names,
|
||||
**config,
|
||||
)
|
||||
] + function_list
|
||||
for i in range(len(function_list)):
|
||||
if i == 0:
|
||||
self.cartesian_flag = False
|
||||
else:
|
||||
function_list = [
|
||||
lambda: self.moveit_joint_task(**config, joint_positions=joint_positions_)
|
||||
] + function_list
|
||||
self.cartesian_flag = True
|
||||
|
||||
for i in range(len(function_list)):
|
||||
if i == 0:
|
||||
self.cartesian_flag = False
|
||||
else:
|
||||
self.cartesian_flag = True
|
||||
|
||||
re = function_list[i]()
|
||||
if not re:
|
||||
print(i, re)
|
||||
result.success = False
|
||||
return result
|
||||
result.success = True
|
||||
re = function_list[i]()
|
||||
if not re:
|
||||
print(i, re)
|
||||
raise ValueError(f"Failed to execute moveit task: {i}")
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
self.cartesian_flag = False
|
||||
result.success = False
|
||||
|
||||
return result
|
||||
raise e
|
||||
|
||||
def set_status(self, command: str):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user