feat: Implement Laiyu liquid handling station with enhanced device control, testing, and documentation.

This commit is contained in:
ZiWei
2026-03-09 18:44:20 +08:00
parent b61c818f7f
commit 92bfb069d5
6 changed files with 521 additions and 801 deletions

View File

@@ -1,9 +1,7 @@
""" """
LaiYu液体处理设备后端模块 LaiYu液体处理设备后端模块
提供设备后端接口和实现
""" """
from .laiyu_backend import LaiYuLiquidBackend, create_laiyu_backend from .laiyu_v_backend import UniLiquidHandlerLaiyuBackend
__all__ = ['LaiYuLiquidBackend', 'create_laiyu_backend'] __all__ = ['UniLiquidHandlerLaiyuBackend']

View File

@@ -1,334 +0,0 @@
"""
LaiYu液体处理设备后端实现
提供设备的后端接口和控制逻辑
"""
import logging
from typing import Dict, Any, Optional, List
from abc import ABC, abstractmethod
# 尝试导入PyLabRobot后端
try:
from pylabrobot.liquid_handling.backends import LiquidHandlerBackend
PYLABROBOT_AVAILABLE = True
except ImportError:
PYLABROBOT_AVAILABLE = False
# 创建模拟后端基类
class LiquidHandlerBackend:
def __init__(self, name: str):
self.name = name
self.is_connected = False
def connect(self):
"""连接设备"""
pass
def disconnect(self):
"""断开连接"""
pass
class LaiYuLiquidBackend(LiquidHandlerBackend):
"""LaiYu液体处理设备后端"""
def __init__(self, name: str = "LaiYu_Liquid_Backend"):
"""
初始化LaiYu液体处理设备后端
Args:
name: 后端名称
"""
if PYLABROBOT_AVAILABLE:
# PyLabRobot 的 LiquidHandlerBackend 不接受参数
super().__init__()
else:
# 模拟版本接受 name 参数
super().__init__(name)
self.name = name
self.logger = logging.getLogger(__name__)
self.is_connected = False
self.device_info = {
"name": "LaiYu液体处理设备",
"version": "1.0.0",
"manufacturer": "LaiYu",
"model": "LaiYu_Liquid_Handler"
}
def connect(self) -> bool:
"""
连接到LaiYu液体处理设备
Returns:
bool: 连接是否成功
"""
try:
self.logger.info("正在连接到LaiYu液体处理设备...")
# 这里应该实现实际的设备连接逻辑
# 目前返回模拟连接成功
self.is_connected = True
self.logger.info("成功连接到LaiYu液体处理设备")
return True
except Exception as e:
self.logger.error(f"连接LaiYu液体处理设备失败: {e}")
self.is_connected = False
return False
def disconnect(self) -> bool:
"""
断开与LaiYu液体处理设备的连接
Returns:
bool: 断开连接是否成功
"""
try:
self.logger.info("正在断开与LaiYu液体处理设备的连接...")
# 这里应该实现实际的设备断开连接逻辑
self.is_connected = False
self.logger.info("成功断开与LaiYu液体处理设备的连接")
return True
except Exception as e:
self.logger.error(f"断开LaiYu液体处理设备连接失败: {e}")
return False
def is_device_connected(self) -> bool:
"""
检查设备是否已连接
Returns:
bool: 设备是否已连接
"""
return self.is_connected
def get_device_info(self) -> Dict[str, Any]:
"""
获取设备信息
Returns:
Dict[str, Any]: 设备信息字典
"""
return self.device_info.copy()
def home_device(self) -> bool:
"""
设备归零操作
Returns:
bool: 归零是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行归零操作")
return False
try:
self.logger.info("正在执行设备归零操作...")
# 这里应该实现实际的设备归零逻辑
self.logger.info("设备归零操作完成")
return True
except Exception as e:
self.logger.error(f"设备归零操作失败: {e}")
return False
def aspirate(self, volume: float, location: Dict[str, Any]) -> bool:
"""
吸液操作
Args:
volume: 吸液体积 (微升)
location: 吸液位置信息
Returns:
bool: 吸液是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行吸液操作")
return False
try:
self.logger.info(f"正在执行吸液操作: 体积={volume}μL, 位置={location}")
# 这里应该实现实际的吸液逻辑
self.logger.info("吸液操作完成")
return True
except Exception as e:
self.logger.error(f"吸液操作失败: {e}")
return False
def dispense(self, volume: float, location: Dict[str, Any]) -> bool:
"""
排液操作
Args:
volume: 排液体积 (微升)
location: 排液位置信息
Returns:
bool: 排液是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行排液操作")
return False
try:
self.logger.info(f"正在执行排液操作: 体积={volume}μL, 位置={location}")
# 这里应该实现实际的排液逻辑
self.logger.info("排液操作完成")
return True
except Exception as e:
self.logger.error(f"排液操作失败: {e}")
return False
def pick_up_tip(self, location: Dict[str, Any]) -> bool:
"""
取枪头操作
Args:
location: 枪头位置信息
Returns:
bool: 取枪头是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行取枪头操作")
return False
try:
self.logger.info(f"正在执行取枪头操作: 位置={location}")
# 这里应该实现实际的取枪头逻辑
self.logger.info("取枪头操作完成")
return True
except Exception as e:
self.logger.error(f"取枪头操作失败: {e}")
return False
def drop_tip(self, location: Dict[str, Any]) -> bool:
"""
丢弃枪头操作
Args:
location: 丢弃位置信息
Returns:
bool: 丢弃枪头是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行丢弃枪头操作")
return False
try:
self.logger.info(f"正在执行丢弃枪头操作: 位置={location}")
# 这里应该实现实际的丢弃枪头逻辑
self.logger.info("丢弃枪头操作完成")
return True
except Exception as e:
self.logger.error(f"丢弃枪头操作失败: {e}")
return False
def move_to(self, location: Dict[str, Any]) -> bool:
"""
移动到指定位置
Args:
location: 目标位置信息
Returns:
bool: 移动是否成功
"""
if not self.is_connected:
self.logger.error("设备未连接,无法执行移动操作")
return False
try:
self.logger.info(f"正在移动到位置: {location}")
# 这里应该实现实际的移动逻辑
self.logger.info("移动操作完成")
return True
except Exception as e:
self.logger.error(f"移动操作失败: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
return {
"connected": self.is_connected,
"device_info": self.device_info,
"status": "ready" if self.is_connected else "disconnected"
}
# PyLabRobot 抽象方法实现
def stop(self):
"""停止所有操作"""
self.logger.info("停止所有操作")
pass
@property
def num_channels(self) -> int:
"""返回通道数量"""
return 1 # 单通道移液器
def can_pick_up_tip(self, tip_rack, tip_position) -> bool:
"""检查是否可以拾取吸头"""
return True # 简化实现总是返回True
def pick_up_tips(self, tip_rack, tip_positions):
"""拾取多个吸头"""
self.logger.info(f"拾取吸头: {tip_positions}")
pass
def drop_tips(self, tip_rack, tip_positions):
"""丢弃多个吸头"""
self.logger.info(f"丢弃吸头: {tip_positions}")
pass
def pick_up_tips96(self, tip_rack):
"""拾取96个吸头"""
self.logger.info("拾取96个吸头")
pass
def drop_tips96(self, tip_rack):
"""丢弃96个吸头"""
self.logger.info("丢弃96个吸头")
pass
def aspirate96(self, volume, plate, well_positions):
"""96通道吸液"""
self.logger.info(f"96通道吸液: 体积={volume}")
pass
def dispense96(self, volume, plate, well_positions):
"""96通道排液"""
self.logger.info(f"96通道排液: 体积={volume}")
pass
def pick_up_resource(self, resource, location):
"""拾取资源"""
self.logger.info(f"拾取资源: {resource}")
pass
def drop_resource(self, resource, location):
"""放置资源"""
self.logger.info(f"放置资源: {resource}")
pass
def move_picked_up_resource(self, resource, location):
"""移动已拾取的资源"""
self.logger.info(f"移动资源: {resource}{location}")
pass
def create_laiyu_backend(name: str = "LaiYu_Liquid_Backend") -> LaiYuLiquidBackend:
"""
创建LaiYu液体处理设备后端实例
Args:
name: 后端名称
Returns:
LaiYuLiquidBackend: 后端实例
"""
return LaiYuLiquidBackend(name)

View File

@@ -1,385 +1,307 @@
"""LaiYu PLR 后端 — 对齐路径 B 硬件交互模式
import json
硬件初始化顺序与 laiyu_liquid_station.py (路径 B) 一致:
1. XYZController(auto_connect=True) — 先开串口
2. PipetteController.connect_shared() — 共享 XYZ 的串口 / 锁
3. home_all_axes() + pipette.initialize()
"""
import logging
from typing import List, Optional, Union from typing import List, Optional, Union
from pylabrobot.liquid_handling.backends.backend import ( from pylabrobot.liquid_handling.backends.backend import LiquidHandlerBackend
LiquidHandlerBackend,
)
from pylabrobot.liquid_handling.standard import ( from pylabrobot.liquid_handling.standard import (
Drop, Drop,
DropTipRack, DropTipRack,
MultiHeadAspirationContainer, MultiHeadAspirationContainer,
MultiHeadAspirationPlate, MultiHeadAspirationPlate,
MultiHeadDispenseContainer, MultiHeadDispenseContainer,
MultiHeadDispensePlate, MultiHeadDispensePlate,
Pickup, Pickup,
PickupTipRack, PickupTipRack,
ResourceDrop, ResourceDrop,
ResourceMove, ResourceMove,
ResourcePickup, ResourcePickup,
SingleChannelAspiration, SingleChannelAspiration,
SingleChannelDispense, SingleChannelDispense,
) )
from pylabrobot.resources import Resource, Tip from pylabrobot.resources import Resource, Tip
import rclpy from unilabos.devices.liquid_handling.laiyu.controllers.xyz_controller import XYZController
from rclpy.node import Node from unilabos.devices.liquid_handling.laiyu.controllers.pipette_controller import (
from sensor_msgs.msg import JointState PipetteController,
import time TipStatus,
from rclpy.action import ActionClient )
from unilabos_msgs.action import SendCmd
import re
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import JointStatePublisher logger = logging.getLogger(__name__)
from unilabos.devices.liquid_handling.laiyu.controllers.pipette_controller import PipetteController, TipStatus
class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend):
"""Chatter box backend for device-free testing. Prints out all operations.""" """LaiYu 硬件后端 — PLR Backend 接口实现"""
_pip_length = 5 def __init__(
_vol_length = 8 self,
_resource_length = 20 num_channels: int = 1,
_offset_length = 16 tip_length: float = 0,
_flow_rate_length = 10 total_height: float = 310,
_blowout_length = 10 port: str = "/dev/ttyUSB0",
_lld_z_length = 10 baudrate: int = 115200,
_kwargs_length = 15 pipette_address: int = 4,
_tip_type_length = 12 ):
_max_volume_length = 16 super().__init__()
_fitting_depth_length = 20 self._num_channels = num_channels
_tip_length_length = 16 self.tip_length = tip_length
# _pickup_method_length = 20 self.total_height = total_height
_filter_length = 10
def __init__(self, num_channels: int = 8 , tip_length: float = 0 , total_height: float = 310, port: str = "/dev/ttyUSB0"): # 保存配置,延迟到 setup() 再创建硬件对象
"""Initialize a chatter box backend.""" self._port = port
super().__init__() self._baudrate = baudrate
self._num_channels = num_channels self._pipette_address = pipette_address
self.tip_length = tip_length
self.total_height = total_height
# rclpy.init()
if not rclpy.ok():
rclpy.init()
self.joint_state_publisher = None
self.hardware_interface = PipetteController(port=port)
async def setup(self): self._xyz: Optional[XYZController] = None
# self.joint_state_publisher = JointStatePublisher() self._pipette_ctrl: Optional[PipetteController] = None
# self.hardware_interface.xyz_controller.connect_device() self._ros_node = None
# self.hardware_interface.xyz_controller.home_all_axes()
await super().setup()
self.hardware_interface.connect()
self.hardware_interface.initialize()
print("Setting up the liquid handler.") # ------------------------------------------------------------------ lifecycle
async def stop(self): def post_init(self, ros_node):
print("Stopping the liquid handler.") """接收 ROS 节点引用(由 Handler.post_init 调用)"""
self._ros_node = ros_node
def serialize(self) -> dict: async def setup(self):
return {**super().serialize(), "num_channels": self.num_channels} """按路径 B 顺序初始化硬件"""
await super().setup()
def pipette_aspirate(self, volume: float, flow_rate: float): # 1. XYZ 先开串口
self._xyz = XYZController(
port=self._port,
baudrate=self._baudrate,
auto_connect=True,
)
if not self._xyz.is_connected:
raise RuntimeError("XYZ 控制器连接失败")
self.hardware_interface.pipette.set_max_speed(flow_rate) # 2. PipetteController 共享 XYZ 串口
res = self.hardware_interface.pipette.aspirate(volume=volume) self._pipette_ctrl = PipetteController(
port=self._port,
if not res: address=self._pipette_address,
self.hardware_interface.logger.error("吸取失败,当前体积: {self.hardware_interface.current_volume}") )
return self._pipette_ctrl.connect_shared(
serial_conn=self._xyz.serial_conn,
serial_lock=self._xyz.serial_lock,
xyz_controller=self._xyz,
)
self.hardware_interface.current_volume += volume # 3. 回零 + 移液器初始化
self._xyz.home_all_axes()
self._pipette_ctrl.initialize()
def pipette_dispense(self, volume: float, flow_rate: float): logger.info("LaiYu 后端硬件初始化完成")
self.hardware_interface.pipette.set_max_speed(flow_rate) async def stop(self):
res = self.hardware_interface.pipette.dispense(volume=volume) """正确断开硬件"""
if not res: try:
self.hardware_interface.logger.error("排液失败,当前体积: {self.hardware_interface.current_volume}") if self._pipette_ctrl:
return self._pipette_ctrl.disconnect_shared()
self.hardware_interface.current_volume -= volume if self._xyz:
self._xyz.disconnect()
logger.info("LaiYu 后端硬件已断开")
except Exception as e:
logger.error(f"停止后端失败: {e}")
@property # ------------------------------------------------------------------ helpers
def num_channels(self) -> int:
return self._num_channels
async def assigned_resource_callback(self, resource: Resource): def _plr_to_machine_coords(self, resource, offset):
print(f"Resource {resource.name} was assigned to the liquid handler.") """PLR Resource 坐标 → 机器坐标 (倒置龙门架: total_height - z, -y)"""
coordinate = resource.get_absolute_location(x="c", y="c")
x = coordinate.x + offset.x
y = coordinate.y + offset.y
z_plr = coordinate.z + offset.z
return x, -y, self.total_height - (z_plr + self.tip_length)
async def unassigned_resource_callback(self, name: str): def _pipette_aspirate(self, volume: float, flow_rate: float):
print(f"Resource {name} was unassigned from the liquid handler.") self._pipette_ctrl.pipette.set_max_speed(flow_rate)
res = self._pipette_ctrl.pipette.aspirate(volume=volume)
if not res:
logger.error(f"吸取失败,当前体积: {self._pipette_ctrl.current_volume}")
return
self._pipette_ctrl.current_volume += volume
async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int], **backend_kwargs): def _pipette_dispense(self, volume: float, flow_rate: float):
print("Picking up tips:") self._pipette_ctrl.pipette.set_max_speed(flow_rate)
# print(ops.tip) res = self._pipette_ctrl.pipette.dispense(volume=volume)
header = ( if not res:
f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} " logger.error(f"排液失败,当前体积: {self._pipette_ctrl.current_volume}")
f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} " return
f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} " self._pipette_ctrl.current_volume -= volume
f"{'tip type':<{UniLiquidHandlerLaiyuBackend._tip_type_length}} "
f"{'max volume (µL)':<{UniLiquidHandlerLaiyuBackend._max_volume_length}} "
f"{'fitting depth (mm)':<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} "
f"{'tip length (mm)':<{UniLiquidHandlerLaiyuBackend._tip_length_length}} "
# f"{'pickup method':<{ChatterboxBackend._pickup_method_length}} "
f"{'filter':<{UniLiquidHandlerLaiyuBackend._filter_length}}"
)
# print(header)
for op, channel in zip(ops, use_channels): # ------------------------------------------------------------------ properties
offset = f"{round(op.offset.x, 1)},{round(op.offset.y, 1)},{round(op.offset.z, 1)}"
row = (
f" p{channel}: "
f"{op.resource.name[-30:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{op.tip.__class__.__name__:<{UniLiquidHandlerLaiyuBackend._tip_type_length}} "
f"{op.tip.maximal_volume:<{UniLiquidHandlerLaiyuBackend._max_volume_length}} "
f"{op.tip.fitting_depth:<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} "
f"{op.tip.total_tip_length:<{UniLiquidHandlerLaiyuBackend._tip_length_length}} "
# f"{str(op.tip.pickup_method)[-20:]:<{ChatterboxBackend._pickup_method_length}} "
f"{'Yes' if op.tip.has_filter else 'No':<{UniLiquidHandlerLaiyuBackend._filter_length}}"
)
# print(row)
# print(op.resource.get_absolute_location())
self.tip_length = ops[0].tip.total_tip_length
coordinate = ops[0].resource.get_absolute_location(x="c",y="c")
offset_xyz = ops[0].offset
x = coordinate.x + offset_xyz.x
y = coordinate.y + offset_xyz.y
z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z
# print("moving")
self.hardware_interface._update_tip_status()
if self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED:
print("已有枪头,无需重复拾取")
return
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height,speed=100)
# self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "pick",channels=use_channels)
# goback()
def serialize(self) -> dict:
return {**super().serialize(), "num_channels": self.num_channels}
@property
def num_channels(self) -> int:
return self._num_channels
# ------------------------------------------------------------------ resource callbacks
async def drop_tips(self, ops: List[Drop], use_channels: List[int], **backend_kwargs): async def assigned_resource_callback(self, resource: Resource):
print("Dropping tips:") logger.info(f"Resource {resource.name} was assigned to the liquid handler.")
header = (
f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} "
f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{'tip type':<{UniLiquidHandlerLaiyuBackend._tip_type_length}} "
f"{'max volume (µL)':<{UniLiquidHandlerLaiyuBackend._max_volume_length}} "
f"{'fitting depth (mm)':<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} "
f"{'tip length (mm)':<{UniLiquidHandlerLaiyuBackend._tip_length_length}} "
# f"{'pickup method':<{ChatterboxBackend._pickup_method_length}} "
f"{'filter':<{UniLiquidHandlerLaiyuBackend._filter_length}}"
)
# print(header)
for op, channel in zip(ops, use_channels): async def unassigned_resource_callback(self, name: str):
offset = f"{round(op.offset.x, 1)},{round(op.offset.y, 1)},{round(op.offset.z, 1)}" logger.info(f"Resource {name} was unassigned from the liquid handler.")
row = (
f" p{channel}: "
f"{op.resource.name[-30:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{op.tip.__class__.__name__:<{UniLiquidHandlerLaiyuBackend._tip_type_length}} "
f"{op.tip.maximal_volume:<{UniLiquidHandlerLaiyuBackend._max_volume_length}} "
f"{op.tip.fitting_depth:<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} "
f"{op.tip.total_tip_length:<{UniLiquidHandlerLaiyuBackend._tip_length_length}} "
# f"{str(op.tip.pickup_method)[-20:]:<{ChatterboxBackend._pickup_method_length}} "
f"{'Yes' if op.tip.has_filter else 'No':<{UniLiquidHandlerLaiyuBackend._filter_length}}"
)
# print(row)
coordinate = ops[0].resource.get_absolute_location(x="c",y="c") # ------------------------------------------------------------------ pick_up_tips
offset_xyz = ops[0].offset
x = coordinate.x + offset_xyz.x
y = coordinate.y + offset_xyz.y
z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z -20
# print(x, y, z)
# print("moving")
self.hardware_interface._update_tip_status()
if self.hardware_interface.tip_status == TipStatus.NO_TIP:
print("无枪头,无需丢弃")
return
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.hardware_interface.eject_tip
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height)
async def aspirate( async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int], **backend_kwargs):
self, tip = ops[0].tip
ops: List[SingleChannelAspiration], self.tip_length = tip.total_tip_length
use_channels: List[int], x, y, z_top = self._plr_to_machine_coords(ops[0].resource, ops[0].offset)
**backend_kwargs,
):
print("Aspirating:")
header = (
f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} "
f"{'vol(ul)':<{UniLiquidHandlerLaiyuBackend._vol_length}} "
f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{'flow rate':<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} "
f"{'blowout':<{UniLiquidHandlerLaiyuBackend._blowout_length}} "
f"{'lld_z':<{UniLiquidHandlerLaiyuBackend._lld_z_length}} "
# f"{'liquids':<20}" # TODO: add liquids
)
for key in backend_kwargs:
header += f"{key:<{UniLiquidHandlerLaiyuBackend._kwargs_length}} "[-16:]
# print(header)
for o, p in zip(ops, use_channels): self._pipette_ctrl._update_tip_status()
offset = f"{round(o.offset.x, 1)},{round(o.offset.y, 1)},{round(o.offset.z, 1)}" if self._pipette_ctrl.tip_status == TipStatus.TIP_ATTACHED:
row = ( logger.warning("已有枪头,无需重复拾取")
f" p{p}: " return
f"{o.volume:<{UniLiquidHandlerLaiyuBackend._vol_length}} "
f"{o.resource.name[-20:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{str(o.flow_rate):<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} "
f"{str(o.blow_out_air_volume):<{UniLiquidHandlerLaiyuBackend._blowout_length}} "
f"{str(o.liquid_height):<{UniLiquidHandlerLaiyuBackend._lld_z_length}} "
# f"{o.liquids if o.liquids is not None else 'none'}"
)
for key, value in backend_kwargs.items():
if isinstance(value, list) and all(isinstance(v, bool) for v in value):
value = "".join("T" if v else "F" for v in value)
if isinstance(value, list):
value = "".join(map(str, value))
row += f" {value:<15}"
# print(row)
coordinate = ops[0].resource.get_absolute_location(x="c",y="c")
offset_xyz = ops[0].offset
x = coordinate.x + offset_xyz.x
y = coordinate.y + offset_xyz.y
z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z
# print(x, y, z)
# print("moving")
# 判断枪头是否存在 try:
self.hardware_interface._update_tip_status() # 1. 移到枪头正上方
if not self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: self._xyz.move_to_work_coord_safe(x=x, y=y, z=z_top, speed=200)
print("无枪头,无法吸液") # 2. 下压到套枪头深度fitting_depth 是枪头套入长度)
return z_pickup = z_top + tip.fitting_depth
# 判断吸液量是否超过枪头容量 self._xyz.move_to_work_coord_safe(z=z_pickup, speed=100)
flow_rate = backend_kwargs["flow_rate"] if "flow_rate" in backend_kwargs else 500 # 3. 退回安全高度
blow_out_air_volume = backend_kwargs["blow_out_air_volume"] if "blow_out_air_volume" in backend_kwargs else 0 self._xyz.move_to_work_coord_safe(
if self.hardware_interface.current_volume + ops[0].volume + blow_out_air_volume > self.hardware_interface.max_volume: z=self._xyz.machine_config.safe_z_height, speed=100
self.hardware_interface.logger.error(f"吸液量超过枪头容量: {self.hardware_interface.current_volume + ops[0].volume} > {self.hardware_interface.max_volume}") )
return except Exception as e:
logger.error(f"pick_up_tips 移动失败: {e}")
raise
# 移动到吸液位置 # ------------------------------------------------------------------ drop_tips
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate)
async def drop_tips(self, ops: List[Drop], use_channels: List[int], **backend_kwargs):
x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset)
z -= 20 # 额外下移补偿
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) self._pipette_ctrl._update_tip_status()
if blow_out_air_volume >0: if self._pipette_ctrl.tip_status == TipStatus.NO_TIP:
self.pipette_aspirate(volume=blow_out_air_volume, flow_rate=flow_rate) logger.warning("无枪头,无需丢弃")
return
try:
self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200)
self._pipette_ctrl.eject_tip() # 修复: 原来缺少 ()
self._xyz.move_to_work_coord_safe(
z=self._xyz.machine_config.safe_z_height
)
except Exception as e:
logger.error(f"drop_tips 失败: {e}")
raise
# ------------------------------------------------------------------ aspirate
async def aspirate(
self,
ops: List[SingleChannelAspiration],
use_channels: List[int],
**backend_kwargs,
):
x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset)
async def dispense( self._pipette_ctrl._update_tip_status()
self, if self._pipette_ctrl.tip_status != TipStatus.TIP_ATTACHED:
ops: List[SingleChannelDispense], raise RuntimeError("无枪头,无法吸液")
use_channels: List[int],
**backend_kwargs,
):
# print("Dispensing:")
header = (
f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} "
f"{'vol(ul)':<{UniLiquidHandlerLaiyuBackend._vol_length}} "
f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{'flow rate':<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} "
f"{'blowout':<{UniLiquidHandlerLaiyuBackend._blowout_length}} "
f"{'lld_z':<{UniLiquidHandlerLaiyuBackend._lld_z_length}} "
# f"{'liquids':<20}" # TODO: add liquids
)
for key in backend_kwargs:
header += f"{key:<{UniLiquidHandlerLaiyuBackend._kwargs_length}} "[-16:]
# print(header)
for o, p in zip(ops, use_channels): flow_rate = backend_kwargs.get("flow_rate", 500)
offset = f"{round(o.offset.x, 1)},{round(o.offset.y, 1)},{round(o.offset.z, 1)}" blow_out_air_volume = backend_kwargs.get("blow_out_air_volume", 0)
row = (
f" p{p}: "
f"{o.volume:<{UniLiquidHandlerLaiyuBackend._vol_length}} "
f"{o.resource.name[-20:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} "
f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} "
f"{str(o.flow_rate):<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} "
f"{str(o.blow_out_air_volume):<{UniLiquidHandlerLaiyuBackend._blowout_length}} "
f"{str(o.liquid_height):<{UniLiquidHandlerLaiyuBackend._lld_z_length}} "
# f"{o.liquids if o.liquids is not None else 'none'}"
)
for key, value in backend_kwargs.items():
if isinstance(value, list) and all(isinstance(v, bool) for v in value):
value = "".join("T" if v else "F" for v in value)
if isinstance(value, list):
value = "".join(map(str, value))
row += f" {value:<{UniLiquidHandlerLaiyuBackend._kwargs_length}}"
# print(row)
coordinate = ops[0].resource.get_absolute_location(x="c",y="c")
offset_xyz = ops[0].offset
x = coordinate.x + offset_xyz.x
y = coordinate.y + offset_xyz.y
z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z
# print(x, y, z)
# print("moving")
# 判断枪头是否存在 if (
self.hardware_interface._update_tip_status() self._pipette_ctrl.current_volume + ops[0].volume + blow_out_air_volume
if not self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: > self._pipette_ctrl.max_volume
print("无枪头,无法排液") ):
return raise RuntimeError(
# 判断排液量是否超过枪头容量 f"吸液量超过枪头容量: "
flow_rate = backend_kwargs["flow_rate"] if "flow_rate" in backend_kwargs else 500 f"{self._pipette_ctrl.current_volume + ops[0].volume} > {self._pipette_ctrl.max_volume}"
blow_out_air_volume = backend_kwargs["blow_out_air_volume"] if "blow_out_air_volume" in backend_kwargs else 0 )
if self.hardware_interface.current_volume - ops[0].volume - blow_out_air_volume < 0:
self.hardware_interface.logger.error(f"排液量超过枪头容量: {self.hardware_interface.current_volume - ops[0].volume - blow_out_air_volume} < 0")
return
self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200)
# 移动到排液位置 self._pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200)
self.pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate)
self._xyz.move_to_work_coord_safe(
z=self._xyz.machine_config.safe_z_height
)
if blow_out_air_volume > 0:
self._pipette_aspirate(volume=blow_out_air_volume, flow_rate=flow_rate)
self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) # ------------------------------------------------------------------ dispense
if blow_out_air_volume > 0:
self.pipette_dispense(volume=blow_out_air_volume, flow_rate=flow_rate)
# self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "",channels=use_channels)
async def pick_up_tips96(self, pickup: PickupTipRack, **backend_kwargs): async def dispense(
print(f"Picking up tips from {pickup.resource.name}.") self,
ops: List[SingleChannelDispense],
use_channels: List[int],
**backend_kwargs,
):
x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset)
async def drop_tips96(self, drop: DropTipRack, **backend_kwargs): self._pipette_ctrl._update_tip_status()
print(f"Dropping tips to {drop.resource.name}.") if self._pipette_ctrl.tip_status != TipStatus.TIP_ATTACHED:
raise RuntimeError("无枪头,无法排液")
async def aspirate96( flow_rate = backend_kwargs.get("flow_rate", 500)
self, aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] blow_out_air_volume = backend_kwargs.get("blow_out_air_volume", 0)
):
if isinstance(aspiration, MultiHeadAspirationPlate):
resource = aspiration.wells[0].parent
else:
resource = aspiration.container
print(f"Aspirating {aspiration.volume} from {resource}.")
async def dispense96(self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]): if (
if isinstance(dispense, MultiHeadDispensePlate): self._pipette_ctrl.current_volume - ops[0].volume - blow_out_air_volume < 0
resource = dispense.wells[0].parent ):
else: raise RuntimeError(
resource = dispense.container f"排液量超过当前体积: "
print(f"Dispensing {dispense.volume} to {resource}.") f"{self._pipette_ctrl.current_volume - ops[0].volume - blow_out_air_volume} < 0"
)
async def pick_up_resource(self, pickup: ResourcePickup): self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200)
print(f"Picking up resource: {pickup}") self._pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate)
async def move_picked_up_resource(self, move: ResourceMove): self._xyz.move_to_work_coord_safe(
print(f"Moving picked up resource: {move}") z=self._xyz.machine_config.safe_z_height
)
if blow_out_air_volume > 0:
self._pipette_dispense(volume=blow_out_air_volume, flow_rate=flow_rate)
async def drop_resource(self, drop: ResourceDrop): # ------------------------------------------------------------------ 96-channel stubs
print(f"Dropping resource: {drop}")
def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: async def pick_up_tips96(self, pickup: PickupTipRack, **backend_kwargs):
return True logger.info(f"Picking up tips from {pickup.resource.name}.")
async def drop_tips96(self, drop: DropTipRack, **backend_kwargs):
logger.info(f"Dropping tips to {drop.resource.name}.")
async def aspirate96(
self, aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer]
):
if isinstance(aspiration, MultiHeadAspirationPlate):
resource = aspiration.wells[0].parent
else:
resource = aspiration.container
logger.info(f"Aspirating {aspiration.volume} from {resource}.")
async def dispense96(
self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]
):
if isinstance(dispense, MultiHeadDispensePlate):
resource = dispense.wells[0].parent
else:
resource = dispense.container
logger.info(f"Dispensing {dispense.volume} to {resource}.")
async def pick_up_resource(self, pickup: ResourcePickup):
logger.info(f"Picking up resource: {pickup}")
async def move_picked_up_resource(self, move: ResourceMove):
logger.info(f"Moving picked up resource: {move}")
async def drop_resource(self, drop: ResourceDrop):
logger.info(f"Dropping resource: {drop}")
def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool:
return True

View File

@@ -5,21 +5,16 @@
封装SOPA移液器的高级控制功能 封装SOPA移液器的高级控制功能
""" """
# 添加项目根目录到Python路径以解决模块导入问题
import sys import sys
import os import os
from tkinter import N
_current_file = os.path.abspath(__file__)
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(_current_file)))))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from unilabos.devices.liquid_handling.laiyu.drivers.xyz_stepper_driver import ModbusException from unilabos.devices.liquid_handling.laiyu.drivers.xyz_stepper_driver import ModbusException
# 无论如何都添加项目根目录到路径
current_file = os.path.abspath(__file__)
# 从 .../Uni-Lab-OS/unilabos/devices/LaiYu_Liquid/controllers/pipette_controller.py
# 向上5级到 .../Uni-Lab-OS
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(current_file)))))
# 强制添加项目根目录到sys.path的开头
sys.path.insert(0, project_root)
import time import time
import logging import logging
from typing import Optional, List, Dict, Tuple from typing import Optional, List, Dict, Tuple
@@ -153,7 +148,7 @@ class PipetteController:
logger.error("移液器连接失败") logger.error("移液器连接失败")
return False return False
logger.info("移液器连接成功") logger.info("移液器连接成功")
# 连接XYZ步进电机控制器如果提供了端口 # 连接XYZ步进电机控制器如果提供了端口
if self.xyz_port != self.pipette_port: if self.xyz_port != self.pipette_port:
try: try:
@@ -172,24 +167,62 @@ class PipetteController:
try: try:
self.xyz_controller = XYZController(self.xyz_port, auto_connect=False) self.xyz_controller = XYZController(self.xyz_port, auto_connect=False)
self.xyz_controller.serial_conn = self.pipette.serial_port self.xyz_controller.serial_conn = self.pipette.serial_port
self.xyz_controller.serial_lock = self.pipette.lock
self.xyz_controller.is_connected = True self.xyz_controller.is_connected = True
logger.info("XYZ控制器与移液器共享串口和互斥锁")
except Exception as e: except Exception as e:
logger.info("未配置XYZ步进电机端口跳过运动控制器连接") logger.warning(f"共享端口 XYZ 控制器创建失败: {e}")
self.xyz_controller = None
self.xyz_connected = False
return True return True
except Exception as e: except Exception as e:
logger.error(f"设备连接失败: {e}") logger.error(f"设备连接失败: {e}")
return False return False
def connect_shared(self, serial_conn, serial_lock, xyz_controller: XYZController) -> bool:
"""使用已连接的串口和XYZ控制器路径 B 模式XYZ 先开串口,移液器共享)
Args:
serial_conn: 已打开的串口连接(来自 XYZController
serial_lock: 串口互斥锁(来自 XYZController
xyz_controller: 已连接的 XYZController 实例
"""
try:
self.pipette.serial_port = serial_conn
self.pipette.lock = serial_lock
self.pipette.is_connected = True
self.xyz_controller = xyz_controller
self.xyz_connected = True
logger.info("移液控制器已通过 connect_shared 共享 XYZ 串口")
return True
except Exception as e:
logger.error(f"connect_shared 失败: {e}")
return False
def disconnect_shared(self) -> None:
"""释放共享串口引用(与 connect_shared 对称)。
注意:不关闭串口本身,串口由 XYZController 负责关闭。
"""
try:
self.pipette.serial_port = None
self.pipette.lock = None
self.pipette.is_connected = False
self.xyz_controller = None
self.xyz_connected = False
logger.info("移液控制器已释放共享串口引用")
except Exception as e:
logger.error(f"disconnect_shared 失败: {e}")
def initialize(self) -> bool: def initialize(self) -> bool:
"""初始化移液器""" """初始化移液器"""
try: try:
if self.pipette.initialize(): if self.pipette.initialize():
logger.info("移液器初始化成功") logger.info("移液器初始化成功")
# 检查枪头状态
self._update_tip_status() self._update_tip_status()
self.xyz_controller.home_all_axes()
self.xyz_controller.move_to_work_coord_safe(x=0, y=-150, z=0)
return True return True
return False return False
except Exception as e: except Exception as e:
@@ -198,56 +231,58 @@ class PipetteController:
def disconnect(self): def disconnect(self):
"""断开连接""" """断开连接"""
# 断开移液器连接 if self.xyz_controller and self.xyz_connected:
if self.xyz_port != self.pipette_port:
try:
self.xyz_controller.disconnect()
logger.info("XYZ 步进电机已断开")
except Exception as e:
logger.error(f"断开 XYZ 步进电机失败: {e}")
else:
self.xyz_controller.serial_conn = None
self.xyz_connected = False
self.xyz_controller = None
self.pipette.disconnect() self.pipette.disconnect()
logger.info("移液器已断开") logger.info("移液器已断开")
# 断开 XYZ 步进电机连接
if self.xyz_controller and self.xyz_connected:
try:
self.xyz_controller.disconnect()
self.xyz_connected = False
logger.info("XYZ 步进电机已断开")
except Exception as e:
logger.error(f"断开 XYZ 步进电机失败: {e}")
def _check_xyz_safety(self, axis: MotorAxis, target_position: int) -> bool: def _check_xyz_safety(self, axis: MotorAxis, target_position: int) -> bool:
""" """
检查 XYZ 轴移动的安全性 检查 XYZ 轴移动的安全性
Args: Args:
axis: 电机轴 axis: 电机轴
target_position: 目标位置(步数) target_position: 目标位置(步数)
Returns: Returns:
是否安全 是否安全
""" """
try: try:
# 获取当前电机状态 # 获取当前电机状态
motor_position = self.xyz_controller.get_motor_status(axis) motor_position = self.xyz_controller.get_motor_status(axis)
# 检查电机状态是否正常 (不是碰撞停止或限位停止) # 检查电机状态是否正常 (不是碰撞停止或限位停止)
if motor_position.status in [MotorStatus.COLLISION_STOP, if motor_position.status in [MotorStatus.COLLISION_STOP,
MotorStatus.FORWARD_LIMIT_STOP, MotorStatus.FORWARD_LIMIT_STOP,
MotorStatus.REVERSE_LIMIT_STOP]: MotorStatus.REVERSE_LIMIT_STOP]:
logger.error(f"{axis.name} 轴电机处于错误状态: {motor_position.status.name}") logger.error(f"{axis.name} 轴电机处于错误状态: {motor_position.status.name}")
return False return False
# 检查位置限制 (扩大安全范围以适应实际硬件) # 检查位置限制 (扩大安全范围以适应实际硬件)
# 步进电机的位置范围通常很大,这里设置更合理的范围 # 步进电机的位置范围通常很大,这里设置更合理的范围
if target_position < -500000 or target_position > 500000: if target_position < -500000 or target_position > 500000:
logger.error(f"{axis.name} 轴目标位置超出安全范围: {target_position}") logger.error(f"{axis.name} 轴目标位置超出安全范围: {target_position}")
return False return False
# 检查移动距离是否过大 (单次移动不超过 20000 步约12mm) # 检查移动距离是否过大 (单次移动不超过 20000 步约12mm)
current_position = motor_position.steps current_position = motor_position.steps
move_distance = abs(target_position - current_position) move_distance = abs(target_position - current_position)
if move_distance > 20000: if move_distance > 20000:
logger.error(f"{axis.name} 轴单次移动距离过大: {move_distance}") logger.error(f"{axis.name} 轴单次移动距离过大: {move_distance}")
return False return False
return True return True
except Exception as e: except Exception as e:
logger.error(f"安全检查失败: {e}") logger.error(f"安全检查失败: {e}")
return False return False
@@ -255,48 +290,48 @@ class PipetteController:
def move_z_relative(self, distance_mm: float, speed: int = 2000, acceleration: int = 500) -> bool: def move_z_relative(self, distance_mm: float, speed: int = 2000, acceleration: int = 500) -> bool:
""" """
Z轴相对移动 Z轴相对移动
Args: Args:
distance_mm: 移动距离(mm),正值向下,负值向上 distance_mm: 移动距离(mm),正值向下,负值向上
speed: 移动速度(rpm) speed: 移动速度(rpm)
acceleration: 加速度(rpm/s) acceleration: 加速度(rpm/s)
Returns: Returns:
移动是否成功 移动是否成功
""" """
if not self.xyz_controller or not self.xyz_connected: if not self.xyz_controller or not self.xyz_connected:
logger.error("XYZ 步进电机未连接,无法执行移动") logger.error("XYZ 步进电机未连接,无法执行移动")
return False return False
try: try:
# 参数验证 # 参数验证
if abs(distance_mm) > 15.0: if abs(distance_mm) > 15.0:
logger.error(f"移动距离过大: {distance_mm}mm最大允许15mm") logger.error(f"移动距离过大: {distance_mm}mm最大允许15mm")
return False return False
if speed < 100 or speed > 5000: if speed < 100 or speed > 5000:
logger.error(f"速度参数无效: {speed}rpm范围应为100-5000") logger.error(f"速度参数无效: {speed}rpm范围应为100-5000")
return False return False
# 获取当前 Z 轴位置 # 获取当前 Z 轴位置
current_status = self.xyz_controller.get_motor_status(MotorAxis.Z) current_status = self.xyz_controller.get_motor_status(MotorAxis.Z)
current_z_position = current_status.steps current_z_position = current_status.steps
# 计算移动距离对应的步数 (1mm = 1638.4步) # 计算移动距离对应的步数 (1mm = 1638.4步)
mm_to_steps = 1638.4 mm_to_steps = 1638.4
move_distance_steps = int(distance_mm * mm_to_steps) move_distance_steps = int(distance_mm * mm_to_steps)
# 计算目标位置 # 计算目标位置
target_z_position = current_z_position + move_distance_steps target_z_position = current_z_position + move_distance_steps
# 安全检查 # 安全检查
if not self._check_xyz_safety(MotorAxis.Z, target_z_position): if not self._check_xyz_safety(MotorAxis.Z, target_z_position):
logger.error("Z轴移动安全检查失败") logger.error("Z轴移动安全检查失败")
return False return False
logger.info(f"Z轴相对移动: {distance_mm}mm ({move_distance_steps}步)") logger.info(f"Z轴相对移动: {distance_mm}mm ({move_distance_steps}步)")
logger.info(f"当前位置: {current_z_position}步 -> 目标位置: {target_z_position}") logger.info(f"当前位置: {current_z_position}步 -> 目标位置: {target_z_position}")
# 执行移动 # 执行移动
success = self.xyz_controller.move_to_position( success = self.xyz_controller.move_to_position(
axis=MotorAxis.Z, axis=MotorAxis.Z,
@@ -305,28 +340,28 @@ class PipetteController:
acceleration=acceleration, acceleration=acceleration,
precision=50 precision=50
) )
if not success: if not success:
logger.error("Z轴移动命令发送失败") logger.error("Z轴移动命令发送失败")
return False return False
# 等待移动完成 # 等待移动完成
if not self.xyz_controller.wait_for_completion(MotorAxis.Z, timeout=10.0): if not self.xyz_controller.wait_for_completion(MotorAxis.Z, timeout=10.0):
logger.error("Z轴移动超时") logger.error("Z轴移动超时")
return False return False
# 验证移动结果 # 验证移动结果
final_status = self.xyz_controller.get_motor_status(MotorAxis.Z) final_status = self.xyz_controller.get_motor_status(MotorAxis.Z)
final_position = final_status.steps final_position = final_status.steps
position_error = abs(final_position - target_z_position) position_error = abs(final_position - target_z_position)
logger.info(f"Z轴移动完成最终位置: {final_position}步,误差: {position_error}") logger.info(f"Z轴移动完成最终位置: {final_position}步,误差: {position_error}")
if position_error > 100: if position_error > 100:
logger.warning(f"Z轴位置误差较大: {position_error}") logger.warning(f"Z轴位置误差较大: {position_error}")
return True return True
except ModbusException as e: except ModbusException as e:
logger.error(f"Modbus通信错误: {e}") logger.error(f"Modbus通信错误: {e}")
return False return False
@@ -337,21 +372,20 @@ class PipetteController:
def emergency_stop(self) -> bool: def emergency_stop(self) -> bool:
""" """
紧急停止所有运动 紧急停止所有运动
Returns: Returns:
停止是否成功 停止是否成功
""" """
success = True success = True
# 停止移液器操作
try: try:
if self.pipette and self.connected: if self.pipette and self.pipette.is_connected:
# 这里可以添加移液器的紧急停止逻辑 self.pipette.emergency_stop()
logger.info("移液器紧急停止") logger.info("移液器紧急停止")
except Exception as e: except Exception as e:
logger.error(f"移液器紧急停止失败: {e}") logger.error(f"移液器紧急停止失败: {e}")
success = False success = False
# 停止 XYZ 轴运动 # 停止 XYZ 轴运动
try: try:
if self.xyz_controller and self.xyz_connected: if self.xyz_controller and self.xyz_connected:
@@ -360,7 +394,7 @@ class PipetteController:
except Exception as e: except Exception as e:
logger.error(f"XYZ 轴紧急停止失败: {e}") logger.error(f"XYZ 轴紧急停止失败: {e}")
success = False success = False
return success return success
def pickup_tip(self) -> bool: def pickup_tip(self) -> bool:
@@ -376,7 +410,7 @@ class PipetteController:
return True return True
logger.info("开始装载枪头 - Z轴向下移动10mm") logger.info("开始装载枪头 - Z轴向下移动10mm")
# 使用相对移动方法向下移动10mm # 使用相对移动方法向下移动10mm
if self.move_z_relative(distance_mm=10.0, speed=2000, acceleration=500): if self.move_z_relative(distance_mm=10.0, speed=2000, acceleration=500):
# 更新枪头状态 # 更新枪头状态
@@ -688,31 +722,31 @@ class PipetteController:
if __name__ == "__main__": if __name__ == "__main__":
# 配置日志 # 配置日志
import logging import logging
# 设置日志级别 # 设置日志级别
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
) )
def interactive_test(): def interactive_test():
"""交互式测试模式 - 适用于已连接的设备""" """交互式测试模式 - 适用于已连接的设备"""
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("🧪 移液器交互式测试模式") print("🧪 移液器交互式测试模式")
print("=" * 60) print("=" * 60)
# 获取用户输入的连接参数 # 获取用户输入的连接参数
print("\n📡 设备连接配置:") print("\n📡 设备连接配置:")
port = input("请输入移液器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340" port = input("请输入移液器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340"
address_input = input("请输入移液器设备地址 (默认: 4): ").strip() address_input = input("请输入移液器设备地址 (默认: 4): ").strip()
address = int(address_input) if address_input else 4 address = int(address_input) if address_input else 4
# 询问是否连接 XYZ 步进电机控制器 # 询问是否连接 XYZ 步进电机控制器
xyz_enable = input("是否连接 XYZ 步进电机控制器? (y/N): ").strip().lower() xyz_enable = input("是否连接 XYZ 步进电机控制器? (y/N): ").strip().lower()
xyz_port = None xyz_port = None
if xyz_enable not in ['n', 'no']: if xyz_enable not in ['n', 'no']:
xyz_port = input("请输入 XYZ 控制器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340" xyz_port = input("请输入 XYZ 控制器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340"
try: try:
# 创建移液控制器实例 # 创建移液控制器实例
if xyz_port: if xyz_port:
@@ -721,21 +755,21 @@ if __name__ == "__main__":
else: else:
print(f"\n🔧 创建移液控制器实例 (端口: {port}, 地址: {address})...") print(f"\n🔧 创建移液控制器实例 (端口: {port}, 地址: {address})...")
pipette = PipetteController(port=port, address=address) pipette = PipetteController(port=port, address=address)
# 连接设备 # 连接设备
print("\n📞 连接移液器设备...") print("\n📞 连接移液器设备...")
if not pipette.connect(): if not pipette.connect():
print("❌ 设备连接失败,请检查连接") print("❌ 设备连接失败,请检查连接")
return return
print("✅ 设备连接成功") print("✅ 设备连接成功")
# 初始化设备 # 初始化设备
print("\n🚀 初始化设备...") print("\n🚀 初始化设备...")
if not pipette.initialize(): if not pipette.initialize():
print("❌ 设备初始化失败") print("❌ 设备初始化失败")
return return
print("✅ 设备初始化成功") print("✅ 设备初始化成功")
# 交互式菜单 # 交互式菜单
while True: while True:
print("\n" + "=" * 50) print("\n" + "=" * 50)
@@ -755,9 +789,9 @@ if __name__ == "__main__":
print("99. 🚨 紧急停止") print("99. 🚨 紧急停止")
print("0. 🚪 退出程序") print("0. 🚪 退出程序")
print("=" * 50) print("=" * 50)
choice = input("\n请选择操作 (0-12, 99): ").strip() choice = input("\n请选择操作 (0-12, 99): ").strip()
if choice == "0": if choice == "0":
print("\n👋 退出程序...") print("\n👋 退出程序...")
break break
@@ -773,7 +807,7 @@ if __name__ == "__main__":
# print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}") # print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}")
print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}") print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}")
print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}") print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}")
elif choice == "2": elif choice == "2":
# 装载枪头 # 装载枪头
print("\n🔧 装载枪头...") print("\n🔧 装载枪头...")
@@ -781,14 +815,14 @@ if __name__ == "__main__":
print("📍 使用 XYZ 控制器进行 Z 轴定位 (下移 10mm)") print("📍 使用 XYZ 控制器进行 Z 轴定位 (下移 10mm)")
else: else:
print("⚠️ 未连接 XYZ 控制器,仅执行移液器枪头装载") print("⚠️ 未连接 XYZ 控制器,仅执行移液器枪头装载")
if pipette.pickup_tip(): if pipette.pickup_tip():
print("✅ 枪头装载成功") print("✅ 枪头装载成功")
if pipette.xyz_connected: if pipette.xyz_connected:
print("📍 Z 轴已移动到装载位置") print("📍 Z 轴已移动到装载位置")
else: else:
print("❌ 枪头装载失败") print("❌ 枪头装载失败")
elif choice == "3": elif choice == "3":
# 弹出枪头 # 弹出枪头
print("\n🗑️ 弹出枪头...") print("\n🗑️ 弹出枪头...")
@@ -796,7 +830,7 @@ if __name__ == "__main__":
print("✅ 枪头弹出成功") print("✅ 枪头弹出成功")
else: else:
print("❌ 枪头弹出失败") print("❌ 枪头弹出失败")
elif choice == "4": elif choice == "4":
# 吸液操作 # 吸液操作
try: try:
@@ -810,7 +844,7 @@ if __name__ == "__main__":
print("❌ 吸液失败") print("❌ 吸液失败")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "5": elif choice == "5":
# 排液操作 # 排液操作
try: try:
@@ -824,7 +858,7 @@ if __name__ == "__main__":
print("❌ 排液失败") print("❌ 排液失败")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "6": elif choice == "6":
# 混合操作 # 混合操作
try: try:
@@ -838,7 +872,7 @@ if __name__ == "__main__":
print("❌ 混合失败") print("❌ 混合失败")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "7": elif choice == "7":
# 液体转移 # 液体转移
try: try:
@@ -846,7 +880,7 @@ if __name__ == "__main__":
source = input("源孔位 (可选, 如A1): ").strip() or None source = input("源孔位 (可选, 如A1): ").strip() or None
dest = input("目标孔位 (可选, 如B1): ").strip() or None dest = input("目标孔位 (可选, 如B1): ").strip() or None
new_tip = input("是否使用新枪头? (y/n, 默认y): ").strip().lower() != 'n' new_tip = input("是否使用新枪头? (y/n, 默认y): ").strip().lower() != 'n'
print(f"\n🔄 执行液体转移 ({volume}ul)...") print(f"\n🔄 执行液体转移 ({volume}ul)...")
if pipette.transfer(volume=volume, source_well=source, dest_well=dest, new_tip=new_tip): if pipette.transfer(volume=volume, source_well=source, dest_well=dest, new_tip=new_tip):
print("✅ 液体转移完成") print("✅ 液体转移完成")
@@ -854,7 +888,7 @@ if __name__ == "__main__":
print("❌ 液体转移失败") print("❌ 液体转移失败")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "8": elif choice == "8":
# 设置液体类型 # 设置液体类型
print("\n🧪 可用液体类型:") print("\n🧪 可用液体类型:")
@@ -864,16 +898,16 @@ if __name__ == "__main__":
"3": (LiquidClass.VISCOUS, "粘稠液体"), "3": (LiquidClass.VISCOUS, "粘稠液体"),
"4": (LiquidClass.VOLATILE, "挥发性液体") "4": (LiquidClass.VOLATILE, "挥发性液体")
} }
for key, (liquid_class, description) in liquid_options.items(): for key, (liquid_class, description) in liquid_options.items():
print(f" {key}. {description}") print(f" {key}. {description}")
liquid_choice = input("请选择液体类型 (1-4): ").strip() liquid_choice = input("请选择液体类型 (1-4): ").strip()
if liquid_choice in liquid_options: if liquid_choice in liquid_options:
liquid_class, description = liquid_options[liquid_choice] liquid_class, description = liquid_options[liquid_choice]
pipette.set_liquid_class(liquid_class) pipette.set_liquid_class(liquid_class)
print(f"✅ 液体类型设置为: {description}") print(f"✅ 液体类型设置为: {description}")
# 显示参数 # 显示参数
params = pipette.liquid_params params = pipette.liquid_params
print(f"📋 参数设置:") print(f"📋 参数设置:")
@@ -883,7 +917,7 @@ if __name__ == "__main__":
print(f" 💧 预润湿: {'' if params.pre_wet else ''}") print(f" 💧 预润湿: {'' if params.pre_wet else ''}")
else: else:
print("❌ 无效选择") print("❌ 无效选择")
elif choice == "9": elif choice == "9":
# 自定义参数 # 自定义参数
try: try:
@@ -892,19 +926,19 @@ if __name__ == "__main__":
dispense_speed = input("排液速度 (默认800): ").strip() dispense_speed = input("排液速度 (默认800): ").strip()
air_gap = input("空气间隙 (ul, 默认10.0): ").strip() air_gap = input("空气间隙 (ul, 默认10.0): ").strip()
pre_wet = input("预润湿 (y/n, 默认n): ").strip().lower() == 'y' pre_wet = input("预润湿 (y/n, 默认n): ").strip().lower() == 'y'
custom_params = LiquidParameters( custom_params = LiquidParameters(
aspirate_speed=int(aspirate_speed) if aspirate_speed else 500, aspirate_speed=int(aspirate_speed) if aspirate_speed else 500,
dispense_speed=int(dispense_speed) if dispense_speed else 800, dispense_speed=int(dispense_speed) if dispense_speed else 800,
air_gap=float(air_gap) if air_gap else 10.0, air_gap=float(air_gap) if air_gap else 10.0,
pre_wet=pre_wet pre_wet=pre_wet
) )
pipette.set_custom_parameters(custom_params) pipette.set_custom_parameters(custom_params)
print("✅ 自定义参数设置完成") print("✅ 自定义参数设置完成")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "10": elif choice == "10":
# 校准体积 # 校准体积
try: try:
@@ -914,12 +948,12 @@ if __name__ == "__main__":
print(f"✅ 校准完成,校准系数: {actual/expected:.3f}") print(f"✅ 校准完成,校准系数: {actual/expected:.3f}")
except ValueError: except ValueError:
print("❌ 请输入有效的数字") print("❌ 请输入有效的数字")
elif choice == "11": elif choice == "11":
# 重置统计 # 重置统计
pipette.reset_statistics() pipette.reset_statistics()
print("✅ 统计信息已重置") print("✅ 统计信息已重置")
elif choice == "12": elif choice == "12":
# 液体类型测试 # 液体类型测试
print("\n🧪 液体类型参数对比:") print("\n🧪 液体类型参数对比:")
@@ -929,7 +963,7 @@ if __name__ == "__main__":
(LiquidClass.VISCOUS, "粘稠液体"), (LiquidClass.VISCOUS, "粘稠液体"),
(LiquidClass.VOLATILE, "挥发性液体") (LiquidClass.VOLATILE, "挥发性液体")
] ]
for liquid_class, description in liquid_tests: for liquid_class, description in liquid_tests:
params = pipette.LIQUID_PARAMS[liquid_class] params = pipette.LIQUID_PARAMS[liquid_class]
print(f"\n📋 {description} ({liquid_class.value}):") print(f"\n📋 {description} ({liquid_class.value}):")
@@ -938,7 +972,7 @@ if __name__ == "__main__":
print(f" 💨 空气间隙: {params.air_gap}ul") print(f" 💨 空气间隙: {params.air_gap}ul")
print(f" 💧 预润湿: {'' if params.pre_wet else ''}") print(f" 💧 预润湿: {'' if params.pre_wet else ''}")
print(f" ⏱️ 吸液后延时: {params.delay_after_aspirate}s") print(f" ⏱️ 吸液后延时: {params.delay_after_aspirate}s")
elif choice == "99": elif choice == "99":
# 紧急停止 # 紧急停止
print("\n🚨 执行紧急停止...") print("\n🚨 执行紧急停止...")
@@ -949,19 +983,19 @@ if __name__ == "__main__":
else: else:
print("❌ 紧急停止执行失败") print("❌ 紧急停止执行失败")
print("⚠️ 请手动检查设备状态并采取必要措施") print("⚠️ 请手动检查设备状态并采取必要措施")
# 紧急停止后询问是否继续 # 紧急停止后询问是否继续
continue_choice = input("\n是否继续操作?(y/n): ").strip().lower() continue_choice = input("\n是否继续操作?(y/n): ").strip().lower()
if continue_choice != 'y': if continue_choice != 'y':
print("🚪 退出程序") print("🚪 退出程序")
break break
else: else:
print("❌ 无效选择,请重新输入") print("❌ 无效选择,请重新输入")
# 等待用户确认继续 # 等待用户确认继续
input("\n按回车键继续...") input("\n按回车键继续...")
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n\n⚠️ 用户中断操作") print("\n\n⚠️ 用户中断操作")
except Exception as e: except Exception as e:
@@ -974,19 +1008,19 @@ if __name__ == "__main__":
print("✅ 连接已断开") print("✅ 连接已断开")
except: except:
print("⚠️ 断开连接时出现问题") print("⚠️ 断开连接时出现问题")
def demo_test(): def demo_test():
"""演示测试模式 - 完整功能演示""" """演示测试模式 - 完整功能演示"""
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("🎬 移液控制器演示测试") print("🎬 移液控制器演示测试")
print("=" * 60) print("=" * 60)
try: try:
# 创建移液控制器实例 # 创建移液控制器实例
print("1. 🔧 创建移液控制器实例...") print("1. 🔧 创建移液控制器实例...")
pipette = PipetteController(port="/dev/ttyUSB0", address=4) pipette = PipetteController(port="/dev/ttyUSB0", address=4)
print("✅ 移液控制器实例创建成功") print("✅ 移液控制器实例创建成功")
# 连接设备 # 连接设备
print("\n2. 📞 连接移液器设备...") print("\n2. 📞 连接移液器设备...")
if pipette.connect(): if pipette.connect():
@@ -994,7 +1028,7 @@ if __name__ == "__main__":
else: else:
print("❌ 设备连接失败") print("❌ 设备连接失败")
return False return False
# 初始化设备 # 初始化设备
print("\n3. 🚀 初始化设备...") print("\n3. 🚀 初始化设备...")
if pipette.initialize(): if pipette.initialize():
@@ -1002,19 +1036,19 @@ if __name__ == "__main__":
else: else:
print("❌ 设备初始化失败") print("❌ 设备初始化失败")
return False return False
# 装载枪头 # 装载枪头
print("\n4. 🔧 装载枪头...") print("\n4. 🔧 装载枪头...")
if pipette.pickup_tip(): if pipette.pickup_tip():
print("✅ 枪头装载成功") print("✅ 枪头装载成功")
else: else:
print("❌ 枪头装载失败") print("❌ 枪头装载失败")
# 设置液体类型 # 设置液体类型
print("\n5. 🧪 设置液体类型为血清...") print("\n5. 🧪 设置液体类型为血清...")
pipette.set_liquid_class(LiquidClass.SERUM) pipette.set_liquid_class(LiquidClass.SERUM)
print("✅ 液体类型设置完成") print("✅ 液体类型设置完成")
# 吸液操作 # 吸液操作
print("\n6. 💧 执行吸液操作...") print("\n6. 💧 执行吸液操作...")
volume_to_aspirate = 100.0 volume_to_aspirate = 100.0
@@ -1023,7 +1057,7 @@ if __name__ == "__main__":
print(f"📊 当前体积: {pipette.current_volume}ul") print(f"📊 当前体积: {pipette.current_volume}ul")
else: else:
print("❌ 吸液失败") print("❌ 吸液失败")
# 排液操作 # 排液操作
print("\n7. 💦 执行排液操作...") print("\n7. 💦 执行排液操作...")
volume_to_dispense = 50.0 volume_to_dispense = 50.0
@@ -1032,14 +1066,14 @@ if __name__ == "__main__":
print(f"📊 剩余体积: {pipette.current_volume}ul") print(f"📊 剩余体积: {pipette.current_volume}ul")
else: else:
print("❌ 排液失败") print("❌ 排液失败")
# 混合操作 # 混合操作
print("\n8. 🌀 执行混合操作...") print("\n8. 🌀 执行混合操作...")
if pipette.mix(cycles=3, volume=30.0): if pipette.mix(cycles=3, volume=30.0):
print("✅ 混合完成") print("✅ 混合完成")
else: else:
print("❌ 混合失败") print("❌ 混合失败")
# 获取状态信息 # 获取状态信息
print("\n9. 📊 获取设备状态...") print("\n9. 📊 获取设备状态...")
status = pipette.get_status() status = pipette.get_status()
@@ -1052,30 +1086,30 @@ if __name__ == "__main__":
# print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}") # print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}")
print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}") print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}")
print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}") print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}")
# 弹出枪头 # 弹出枪头
print("\n10. 🗑️ 弹出枪头...") print("\n10. 🗑️ 弹出枪头...")
if pipette.eject_tip(): if pipette.eject_tip():
print("✅ 枪头弹出成功") print("✅ 枪头弹出成功")
else: else:
print("❌ 枪头弹出失败") print("❌ 枪头弹出失败")
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("✅ 移液控制器演示测试完成") print("✅ 移液控制器演示测试完成")
print("=" * 60) print("=" * 60)
return True return True
except Exception as e: except Exception as e:
print(f"\n❌ 测试过程中发生异常: {e}") print(f"\n❌ 测试过程中发生异常: {e}")
return False return False
finally: finally:
# 断开连接 # 断开连接
print("\n📞 断开连接...") print("\n📞 断开连接...")
pipette.disconnect() pipette.disconnect()
print("✅ 连接已断开") print("✅ 连接已断开")
# 主程序入口 # 主程序入口
print("🧪 移液器控制器测试程序") print("🧪 移液器控制器测试程序")
print("=" * 40) print("=" * 40)
@@ -1083,9 +1117,9 @@ if __name__ == "__main__":
print("2. 🎬 演示测试") print("2. 🎬 演示测试")
print("0. 🚪 退出") print("0. 🚪 退出")
print("=" * 40) print("=" * 40)
mode = input("请选择测试模式 (0-2): ").strip() mode = input("请选择测试模式 (0-2): ").strip()
if mode == "1": if mode == "1":
interactive_test() interactive_test()
elif mode == "2": elif mode == "2":
@@ -1094,7 +1128,7 @@ if __name__ == "__main__":
print("👋 再见!") print("👋 再见!")
else: else:
print("❌ 无效选择") print("❌ 无效选择")
print("\n🎉 程序结束!") print("\n🎉 程序结束!")
print("\n💡 使用说明:") print("\n💡 使用说明:")
print("1. 确保移液器硬件已正确连接") print("1. 确保移液器硬件已正确连接")

View File

@@ -13,7 +13,7 @@ from pylabrobot.liquid_handling import (
SingleChannelDispense, SingleChannelDispense,
PickupTipRack, PickupTipRack,
DropTipRack, DropTipRack,
MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend, MultiHeadAspirationPlate,
) )
from pylabrobot.liquid_handling.standard import ( from pylabrobot.liquid_handling.standard import (
MultiHeadAspirationContainer, MultiHeadAspirationContainer,
@@ -41,12 +41,6 @@ class TransformXYZDeck(Deck):
super().__init__(name, size_x, size_y, size_z) super().__init__(name, size_x, size_y, size_z)
self.name = name self.name = name
class TransformXYZBackend(LiquidHandlerBackend):
def __init__(self, name: str, host: str, port: int, timeout: float):
super().__init__()
self.host = host
self.port = port
self.timeout = timeout
class TransformXYZRvizBackend(UniLiquidHandlerRvizBackend): class TransformXYZRvizBackend(UniLiquidHandlerRvizBackend):
def __init__(self, name: str, channel_num: int): def __init__(self, name: str, channel_num: int):
@@ -86,7 +80,9 @@ class TransformXYZContainer(Plate, TipRack):
class TransformXYZHandler(LiquidHandlerAbstract): class TransformXYZHandler(LiquidHandlerAbstract):
support_touch_tip = False support_touch_tip = False
def __init__(self, deck: Deck, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, channel_num=1, simulator=True, **backend_kwargs): def __init__(self, deck: Deck, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, channel_num=1, simulator=True,
serial_port: str = "/dev/ttyUSB0", baudrate: int = 115200, pipette_address: int = 4,
total_height: float = 310, **backend_kwargs):
# Handle case where deck is passed as a dict (from serialization) # Handle case where deck is passed as a dict (from serialization)
if isinstance(deck, dict): if isinstance(deck, dict):
# Try to create a TransformXYZDeck from the dict # Try to create a TransformXYZDeck from the dict
@@ -102,11 +98,22 @@ class TransformXYZHandler(LiquidHandlerAbstract):
deck = TransformXYZDeck(name='deck', size_x=100, size_y=100, size_z=100) deck = TransformXYZDeck(name='deck', size_x=100, size_y=100, size_z=100)
if simulator: if simulator:
self._unilabos_backend = TransformXYZRvizBackend(name="laiyu",channel_num=channel_num) self._unilabos_backend = TransformXYZRvizBackend(name="laiyu", channel_num=channel_num)
else: else:
self._unilabos_backend = TransformXYZBackend(name="laiyu",host=host, port=port, timeout=timeout) self._unilabos_backend = UniLiquidHandlerLaiyuBackend(
num_channels=channel_num,
total_height=total_height,
port=serial_port,
baudrate=baudrate,
pipette_address=pipette_address,
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num) super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
def post_init(self, ros_node):
super().post_init(ros_node)
if hasattr(self._unilabos_backend, 'post_init'):
self._unilabos_backend.post_init(ros_node)
async def add_liquid( async def add_liquid(
self, self,
asp_vols: Union[List[float], float], asp_vols: Union[List[float], float],
@@ -128,7 +135,25 @@ class TransformXYZHandler(LiquidHandlerAbstract):
mix_liquid_height: Optional[float] = None, mix_liquid_height: Optional[float] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
): ):
pass return await super().add_liquid(
asp_vols=asp_vols,
dis_vols=dis_vols,
reagent_sources=reagent_sources,
targets=targets,
use_channels=use_channels,
flow_rates=flow_rates,
offsets=offsets,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
delays=delays,
mix_time=mix_time,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
none_keys=none_keys,
)
async def aspirate( async def aspirate(
self, self,
@@ -142,7 +167,17 @@ class TransformXYZHandler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide", spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs, **backend_kwargs,
): ):
pass return await super().aspirate(
resources=resources,
vols=vols,
use_channels=use_channels,
flow_rates=flow_rates,
offsets=offsets,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
**backend_kwargs,
)
async def dispense( async def dispense(
self, self,
@@ -156,7 +191,17 @@ class TransformXYZHandler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide", spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs, **backend_kwargs,
): ):
pass return await super().dispense(
resources=resources,
vols=vols,
use_channels=use_channels,
flow_rates=flow_rates,
offsets=offsets,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
**backend_kwargs,
)
async def drop_tips( async def drop_tips(
self, self,
@@ -166,7 +211,13 @@ class TransformXYZHandler(LiquidHandlerAbstract):
allow_nonzero_volume: bool = False, allow_nonzero_volume: bool = False,
**backend_kwargs, **backend_kwargs,
): ):
pass return await super().drop_tips(
tip_spots=tip_spots,
use_channels=use_channels,
offsets=offsets,
allow_nonzero_volume=allow_nonzero_volume,
**backend_kwargs,
)
async def mix( async def mix(
self, self,
@@ -178,7 +229,15 @@ class TransformXYZHandler(LiquidHandlerAbstract):
mix_rate: Optional[float] = None, mix_rate: Optional[float] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
): ):
pass return await super().mix(
targets=targets,
mix_time=mix_time,
mix_vol=mix_vol,
height_to_bottom=height_to_bottom,
offsets=offsets,
mix_rate=mix_rate,
none_keys=none_keys,
)
async def pick_up_tips( async def pick_up_tips(
self, self,
@@ -187,7 +246,12 @@ class TransformXYZHandler(LiquidHandlerAbstract):
offsets: Optional[List[Coordinate]] = None, offsets: Optional[List[Coordinate]] = None,
**backend_kwargs, **backend_kwargs,
): ):
pass return await super().pick_up_tips(
tip_spots=tip_spots,
use_channels=use_channels,
offsets=offsets,
**backend_kwargs,
)
async def transfer_liquid( async def transfer_liquid(
self, self,
@@ -214,5 +278,26 @@ class TransformXYZHandler(LiquidHandlerAbstract):
delays: Optional[List[int]] = None, delays: Optional[List[int]] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
): ):
pass return await super().transfer_liquid(
sources=sources,
targets=targets,
tip_racks=tip_racks,
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
mix_times=mix_times,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
none_keys=none_keys,
)

View File

@@ -6945,7 +6945,7 @@ liquid_handler.laiyu:
properties: properties:
channel_num: channel_num:
default: 1 default: 1
type: string type: integer
deck: deck:
type: object type: object
host: host:
@@ -6956,10 +6956,25 @@ liquid_handler.laiyu:
type: integer type: integer
simulator: simulator:
default: true default: true
type: string type: boolean
timeout: timeout:
default: 10.0 default: 10.0
type: number type: number
serial_port:
default: /dev/ttyUSB0
description: 硬件串口端口(非 simulator 模式下使用)
type: string
baudrate:
default: 115200
type: integer
pipette_address:
default: 4
description: SOPA 移液器 RS485 地址
type: integer
total_height:
default: 310
description: 龙门架总高度 (mm),用于坐标转换
type: number
required: required:
- deck - deck
type: object type: object