Compare commits

...

1 Commits

Author SHA1 Message Date
zhangshixiang
a45db46cf1 修改角度读取方式 2026-04-14 15:58:37 +08:00
8 changed files with 254 additions and 201 deletions

View File

@@ -132,10 +132,7 @@ class ResourceVisualization:
new_dev.set("x",str(float(node["position"]["position"]["x"])/1000))
new_dev.set("y",str(float(node["position"]["position"]["y"])/1000))
new_dev.set("z",str(float(node["position"]["position"]["z"])/1000))
if "rotation" in node["config"]:
new_dev.set("rx",str(float(node["config"]["rotation"]["x"])))
new_dev.set("ry",str(float(node["config"]["rotation"]["y"])))
new_dev.set("r",str(float(node["config"]["rotation"]["z"])))
if "pose" in node:
new_dev.set("x",str(float(node["pose"]["position"]["x"])/1000))
new_dev.set("y",str(float(node["pose"]["position"]["y"])/1000))
@@ -143,6 +140,10 @@ class ResourceVisualization:
new_dev.set("rx",str(float(node["pose"]["rotation"]["x"])))
new_dev.set("ry",str(float(node["pose"]["rotation"]["y"])))
new_dev.set("r",str(float(node["pose"]["rotation"]["z"])))
if "rotation" in node["config"]:
new_dev.set("rx",str(float(node["config"]["rotation"]["x"])))
new_dev.set("ry",str(float(node["config"]["rotation"]["y"])))
new_dev.set("r",str(float(node["config"]["rotation"]["z"])))
if "device_config" in node["config"]:
for key, value in node["config"]["device_config"].items():
new_dev.set(key, str(value))

View File

@@ -1,5 +1,31 @@
arm_slider_arm_controller:
ros__parameters:
command_interfaces:
- position
joints:
- arm_slider_arm_base_joint
- arm_slider_arm_link_1_joint
- arm_slider_arm_link_2_joint
- arm_slider_arm_link_3_joint
- arm_slider_gripper_base_joint
state_interfaces:
- position
- velocity
arm_slider_gripper_controller:
ros__parameters:
command_interfaces:
- position
joints:
- arm_slider_gripper_right_joint
state_interfaces:
- position
- velocity
controller_manager:
ros__parameters:
arm_slider_arm_controller:
type: joint_trajectory_controller/JointTrajectoryController
arm_slider_gripper_controller:
type: joint_trajectory_controller/JointTrajectoryController
joint_state_broadcaster:
type: joint_state_broadcaster/JointStateBroadcaster
update_rate: 100

266
unilabos/devices/ros_dev/moveit_interface.py Normal file → Executable file
View File

@@ -2,6 +2,7 @@ import json
import time
from copy import deepcopy
from pathlib import Path
from typing import Optional, Sequence
from moveit_msgs.msg import JointConstraint, Constraints
from rclpy.action import ActionClient
@@ -171,173 +172,160 @@ class MoveitInterface:
return True
def pick_and_place(self, command: str):
def pick_and_place(
self,
option: str,
move_group: str,
status: str,
resource: Optional[str] = None,
x_distance: Optional[float] = None,
y_distance: Optional[float] = None,
lift_height: Optional[float] = None,
retry: Optional[int] = None,
speed: Optional[float] = None,
target: Optional[str] = None,
constraints: Optional[Sequence[float]] = None,
) -> None:
"""
Using MoveIt to make the robotic arm pick or place materials to a target point.
使用 MoveIt 完成抓取/放置等序列pick/place/side_pick/side_place
Args:
command: A JSON-formatted string that includes option, target, speed, lift_height, mt_height
*option (string) : Action type: pick/place/side_pick/side_place
*move_group (string): The move group moveit will plan
*status(string) : Target pose
resource(string) : The target resource
x_distance (float) : The distance to the target in x direction(meters)
y_distance (float) : The distance to the target in y direction(meters)
lift_height (float) : The height at which the material should be lifted(meters)
retry (float) : Retry times when moveit plan fails
speed (float) : The speed of the movement, speed > 0
Returns:
None
必选option, move_group, status。
可选resource, x_distance, y_distance, lift_height, retry, speed, target, constraints。
无返回值;失败时提前 return 或打印异常。
"""
result = SendCmd.Result()
try:
cmd_str = str(command).replace("'", '"')
cmd_dict = json.loads(cmd_str)
if option not in self.move_option:
raise ValueError(f"Invalid option: {option}")
if cmd_dict["option"] in self.move_option:
option_index = self.move_option.index(cmd_dict["option"])
place_flag = option_index % 2
option_index = self.move_option.index(option)
place_flag = option_index % 2
config = {}
function_list = []
config: dict = {"move_group": move_group}
if speed is not None:
config["speed"] = speed
if retry is not None:
config["retry"] = retry
status = cmd_dict["status"]
joint_positions_ = self.joint_poses[cmd_dict["move_group"]][status]
function_list = []
joint_positions_ = self.joint_poses[move_group][status]
config.update({k: cmd_dict[k] for k in ["speed", "retry", "move_group"] if k in cmd_dict})
# 夹取 / 放置:绑定 resource 与 parent
if not place_flag:
if target is not None:
function_list.append(lambda r=resource, t=target: self.resource_manager(r, t))
else:
ee = self.moveit2[move_group].end_effector_name
function_list.append(lambda r=resource: self.resource_manager(r, ee))
else:
function_list.append(lambda r=resource: self.resource_manager(r, "world"))
# 夹取
if not place_flag:
if "target" in cmd_dict.keys():
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], cmd_dict["target"]))
else:
function_list.append(
lambda: self.resource_manager(
cmd_dict["resource"], self.moveit2[cmd_dict["move_group"]].end_effector_name
joint_constraint_msgs: list = []
if constraints is not None:
for i, c in enumerate(constraints):
v = float(c)
if v > 0:
joint_constraint_msgs.append(
JointConstraint(
joint_name=self.moveit2[move_group].joint_names[i],
position=joint_positions_[i],
tolerance_above=v,
tolerance_below=v,
weight=1.0,
)
)
else:
function_list.append(lambda: self.resource_manager(cmd_dict["resource"], "world"))
constraints = []
if "constraints" in cmd_dict.keys():
if lift_height is not None:
retval = None
attempts = config.get("retry", 10)
while retval is None and attempts > 0:
retval = self.moveit2[move_group].compute_fk(joint_positions_)
time.sleep(0.1)
attempts -= 1
if retval is None:
raise ValueError("Failed to compute forward kinematics")
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
quaternion = [
retval.pose.orientation.x,
retval.pose.orientation.y,
retval.pose.orientation.z,
retval.pose.orientation.w,
]
for i in range(len(cmd_dict["constraints"])):
v = float(cmd_dict["constraints"][i])
if v > 0:
constraints.append(
JointConstraint(
joint_name=self.moveit2[cmd_dict["move_group"]].joint_names[i],
position=joint_positions_[i],
tolerance_above=v,
tolerance_below=v,
weight=1.0,
)
)
function_list = [
lambda: self.moveit_task(
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
quaternion=quaternion,
**config,
cartesian=self.cartesian_flag,
)
] + function_list
if "lift_height" in cmd_dict.keys():
retval = None
retry = config.get("retry", 10)
while retval is None and retry > 0:
retval = self.moveit2[cmd_dict["move_group"]].compute_fk(joint_positions_)
time.sleep(0.1)
retry -= 1
if retval is None:
result.success = False
return result
pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z]
quaternion = [
retval.pose.orientation.x,
retval.pose.orientation.y,
retval.pose.orientation.z,
retval.pose.orientation.w,
]
pose[2] += float(lift_height)
function_list.append(
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
)
end_pose = list(pose)
if x_distance is not None or y_distance is not None:
if x_distance is not None:
deep_pose = deepcopy(pose)
deep_pose[0] += float(x_distance)
elif y_distance is not None:
deep_pose = deepcopy(pose)
deep_pose[1] += float(y_distance)
function_list = [
lambda: self.moveit_task(
position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z],
quaternion=quaternion,
**config,
cartesian=self.cartesian_flag,
lambda p=pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=p, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
] + function_list
pose[2] += float(cmd_dict["lift_height"])
function_list.append(
lambda: self.moveit_task(
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
lambda dp=deep_pose.copy(), q=quaternion, cfg=config: self.moveit_task(
position=dp, quaternion=q, **cfg, cartesian=self.cartesian_flag
)
)
end_pose = pose
end_pose = list(deep_pose)
if "x_distance" in cmd_dict.keys() or "y_distance" in cmd_dict.keys():
if "x_distance" in cmd_dict.keys():
deep_pose = deepcopy(pose)
deep_pose[0] += float(cmd_dict["x_distance"])
elif "y_distance" in cmd_dict.keys():
deep_pose = deepcopy(pose)
deep_pose[1] += float(cmd_dict["y_distance"])
retval_ik = None
attempts_ik = config.get("retry", 10)
while retval_ik is None and attempts_ik > 0:
retval_ik = self.moveit2[move_group].compute_ik(
position=end_pose,
quat_xyzw=quaternion,
constraints=Constraints(joint_constraints=joint_constraint_msgs),
)
time.sleep(0.1)
attempts_ik -= 1
if retval_ik is None:
raise ValueError("Failed to compute inverse kinematics")
position_ = [
retval_ik.position[retval_ik.name.index(i)] for i in self.moveit2[move_group].joint_names
]
jn = self.moveit2[move_group].joint_names
function_list = [
lambda pos=position_, names=jn, cfg=config: self.moveit_joint_task(
joint_positions=pos, joint_names=names, **cfg
)
] + function_list
else:
function_list = [lambda cfg=config, jp=joint_positions_: self.moveit_joint_task(**cfg, joint_positions=jp)] + function_list
function_list = [
lambda: self.moveit_task(
position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
)
] + function_list
function_list.append(
lambda: self.moveit_task(
position=deep_pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag
)
)
end_pose = deep_pose
retval_ik = None
retry = config.get("retry", 10)
while retval_ik is None and retry > 0:
retval_ik = self.moveit2[cmd_dict["move_group"]].compute_ik(
position=end_pose, quat_xyzw=quaternion, constraints=Constraints(joint_constraints=constraints)
)
time.sleep(0.1)
retry -= 1
if retval_ik is None:
result.success = False
return result
position_ = [
retval_ik.position[retval_ik.name.index(i)]
for i in self.moveit2[cmd_dict["move_group"]].joint_names
]
function_list = [
lambda: self.moveit_joint_task(
joint_positions=position_,
joint_names=self.moveit2[cmd_dict["move_group"]].joint_names,
**config,
)
] + function_list
for i in range(len(function_list)):
if i == 0:
self.cartesian_flag = False
else:
function_list = [
lambda: self.moveit_joint_task(**config, joint_positions=joint_positions_)
] + function_list
self.cartesian_flag = True
for i in range(len(function_list)):
if i == 0:
self.cartesian_flag = False
else:
self.cartesian_flag = True
re = function_list[i]()
if not re:
print(i, re)
result.success = False
return result
result.success = True
re = function_list[i]()
if not re:
print(i, re)
raise ValueError(f"Failed to execute moveit task: {i}")
except Exception as e:
print(e)
self.cartesian_flag = False
result.success = False
return result
raise e
def set_status(self, command: str):
"""

72
unilabos/registry/devices/robot_arm.yaml Normal file → Executable file
View File

@@ -173,48 +173,64 @@ robotic_arm.SCARA_with_slider.moveit.virtual:
type: object
type: UniLabJsonCommand
pick_and_place:
feedback:
status: status
goal:
command: command
feedback: {}
goal: {}
goal_default:
command: ''
constraints: null
lift_height: null
move_group: null
option: null
resource: null
retry: null
speed: null
status: null
target: null
x_distance: null
y_distance: null
handles: {}
placeholder_keys: {}
result:
return_info: return_info
success: success
result: {}
schema:
description: ''
description: pick_and_place 显式参数UniLabJsonCommand
properties:
feedback:
additionalProperties: false
feedback: {}
goal:
properties:
constraints:
items:
type: number
type: array
lift_height:
type: string
move_group:
type: string
option:
type: string
resource:
type: string
retry:
type: string
speed:
type: string
status:
type: string
title: SendCmd_Feedback
type: object
goal:
additionalProperties: false
properties:
command:
target:
type: string
title: SendCmd_Goal
type: object
result:
additionalProperties: false
properties:
return_info:
x_distance:
type: string
success:
type: boolean
title: SendCmd_Result
y_distance:
type: string
required:
- option
- move_group
- status
type: object
result: {}
required:
- goal
title: SendCmd
title: pick_and_place参数
type: object
type: SendCmd
type: UniLabJsonCommand
set_position:
feedback:
status: status

72
unilabos/registry/devices/robot_linear_motion.yaml Normal file → Executable file
View File

@@ -684,48 +684,64 @@ linear_motion.toyo_xyz.sim:
type: object
type: UniLabJsonCommand
pick_and_place:
feedback:
status: status
goal:
command: command
feedback: {}
goal: {}
goal_default:
command: ''
constraints: null
lift_height: null
move_group: null
option: null
resource: null
retry: null
speed: null
status: null
target: null
x_distance: null
y_distance: null
handles: {}
placeholder_keys: {}
result:
return_info: return_info
success: success
result: {}
schema:
description: ''
description: pick_and_place 显式参数UniLabJsonCommand
properties:
feedback:
additionalProperties: false
feedback: {}
goal:
properties:
constraints:
items:
type: number
type: array
lift_height:
type: string
move_group:
type: string
option:
type: string
resource:
type: string
retry:
type: string
speed:
type: string
status:
type: string
title: SendCmd_Feedback
type: object
goal:
additionalProperties: false
properties:
command:
target:
type: string
title: SendCmd_Goal
type: object
result:
additionalProperties: false
properties:
return_info:
x_distance:
type: string
success:
type: boolean
title: SendCmd_Result
y_distance:
type: string
required:
- option
- move_group
- status
type: object
result: {}
required:
- goal
title: SendCmd
title: pick_and_place参数
type: object
type: SendCmd
type: UniLabJsonCommand
set_position:
feedback:
status: status

View File

@@ -593,7 +593,7 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR":
"size_y": resource["config"].get("size_y", 0),
"size_z": resource["config"].get("size_z", 0),
"location": {**resource["position"], "type": "Coordinate"},
"rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}, # Resource如果没有rotation是plr版本太低
"rotation": {resource["config"].get("rotation", {"x": 0, "y": 0, "z": 0, "type": "Rotation"})}, # Resource如果没有rotation是plr版本太低
"category": resource["type"],
"model": resource["config"].get("model", None), # resource中deck没有model
"children": (

View File

@@ -244,6 +244,12 @@ class ResourceDictInstance(object):
height= content["config"].get("size_y", 0),
depth= content["config"].get("size_z", 0),
)
if "rotation" not in pose:
pose["rotation"] = ResourceDictPositionObjectType(
x=content["config"].get("rotation", {}).get("x", 0),
y=content["config"].get("rotation", {}).get("y", 0),
z=content["config"].get("rotation", {}).get("z", 0),
)
content["pose"] = pose
try:
res_dict = ResourceDict.model_validate(content)

View File

@@ -90,7 +90,7 @@ def main(
device_id="resource_mesh_manager",
device_uuid=str(uuid.uuid4()),
)
joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker)
joint_republisher = JointRepublisher("joint_republisher","", host_node.resource_tracker)
# lh_joint_pub = LiquidHandlerJointPublisher(
# resources_config=resources_list, resource_tracker=host_node.resource_tracker
# )