From 92bfb069d50b13d30c99ba6601765340bd3b59f8 Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:44:20 +0800 Subject: [PATCH] feat: Implement Laiyu liquid handling station with enhanced device control, testing, and documentation. --- .../liquid_handling/laiyu/backend/__init__.py | 6 +- .../laiyu/backend/laiyu_backend.py | 334 ---------- .../laiyu/backend/laiyu_v_backend.py | 582 ++++++++---------- .../laiyu/controllers/pipette_controller.py | 260 ++++---- .../devices/liquid_handling/laiyu/laiyu.py | 121 +++- unilabos/registry/devices/liquid_handler.yaml | 19 +- 6 files changed, 521 insertions(+), 801 deletions(-) delete mode 100644 unilabos/devices/liquid_handling/laiyu/backend/laiyu_backend.py diff --git a/unilabos/devices/liquid_handling/laiyu/backend/__init__.py b/unilabos/devices/liquid_handling/laiyu/backend/__init__.py index 4bf29392..b7e1b34a 100644 --- a/unilabos/devices/liquid_handling/laiyu/backend/__init__.py +++ b/unilabos/devices/liquid_handling/laiyu/backend/__init__.py @@ -1,9 +1,7 @@ """ LaiYu液体处理设备后端模块 - -提供设备后端接口和实现 """ -from .laiyu_backend import LaiYuLiquidBackend, create_laiyu_backend +from .laiyu_v_backend import UniLiquidHandlerLaiyuBackend -__all__ = ['LaiYuLiquidBackend', 'create_laiyu_backend'] \ No newline at end of file +__all__ = ['UniLiquidHandlerLaiyuBackend'] diff --git a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_backend.py b/unilabos/devices/liquid_handling/laiyu/backend/laiyu_backend.py deleted file mode 100644 index 5e8041c0..00000000 --- a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_backend.py +++ /dev/null @@ -1,334 +0,0 @@ -""" -LaiYu液体处理设备后端实现 - -提供设备的后端接口和控制逻辑 -""" - -import logging -from typing import Dict, Any, Optional, List -from abc import ABC, abstractmethod - -# 尝试导入PyLabRobot后端 -try: - from pylabrobot.liquid_handling.backends import LiquidHandlerBackend - PYLABROBOT_AVAILABLE = True -except ImportError: - PYLABROBOT_AVAILABLE = False - # 创建模拟后端基类 - class LiquidHandlerBackend: - def __init__(self, name: str): - self.name = name - self.is_connected = False - - def connect(self): - """连接设备""" - pass - - def disconnect(self): - """断开连接""" - pass - - -class LaiYuLiquidBackend(LiquidHandlerBackend): - """LaiYu液体处理设备后端""" - - def __init__(self, name: str = "LaiYu_Liquid_Backend"): - """ - 初始化LaiYu液体处理设备后端 - - Args: - name: 后端名称 - """ - if PYLABROBOT_AVAILABLE: - # PyLabRobot 的 LiquidHandlerBackend 不接受参数 - super().__init__() - else: - # 模拟版本接受 name 参数 - super().__init__(name) - - self.name = name - self.logger = logging.getLogger(__name__) - self.is_connected = False - self.device_info = { - "name": "LaiYu液体处理设备", - "version": "1.0.0", - "manufacturer": "LaiYu", - "model": "LaiYu_Liquid_Handler" - } - - def connect(self) -> bool: - """ - 连接到LaiYu液体处理设备 - - Returns: - bool: 连接是否成功 - """ - try: - self.logger.info("正在连接到LaiYu液体处理设备...") - # 这里应该实现实际的设备连接逻辑 - # 目前返回模拟连接成功 - self.is_connected = True - self.logger.info("成功连接到LaiYu液体处理设备") - return True - except Exception as e: - self.logger.error(f"连接LaiYu液体处理设备失败: {e}") - self.is_connected = False - return False - - def disconnect(self) -> bool: - """ - 断开与LaiYu液体处理设备的连接 - - Returns: - bool: 断开连接是否成功 - """ - try: - self.logger.info("正在断开与LaiYu液体处理设备的连接...") - # 这里应该实现实际的设备断开连接逻辑 - self.is_connected = False - self.logger.info("成功断开与LaiYu液体处理设备的连接") - return True - except Exception as e: - self.logger.error(f"断开LaiYu液体处理设备连接失败: {e}") - return False - - def is_device_connected(self) -> bool: - """ - 检查设备是否已连接 - - Returns: - bool: 设备是否已连接 - """ - return self.is_connected - - def get_device_info(self) -> Dict[str, Any]: - """ - 获取设备信息 - - Returns: - Dict[str, Any]: 设备信息字典 - """ - return self.device_info.copy() - - def home_device(self) -> bool: - """ - 设备归零操作 - - Returns: - bool: 归零是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行归零操作") - return False - - try: - self.logger.info("正在执行设备归零操作...") - # 这里应该实现实际的设备归零逻辑 - self.logger.info("设备归零操作完成") - return True - except Exception as e: - self.logger.error(f"设备归零操作失败: {e}") - return False - - def aspirate(self, volume: float, location: Dict[str, Any]) -> bool: - """ - 吸液操作 - - Args: - volume: 吸液体积 (微升) - location: 吸液位置信息 - - Returns: - bool: 吸液是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行吸液操作") - return False - - try: - self.logger.info(f"正在执行吸液操作: 体积={volume}μL, 位置={location}") - # 这里应该实现实际的吸液逻辑 - self.logger.info("吸液操作完成") - return True - except Exception as e: - self.logger.error(f"吸液操作失败: {e}") - return False - - def dispense(self, volume: float, location: Dict[str, Any]) -> bool: - """ - 排液操作 - - Args: - volume: 排液体积 (微升) - location: 排液位置信息 - - Returns: - bool: 排液是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行排液操作") - return False - - try: - self.logger.info(f"正在执行排液操作: 体积={volume}μL, 位置={location}") - # 这里应该实现实际的排液逻辑 - self.logger.info("排液操作完成") - return True - except Exception as e: - self.logger.error(f"排液操作失败: {e}") - return False - - def pick_up_tip(self, location: Dict[str, Any]) -> bool: - """ - 取枪头操作 - - Args: - location: 枪头位置信息 - - Returns: - bool: 取枪头是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行取枪头操作") - return False - - try: - self.logger.info(f"正在执行取枪头操作: 位置={location}") - # 这里应该实现实际的取枪头逻辑 - self.logger.info("取枪头操作完成") - return True - except Exception as e: - self.logger.error(f"取枪头操作失败: {e}") - return False - - def drop_tip(self, location: Dict[str, Any]) -> bool: - """ - 丢弃枪头操作 - - Args: - location: 丢弃位置信息 - - Returns: - bool: 丢弃枪头是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行丢弃枪头操作") - return False - - try: - self.logger.info(f"正在执行丢弃枪头操作: 位置={location}") - # 这里应该实现实际的丢弃枪头逻辑 - self.logger.info("丢弃枪头操作完成") - return True - except Exception as e: - self.logger.error(f"丢弃枪头操作失败: {e}") - return False - - def move_to(self, location: Dict[str, Any]) -> bool: - """ - 移动到指定位置 - - Args: - location: 目标位置信息 - - Returns: - bool: 移动是否成功 - """ - if not self.is_connected: - self.logger.error("设备未连接,无法执行移动操作") - return False - - try: - self.logger.info(f"正在移动到位置: {location}") - # 这里应该实现实际的移动逻辑 - self.logger.info("移动操作完成") - return True - except Exception as e: - self.logger.error(f"移动操作失败: {e}") - return False - - def get_status(self) -> Dict[str, Any]: - """ - 获取设备状态 - - Returns: - Dict[str, Any]: 设备状态信息 - """ - return { - "connected": self.is_connected, - "device_info": self.device_info, - "status": "ready" if self.is_connected else "disconnected" - } - - # PyLabRobot 抽象方法实现 - def stop(self): - """停止所有操作""" - self.logger.info("停止所有操作") - pass - - @property - def num_channels(self) -> int: - """返回通道数量""" - return 1 # 单通道移液器 - - def can_pick_up_tip(self, tip_rack, tip_position) -> bool: - """检查是否可以拾取吸头""" - return True # 简化实现,总是返回True - - def pick_up_tips(self, tip_rack, tip_positions): - """拾取多个吸头""" - self.logger.info(f"拾取吸头: {tip_positions}") - pass - - def drop_tips(self, tip_rack, tip_positions): - """丢弃多个吸头""" - self.logger.info(f"丢弃吸头: {tip_positions}") - pass - - def pick_up_tips96(self, tip_rack): - """拾取96个吸头""" - self.logger.info("拾取96个吸头") - pass - - def drop_tips96(self, tip_rack): - """丢弃96个吸头""" - self.logger.info("丢弃96个吸头") - pass - - def aspirate96(self, volume, plate, well_positions): - """96通道吸液""" - self.logger.info(f"96通道吸液: 体积={volume}") - pass - - def dispense96(self, volume, plate, well_positions): - """96通道排液""" - self.logger.info(f"96通道排液: 体积={volume}") - pass - - def pick_up_resource(self, resource, location): - """拾取资源""" - self.logger.info(f"拾取资源: {resource}") - pass - - def drop_resource(self, resource, location): - """放置资源""" - self.logger.info(f"放置资源: {resource}") - pass - - def move_picked_up_resource(self, resource, location): - """移动已拾取的资源""" - self.logger.info(f"移动资源: {resource} 到 {location}") - pass - - -def create_laiyu_backend(name: str = "LaiYu_Liquid_Backend") -> LaiYuLiquidBackend: - """ - 创建LaiYu液体处理设备后端实例 - - Args: - name: 后端名称 - - Returns: - LaiYuLiquidBackend: 后端实例 - """ - return LaiYuLiquidBackend(name) \ No newline at end of file diff --git a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py b/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py index 9e824e1b..24c075dd 100644 --- a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py +++ b/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py @@ -1,385 +1,307 @@ - -import json +"""LaiYu PLR 后端 — 对齐路径 B 硬件交互模式 + +硬件初始化顺序与 laiyu_liquid_station.py (路径 B) 一致: + 1. XYZController(auto_connect=True) — 先开串口 + 2. PipetteController.connect_shared() — 共享 XYZ 的串口 / 锁 + 3. home_all_axes() + pipette.initialize() +""" + +import logging from typing import List, Optional, Union -from pylabrobot.liquid_handling.backends.backend import ( - LiquidHandlerBackend, -) +from pylabrobot.liquid_handling.backends.backend import LiquidHandlerBackend from pylabrobot.liquid_handling.standard import ( - Drop, - DropTipRack, - MultiHeadAspirationContainer, - MultiHeadAspirationPlate, - MultiHeadDispenseContainer, - MultiHeadDispensePlate, - Pickup, - PickupTipRack, - ResourceDrop, - ResourceMove, - ResourcePickup, - SingleChannelAspiration, - SingleChannelDispense, + Drop, + DropTipRack, + MultiHeadAspirationContainer, + MultiHeadAspirationPlate, + MultiHeadDispenseContainer, + MultiHeadDispensePlate, + Pickup, + PickupTipRack, + ResourceDrop, + ResourceMove, + ResourcePickup, + SingleChannelAspiration, + SingleChannelDispense, ) from pylabrobot.resources import Resource, Tip -import rclpy -from rclpy.node import Node -from sensor_msgs.msg import JointState -import time -from rclpy.action import ActionClient -from unilabos_msgs.action import SendCmd -import re +from unilabos.devices.liquid_handling.laiyu.controllers.xyz_controller import XYZController +from unilabos.devices.liquid_handling.laiyu.controllers.pipette_controller import ( + PipetteController, + TipStatus, +) -from unilabos.devices.ros_dev.liquid_handler_joint_publisher import JointStatePublisher -from unilabos.devices.liquid_handling.laiyu.controllers.pipette_controller import PipetteController, TipStatus +logger = logging.getLogger(__name__) class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): - """Chatter box backend for device-free testing. Prints out all operations.""" + """LaiYu 硬件后端 — PLR Backend 接口实现""" - _pip_length = 5 - _vol_length = 8 - _resource_length = 20 - _offset_length = 16 - _flow_rate_length = 10 - _blowout_length = 10 - _lld_z_length = 10 - _kwargs_length = 15 - _tip_type_length = 12 - _max_volume_length = 16 - _fitting_depth_length = 20 - _tip_length_length = 16 - # _pickup_method_length = 20 - _filter_length = 10 + def __init__( + self, + num_channels: int = 1, + tip_length: float = 0, + total_height: float = 310, + port: str = "/dev/ttyUSB0", + baudrate: int = 115200, + pipette_address: int = 4, + ): + super().__init__() + self._num_channels = num_channels + self.tip_length = tip_length + self.total_height = total_height - def __init__(self, num_channels: int = 8 , tip_length: float = 0 , total_height: float = 310, port: str = "/dev/ttyUSB0"): - """Initialize a chatter box backend.""" - super().__init__() - self._num_channels = num_channels - self.tip_length = tip_length - self.total_height = total_height -# rclpy.init() - if not rclpy.ok(): - rclpy.init() - self.joint_state_publisher = None - self.hardware_interface = PipetteController(port=port) + # 保存配置,延迟到 setup() 再创建硬件对象 + self._port = port + self._baudrate = baudrate + self._pipette_address = pipette_address - async def setup(self): - # self.joint_state_publisher = JointStatePublisher() - # self.hardware_interface.xyz_controller.connect_device() - # self.hardware_interface.xyz_controller.home_all_axes() - await super().setup() - self.hardware_interface.connect() - self.hardware_interface.initialize() + self._xyz: Optional[XYZController] = None + self._pipette_ctrl: Optional[PipetteController] = None + self._ros_node = None - print("Setting up the liquid handler.") + # ------------------------------------------------------------------ lifecycle - async def stop(self): - print("Stopping the liquid handler.") + def post_init(self, ros_node): + """接收 ROS 节点引用(由 Handler.post_init 调用)""" + self._ros_node = ros_node - def serialize(self) -> dict: - return {**super().serialize(), "num_channels": self.num_channels} + async def setup(self): + """按路径 B 顺序初始化硬件""" + await super().setup() - def pipette_aspirate(self, volume: float, flow_rate: float): + # 1. XYZ 先开串口 + self._xyz = XYZController( + port=self._port, + baudrate=self._baudrate, + auto_connect=True, + ) + if not self._xyz.is_connected: + raise RuntimeError("XYZ 控制器连接失败") - self.hardware_interface.pipette.set_max_speed(flow_rate) - res = self.hardware_interface.pipette.aspirate(volume=volume) - - if not res: - self.hardware_interface.logger.error("吸取失败,当前体积: {self.hardware_interface.current_volume}") - return + # 2. PipetteController 共享 XYZ 串口 + self._pipette_ctrl = PipetteController( + port=self._port, + address=self._pipette_address, + ) + self._pipette_ctrl.connect_shared( + serial_conn=self._xyz.serial_conn, + serial_lock=self._xyz.serial_lock, + xyz_controller=self._xyz, + ) - self.hardware_interface.current_volume += volume + # 3. 回零 + 移液器初始化 + self._xyz.home_all_axes() + self._pipette_ctrl.initialize() - def pipette_dispense(self, volume: float, flow_rate: float): + logger.info("LaiYu 后端硬件初始化完成") - self.hardware_interface.pipette.set_max_speed(flow_rate) - res = self.hardware_interface.pipette.dispense(volume=volume) - if not res: - self.hardware_interface.logger.error("排液失败,当前体积: {self.hardware_interface.current_volume}") - return - self.hardware_interface.current_volume -= volume + async def stop(self): + """正确断开硬件""" + try: + if self._pipette_ctrl: + self._pipette_ctrl.disconnect_shared() + if self._xyz: + self._xyz.disconnect() + logger.info("LaiYu 后端硬件已断开") + except Exception as e: + logger.error(f"停止后端失败: {e}") - @property - def num_channels(self) -> int: - return self._num_channels + # ------------------------------------------------------------------ helpers - async def assigned_resource_callback(self, resource: Resource): - print(f"Resource {resource.name} was assigned to the liquid handler.") + def _plr_to_machine_coords(self, resource, offset): + """PLR Resource 坐标 → 机器坐标 (倒置龙门架: total_height - z, -y)""" + coordinate = resource.get_absolute_location(x="c", y="c") + x = coordinate.x + offset.x + y = coordinate.y + offset.y + z_plr = coordinate.z + offset.z + return x, -y, self.total_height - (z_plr + self.tip_length) - async def unassigned_resource_callback(self, name: str): - print(f"Resource {name} was unassigned from the liquid handler.") + def _pipette_aspirate(self, volume: float, flow_rate: float): + self._pipette_ctrl.pipette.set_max_speed(flow_rate) + res = self._pipette_ctrl.pipette.aspirate(volume=volume) + if not res: + logger.error(f"吸取失败,当前体积: {self._pipette_ctrl.current_volume}") + return + self._pipette_ctrl.current_volume += volume - async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int], **backend_kwargs): - print("Picking up tips:") - # print(ops.tip) - header = ( - f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} " - f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{'tip type':<{UniLiquidHandlerLaiyuBackend._tip_type_length}} " - f"{'max volume (µL)':<{UniLiquidHandlerLaiyuBackend._max_volume_length}} " - f"{'fitting depth (mm)':<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} " - f"{'tip length (mm)':<{UniLiquidHandlerLaiyuBackend._tip_length_length}} " - # f"{'pickup method':<{ChatterboxBackend._pickup_method_length}} " - f"{'filter':<{UniLiquidHandlerLaiyuBackend._filter_length}}" - ) - # print(header) + def _pipette_dispense(self, volume: float, flow_rate: float): + self._pipette_ctrl.pipette.set_max_speed(flow_rate) + res = self._pipette_ctrl.pipette.dispense(volume=volume) + if not res: + logger.error(f"排液失败,当前体积: {self._pipette_ctrl.current_volume}") + return + self._pipette_ctrl.current_volume -= volume - for op, channel in zip(ops, use_channels): - offset = f"{round(op.offset.x, 1)},{round(op.offset.y, 1)},{round(op.offset.z, 1)}" - row = ( - f" p{channel}: " - f"{op.resource.name[-30:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{op.tip.__class__.__name__:<{UniLiquidHandlerLaiyuBackend._tip_type_length}} " - f"{op.tip.maximal_volume:<{UniLiquidHandlerLaiyuBackend._max_volume_length}} " - f"{op.tip.fitting_depth:<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} " - f"{op.tip.total_tip_length:<{UniLiquidHandlerLaiyuBackend._tip_length_length}} " - # f"{str(op.tip.pickup_method)[-20:]:<{ChatterboxBackend._pickup_method_length}} " - f"{'Yes' if op.tip.has_filter else 'No':<{UniLiquidHandlerLaiyuBackend._filter_length}}" - ) - # print(row) - # print(op.resource.get_absolute_location()) - - self.tip_length = ops[0].tip.total_tip_length - coordinate = ops[0].resource.get_absolute_location(x="c",y="c") - offset_xyz = ops[0].offset - x = coordinate.x + offset_xyz.x - y = coordinate.y + offset_xyz.y - z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z - # print("moving") - self.hardware_interface._update_tip_status() - if self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: - print("已有枪头,无需重复拾取") - return - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) - self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height,speed=100) - # self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "pick",channels=use_channels) - # goback() + # ------------------------------------------------------------------ properties + def serialize(self) -> dict: + return {**super().serialize(), "num_channels": self.num_channels} + @property + def num_channels(self) -> int: + return self._num_channels + # ------------------------------------------------------------------ resource callbacks - async def drop_tips(self, ops: List[Drop], use_channels: List[int], **backend_kwargs): - print("Dropping tips:") - header = ( - f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} " - f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{'tip type':<{UniLiquidHandlerLaiyuBackend._tip_type_length}} " - f"{'max volume (µL)':<{UniLiquidHandlerLaiyuBackend._max_volume_length}} " - f"{'fitting depth (mm)':<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} " - f"{'tip length (mm)':<{UniLiquidHandlerLaiyuBackend._tip_length_length}} " - # f"{'pickup method':<{ChatterboxBackend._pickup_method_length}} " - f"{'filter':<{UniLiquidHandlerLaiyuBackend._filter_length}}" - ) - # print(header) + async def assigned_resource_callback(self, resource: Resource): + logger.info(f"Resource {resource.name} was assigned to the liquid handler.") - for op, channel in zip(ops, use_channels): - offset = f"{round(op.offset.x, 1)},{round(op.offset.y, 1)},{round(op.offset.z, 1)}" - row = ( - f" p{channel}: " - f"{op.resource.name[-30:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{op.tip.__class__.__name__:<{UniLiquidHandlerLaiyuBackend._tip_type_length}} " - f"{op.tip.maximal_volume:<{UniLiquidHandlerLaiyuBackend._max_volume_length}} " - f"{op.tip.fitting_depth:<{UniLiquidHandlerLaiyuBackend._fitting_depth_length}} " - f"{op.tip.total_tip_length:<{UniLiquidHandlerLaiyuBackend._tip_length_length}} " - # f"{str(op.tip.pickup_method)[-20:]:<{ChatterboxBackend._pickup_method_length}} " - f"{'Yes' if op.tip.has_filter else 'No':<{UniLiquidHandlerLaiyuBackend._filter_length}}" - ) - # print(row) + async def unassigned_resource_callback(self, name: str): + logger.info(f"Resource {name} was unassigned from the liquid handler.") - coordinate = ops[0].resource.get_absolute_location(x="c",y="c") - offset_xyz = ops[0].offset - x = coordinate.x + offset_xyz.x - y = coordinate.y + offset_xyz.y - z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z -20 - # print(x, y, z) - # print("moving") - self.hardware_interface._update_tip_status() - if self.hardware_interface.tip_status == TipStatus.NO_TIP: - print("无枪头,无需丢弃") - return - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) - self.hardware_interface.eject_tip - self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) + # ------------------------------------------------------------------ pick_up_tips - async def aspirate( - self, - ops: List[SingleChannelAspiration], - use_channels: List[int], - **backend_kwargs, - ): - print("Aspirating:") - header = ( - f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} " - f"{'vol(ul)':<{UniLiquidHandlerLaiyuBackend._vol_length}} " - f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{'flow rate':<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} " - f"{'blowout':<{UniLiquidHandlerLaiyuBackend._blowout_length}} " - f"{'lld_z':<{UniLiquidHandlerLaiyuBackend._lld_z_length}} " - # f"{'liquids':<20}" # TODO: add liquids - ) - for key in backend_kwargs: - header += f"{key:<{UniLiquidHandlerLaiyuBackend._kwargs_length}} "[-16:] - # print(header) + async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int], **backend_kwargs): + tip = ops[0].tip + self.tip_length = tip.total_tip_length + x, y, z_top = self._plr_to_machine_coords(ops[0].resource, ops[0].offset) - for o, p in zip(ops, use_channels): - offset = f"{round(o.offset.x, 1)},{round(o.offset.y, 1)},{round(o.offset.z, 1)}" - row = ( - f" p{p}: " - f"{o.volume:<{UniLiquidHandlerLaiyuBackend._vol_length}} " - f"{o.resource.name[-20:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{str(o.flow_rate):<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} " - f"{str(o.blow_out_air_volume):<{UniLiquidHandlerLaiyuBackend._blowout_length}} " - f"{str(o.liquid_height):<{UniLiquidHandlerLaiyuBackend._lld_z_length}} " - # f"{o.liquids if o.liquids is not None else 'none'}" - ) - for key, value in backend_kwargs.items(): - if isinstance(value, list) and all(isinstance(v, bool) for v in value): - value = "".join("T" if v else "F" for v in value) - if isinstance(value, list): - value = "".join(map(str, value)) - row += f" {value:<15}" - # print(row) - coordinate = ops[0].resource.get_absolute_location(x="c",y="c") - offset_xyz = ops[0].offset - x = coordinate.x + offset_xyz.x - y = coordinate.y + offset_xyz.y - z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z - # print(x, y, z) - # print("moving") + self._pipette_ctrl._update_tip_status() + if self._pipette_ctrl.tip_status == TipStatus.TIP_ATTACHED: + logger.warning("已有枪头,无需重复拾取") + return - # 判断枪头是否存在 - self.hardware_interface._update_tip_status() - if not self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: - print("无枪头,无法吸液") - return - # 判断吸液量是否超过枪头容量 - flow_rate = backend_kwargs["flow_rate"] if "flow_rate" in backend_kwargs else 500 - blow_out_air_volume = backend_kwargs["blow_out_air_volume"] if "blow_out_air_volume" in backend_kwargs else 0 - if self.hardware_interface.current_volume + ops[0].volume + blow_out_air_volume > self.hardware_interface.max_volume: - self.hardware_interface.logger.error(f"吸液量超过枪头容量: {self.hardware_interface.current_volume + ops[0].volume} > {self.hardware_interface.max_volume}") - return + try: + # 1. 移到枪头正上方 + self._xyz.move_to_work_coord_safe(x=x, y=y, z=z_top, speed=200) + # 2. 下压到套枪头深度(fitting_depth 是枪头套入长度) + z_pickup = z_top + tip.fitting_depth + self._xyz.move_to_work_coord_safe(z=z_pickup, speed=100) + # 3. 退回安全高度 + self._xyz.move_to_work_coord_safe( + z=self._xyz.machine_config.safe_z_height, speed=100 + ) + except Exception as e: + logger.error(f"pick_up_tips 移动失败: {e}") + raise - # 移动到吸液位置 - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) - self.pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate) + # ------------------------------------------------------------------ drop_tips + async def drop_tips(self, ops: List[Drop], use_channels: List[int], **backend_kwargs): + x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset) + z -= 20 # 额外下移补偿 - self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) - if blow_out_air_volume >0: - self.pipette_aspirate(volume=blow_out_air_volume, flow_rate=flow_rate) + self._pipette_ctrl._update_tip_status() + if self._pipette_ctrl.tip_status == TipStatus.NO_TIP: + logger.warning("无枪头,无需丢弃") + return + try: + self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200) + self._pipette_ctrl.eject_tip() # 修复: 原来缺少 () + self._xyz.move_to_work_coord_safe( + z=self._xyz.machine_config.safe_z_height + ) + except Exception as e: + logger.error(f"drop_tips 失败: {e}") + raise + # ------------------------------------------------------------------ aspirate + async def aspirate( + self, + ops: List[SingleChannelAspiration], + use_channels: List[int], + **backend_kwargs, + ): + x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset) - async def dispense( - self, - ops: List[SingleChannelDispense], - use_channels: List[int], - **backend_kwargs, - ): - # print("Dispensing:") - header = ( - f"{'pip#':<{UniLiquidHandlerLaiyuBackend._pip_length}} " - f"{'vol(ul)':<{UniLiquidHandlerLaiyuBackend._vol_length}} " - f"{'resource':<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{'offset':<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{'flow rate':<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} " - f"{'blowout':<{UniLiquidHandlerLaiyuBackend._blowout_length}} " - f"{'lld_z':<{UniLiquidHandlerLaiyuBackend._lld_z_length}} " - # f"{'liquids':<20}" # TODO: add liquids - ) - for key in backend_kwargs: - header += f"{key:<{UniLiquidHandlerLaiyuBackend._kwargs_length}} "[-16:] - # print(header) + self._pipette_ctrl._update_tip_status() + if self._pipette_ctrl.tip_status != TipStatus.TIP_ATTACHED: + raise RuntimeError("无枪头,无法吸液") - for o, p in zip(ops, use_channels): - offset = f"{round(o.offset.x, 1)},{round(o.offset.y, 1)},{round(o.offset.z, 1)}" - row = ( - f" p{p}: " - f"{o.volume:<{UniLiquidHandlerLaiyuBackend._vol_length}} " - f"{o.resource.name[-20:]:<{UniLiquidHandlerLaiyuBackend._resource_length}} " - f"{offset:<{UniLiquidHandlerLaiyuBackend._offset_length}} " - f"{str(o.flow_rate):<{UniLiquidHandlerLaiyuBackend._flow_rate_length}} " - f"{str(o.blow_out_air_volume):<{UniLiquidHandlerLaiyuBackend._blowout_length}} " - f"{str(o.liquid_height):<{UniLiquidHandlerLaiyuBackend._lld_z_length}} " - # f"{o.liquids if o.liquids is not None else 'none'}" - ) - for key, value in backend_kwargs.items(): - if isinstance(value, list) and all(isinstance(v, bool) for v in value): - value = "".join("T" if v else "F" for v in value) - if isinstance(value, list): - value = "".join(map(str, value)) - row += f" {value:<{UniLiquidHandlerLaiyuBackend._kwargs_length}}" - # print(row) - coordinate = ops[0].resource.get_absolute_location(x="c",y="c") - offset_xyz = ops[0].offset - x = coordinate.x + offset_xyz.x - y = coordinate.y + offset_xyz.y - z = self.total_height - (coordinate.z + self.tip_length) + offset_xyz.z - # print(x, y, z) - # print("moving") + flow_rate = backend_kwargs.get("flow_rate", 500) + blow_out_air_volume = backend_kwargs.get("blow_out_air_volume", 0) - # 判断枪头是否存在 - self.hardware_interface._update_tip_status() - if not self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: - print("无枪头,无法排液") - return - # 判断排液量是否超过枪头容量 - flow_rate = backend_kwargs["flow_rate"] if "flow_rate" in backend_kwargs else 500 - blow_out_air_volume = backend_kwargs["blow_out_air_volume"] if "blow_out_air_volume" in backend_kwargs else 0 - if self.hardware_interface.current_volume - ops[0].volume - blow_out_air_volume < 0: - self.hardware_interface.logger.error(f"排液量超过枪头容量: {self.hardware_interface.current_volume - ops[0].volume - blow_out_air_volume} < 0") - return + if ( + self._pipette_ctrl.current_volume + ops[0].volume + blow_out_air_volume + > self._pipette_ctrl.max_volume + ): + raise RuntimeError( + f"吸液量超过枪头容量: " + f"{self._pipette_ctrl.current_volume + ops[0].volume} > {self._pipette_ctrl.max_volume}" + ) - - # 移动到排液位置 - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) - self.pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate) + self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200) + self._pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate) + self._xyz.move_to_work_coord_safe( + z=self._xyz.machine_config.safe_z_height + ) + if blow_out_air_volume > 0: + self._pipette_aspirate(volume=blow_out_air_volume, flow_rate=flow_rate) - self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) - if blow_out_air_volume > 0: - self.pipette_dispense(volume=blow_out_air_volume, flow_rate=flow_rate) - # self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "",channels=use_channels) + # ------------------------------------------------------------------ dispense - async def pick_up_tips96(self, pickup: PickupTipRack, **backend_kwargs): - print(f"Picking up tips from {pickup.resource.name}.") + async def dispense( + self, + ops: List[SingleChannelDispense], + use_channels: List[int], + **backend_kwargs, + ): + x, y, z = self._plr_to_machine_coords(ops[0].resource, ops[0].offset) - async def drop_tips96(self, drop: DropTipRack, **backend_kwargs): - print(f"Dropping tips to {drop.resource.name}.") + self._pipette_ctrl._update_tip_status() + if self._pipette_ctrl.tip_status != TipStatus.TIP_ATTACHED: + raise RuntimeError("无枪头,无法排液") - async def aspirate96( - self, aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] - ): - if isinstance(aspiration, MultiHeadAspirationPlate): - resource = aspiration.wells[0].parent - else: - resource = aspiration.container - print(f"Aspirating {aspiration.volume} from {resource}.") + flow_rate = backend_kwargs.get("flow_rate", 500) + blow_out_air_volume = backend_kwargs.get("blow_out_air_volume", 0) - async def dispense96(self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer]): - if isinstance(dispense, MultiHeadDispensePlate): - resource = dispense.wells[0].parent - else: - resource = dispense.container - print(f"Dispensing {dispense.volume} to {resource}.") + if ( + self._pipette_ctrl.current_volume - ops[0].volume - blow_out_air_volume < 0 + ): + raise RuntimeError( + f"排液量超过当前体积: " + f"{self._pipette_ctrl.current_volume - ops[0].volume - blow_out_air_volume} < 0" + ) - async def pick_up_resource(self, pickup: ResourcePickup): - print(f"Picking up resource: {pickup}") + self._xyz.move_to_work_coord_safe(x=x, y=y, z=z, speed=200) + self._pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate) - async def move_picked_up_resource(self, move: ResourceMove): - print(f"Moving picked up resource: {move}") + self._xyz.move_to_work_coord_safe( + z=self._xyz.machine_config.safe_z_height + ) + if blow_out_air_volume > 0: + self._pipette_dispense(volume=blow_out_air_volume, flow_rate=flow_rate) - async def drop_resource(self, drop: ResourceDrop): - print(f"Dropping resource: {drop}") + # ------------------------------------------------------------------ 96-channel stubs - def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: - return True - + async def pick_up_tips96(self, pickup: PickupTipRack, **backend_kwargs): + logger.info(f"Picking up tips from {pickup.resource.name}.") + + async def drop_tips96(self, drop: DropTipRack, **backend_kwargs): + logger.info(f"Dropping tips to {drop.resource.name}.") + + async def aspirate96( + self, aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] + ): + if isinstance(aspiration, MultiHeadAspirationPlate): + resource = aspiration.wells[0].parent + else: + resource = aspiration.container + logger.info(f"Aspirating {aspiration.volume} from {resource}.") + + async def dispense96( + self, dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer] + ): + if isinstance(dispense, MultiHeadDispensePlate): + resource = dispense.wells[0].parent + else: + resource = dispense.container + logger.info(f"Dispensing {dispense.volume} to {resource}.") + + async def pick_up_resource(self, pickup: ResourcePickup): + logger.info(f"Picking up resource: {pickup}") + + async def move_picked_up_resource(self, move: ResourceMove): + logger.info(f"Moving picked up resource: {move}") + + async def drop_resource(self, drop: ResourceDrop): + logger.info(f"Dropping resource: {drop}") + + def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: + return True diff --git a/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py b/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py index 17a47df1..da08d3d7 100644 --- a/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py +++ b/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py @@ -5,21 +5,16 @@ 封装SOPA移液器的高级控制功能 """ -# 添加项目根目录到Python路径以解决模块导入问题 import sys import os -from tkinter import N + +_current_file = os.path.abspath(__file__) +_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(_current_file))))) +if _project_root not in sys.path: + sys.path.insert(0, _project_root) from unilabos.devices.liquid_handling.laiyu.drivers.xyz_stepper_driver import ModbusException -# 无论如何都添加项目根目录到路径 -current_file = os.path.abspath(__file__) -# 从 .../Uni-Lab-OS/unilabos/devices/LaiYu_Liquid/controllers/pipette_controller.py -# 向上5级到 .../Uni-Lab-OS -project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(current_file))))) -# 强制添加项目根目录到sys.path的开头 -sys.path.insert(0, project_root) - import time import logging from typing import Optional, List, Dict, Tuple @@ -153,7 +148,7 @@ class PipetteController: logger.error("移液器连接失败") return False logger.info("移液器连接成功") - + # 连接XYZ步进电机控制器(如果提供了端口) if self.xyz_port != self.pipette_port: try: @@ -172,24 +167,62 @@ class PipetteController: try: self.xyz_controller = XYZController(self.xyz_port, auto_connect=False) self.xyz_controller.serial_conn = self.pipette.serial_port + self.xyz_controller.serial_lock = self.pipette.lock self.xyz_controller.is_connected = True + logger.info("XYZ控制器与移液器共享串口和互斥锁") except Exception as e: - logger.info("未配置XYZ步进电机端口,跳过运动控制器连接") - + logger.warning(f"共享端口 XYZ 控制器创建失败: {e}") + self.xyz_controller = None + self.xyz_connected = False + return True except Exception as e: logger.error(f"设备连接失败: {e}") return False + def connect_shared(self, serial_conn, serial_lock, xyz_controller: XYZController) -> bool: + """使用已连接的串口和XYZ控制器(路径 B 模式:XYZ 先开串口,移液器共享) + + Args: + serial_conn: 已打开的串口连接(来自 XYZController) + serial_lock: 串口互斥锁(来自 XYZController) + xyz_controller: 已连接的 XYZController 实例 + """ + try: + self.pipette.serial_port = serial_conn + self.pipette.lock = serial_lock + self.pipette.is_connected = True + + self.xyz_controller = xyz_controller + self.xyz_connected = True + + logger.info("移液控制器已通过 connect_shared 共享 XYZ 串口") + return True + except Exception as e: + logger.error(f"connect_shared 失败: {e}") + return False + + def disconnect_shared(self) -> None: + """释放共享串口引用(与 connect_shared 对称)。 + + 注意:不关闭串口本身,串口由 XYZController 负责关闭。 + """ + try: + self.pipette.serial_port = None + self.pipette.lock = None + self.pipette.is_connected = False + self.xyz_controller = None + self.xyz_connected = False + logger.info("移液控制器已释放共享串口引用") + except Exception as e: + logger.error(f"disconnect_shared 失败: {e}") + def initialize(self) -> bool: """初始化移液器""" try: if self.pipette.initialize(): logger.info("移液器初始化成功") - # 检查枪头状态 self._update_tip_status() - self.xyz_controller.home_all_axes() - self.xyz_controller.move_to_work_coord_safe(x=0, y=-150, z=0) return True return False except Exception as e: @@ -198,56 +231,58 @@ class PipetteController: def disconnect(self): """断开连接""" - # 断开移液器连接 + if self.xyz_controller and self.xyz_connected: + if self.xyz_port != self.pipette_port: + try: + self.xyz_controller.disconnect() + logger.info("XYZ 步进电机已断开") + except Exception as e: + logger.error(f"断开 XYZ 步进电机失败: {e}") + else: + self.xyz_controller.serial_conn = None + self.xyz_connected = False + self.xyz_controller = None + self.pipette.disconnect() logger.info("移液器已断开") - - # 断开 XYZ 步进电机连接 - if self.xyz_controller and self.xyz_connected: - try: - self.xyz_controller.disconnect() - self.xyz_connected = False - logger.info("XYZ 步进电机已断开") - except Exception as e: - logger.error(f"断开 XYZ 步进电机失败: {e}") def _check_xyz_safety(self, axis: MotorAxis, target_position: int) -> bool: """ 检查 XYZ 轴移动的安全性 - + Args: axis: 电机轴 target_position: 目标位置(步数) - + Returns: 是否安全 """ try: # 获取当前电机状态 motor_position = self.xyz_controller.get_motor_status(axis) - + # 检查电机状态是否正常 (不是碰撞停止或限位停止) - if motor_position.status in [MotorStatus.COLLISION_STOP, - MotorStatus.FORWARD_LIMIT_STOP, + if motor_position.status in [MotorStatus.COLLISION_STOP, + MotorStatus.FORWARD_LIMIT_STOP, MotorStatus.REVERSE_LIMIT_STOP]: logger.error(f"{axis.name} 轴电机处于错误状态: {motor_position.status.name}") return False - + # 检查位置限制 (扩大安全范围以适应实际硬件) # 步进电机的位置范围通常很大,这里设置更合理的范围 if target_position < -500000 or target_position > 500000: logger.error(f"{axis.name} 轴目标位置超出安全范围: {target_position}") return False - + # 检查移动距离是否过大 (单次移动不超过 20000 步,约12mm) current_position = motor_position.steps move_distance = abs(target_position - current_position) if move_distance > 20000: logger.error(f"{axis.name} 轴单次移动距离过大: {move_distance}步") return False - + return True - + except Exception as e: logger.error(f"安全检查失败: {e}") return False @@ -255,48 +290,48 @@ class PipetteController: def move_z_relative(self, distance_mm: float, speed: int = 2000, acceleration: int = 500) -> bool: """ Z轴相对移动 - + Args: distance_mm: 移动距离(mm),正值向下,负值向上 speed: 移动速度(rpm) acceleration: 加速度(rpm/s) - + Returns: 移动是否成功 """ if not self.xyz_controller or not self.xyz_connected: logger.error("XYZ 步进电机未连接,无法执行移动") return False - + try: # 参数验证 if abs(distance_mm) > 15.0: logger.error(f"移动距离过大: {distance_mm}mm,最大允许15mm") return False - + if speed < 100 or speed > 5000: logger.error(f"速度参数无效: {speed}rpm,范围应为100-5000") return False - + # 获取当前 Z 轴位置 current_status = self.xyz_controller.get_motor_status(MotorAxis.Z) current_z_position = current_status.steps - + # 计算移动距离对应的步数 (1mm = 1638.4步) mm_to_steps = 1638.4 move_distance_steps = int(distance_mm * mm_to_steps) - + # 计算目标位置 target_z_position = current_z_position + move_distance_steps - + # 安全检查 if not self._check_xyz_safety(MotorAxis.Z, target_z_position): logger.error("Z轴移动安全检查失败") return False - + logger.info(f"Z轴相对移动: {distance_mm}mm ({move_distance_steps}步)") logger.info(f"当前位置: {current_z_position}步 -> 目标位置: {target_z_position}步") - + # 执行移动 success = self.xyz_controller.move_to_position( axis=MotorAxis.Z, @@ -305,28 +340,28 @@ class PipetteController: acceleration=acceleration, precision=50 ) - + if not success: logger.error("Z轴移动命令发送失败") return False - + # 等待移动完成 if not self.xyz_controller.wait_for_completion(MotorAxis.Z, timeout=10.0): logger.error("Z轴移动超时") return False - + # 验证移动结果 final_status = self.xyz_controller.get_motor_status(MotorAxis.Z) final_position = final_status.steps position_error = abs(final_position - target_z_position) - + logger.info(f"Z轴移动完成,最终位置: {final_position}步,误差: {position_error}步") - + if position_error > 100: logger.warning(f"Z轴位置误差较大: {position_error}步") - + return True - + except ModbusException as e: logger.error(f"Modbus通信错误: {e}") return False @@ -337,21 +372,20 @@ class PipetteController: def emergency_stop(self) -> bool: """ 紧急停止所有运动 - + Returns: 停止是否成功 """ success = True - - # 停止移液器操作 + try: - if self.pipette and self.connected: - # 这里可以添加移液器的紧急停止逻辑 + if self.pipette and self.pipette.is_connected: + self.pipette.emergency_stop() logger.info("移液器紧急停止") except Exception as e: logger.error(f"移液器紧急停止失败: {e}") success = False - + # 停止 XYZ 轴运动 try: if self.xyz_controller and self.xyz_connected: @@ -360,7 +394,7 @@ class PipetteController: except Exception as e: logger.error(f"XYZ 轴紧急停止失败: {e}") success = False - + return success def pickup_tip(self) -> bool: @@ -376,7 +410,7 @@ class PipetteController: return True logger.info("开始装载枪头 - Z轴向下移动10mm") - + # 使用相对移动方法,向下移动10mm if self.move_z_relative(distance_mm=10.0, speed=2000, acceleration=500): # 更新枪头状态 @@ -688,31 +722,31 @@ class PipetteController: if __name__ == "__main__": # 配置日志 import logging - + # 设置日志级别 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) - + def interactive_test(): """交互式测试模式 - 适用于已连接的设备""" print("\n" + "=" * 60) print("🧪 移液器交互式测试模式") print("=" * 60) - + # 获取用户输入的连接参数 print("\n📡 设备连接配置:") port = input("请输入移液器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340" address_input = input("请输入移液器设备地址 (默认: 4): ").strip() address = int(address_input) if address_input else 4 - + # 询问是否连接 XYZ 步进电机控制器 xyz_enable = input("是否连接 XYZ 步进电机控制器? (y/N): ").strip().lower() xyz_port = None if xyz_enable not in ['n', 'no']: xyz_port = input("请输入 XYZ 控制器串口端口 (默认: /dev/ttyUSB_CH340): ").strip() or "/dev/ttyUSB_CH340" - + try: # 创建移液控制器实例 if xyz_port: @@ -721,21 +755,21 @@ if __name__ == "__main__": else: print(f"\n🔧 创建移液控制器实例 (端口: {port}, 地址: {address})...") pipette = PipetteController(port=port, address=address) - + # 连接设备 print("\n📞 连接移液器设备...") if not pipette.connect(): print("❌ 设备连接失败,请检查连接") return print("✅ 设备连接成功") - + # 初始化设备 print("\n🚀 初始化设备...") if not pipette.initialize(): print("❌ 设备初始化失败") return print("✅ 设备初始化成功") - + # 交互式菜单 while True: print("\n" + "=" * 50) @@ -755,9 +789,9 @@ if __name__ == "__main__": print("99. 🚨 紧急停止") print("0. 🚪 退出程序") print("=" * 50) - + choice = input("\n请选择操作 (0-12, 99): ").strip() - + if choice == "0": print("\n👋 退出程序...") break @@ -773,7 +807,7 @@ if __name__ == "__main__": # print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}") print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}") print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}") - + elif choice == "2": # 装载枪头 print("\n🔧 装载枪头...") @@ -781,14 +815,14 @@ if __name__ == "__main__": print("📍 使用 XYZ 控制器进行 Z 轴定位 (下移 10mm)") else: print("⚠️ 未连接 XYZ 控制器,仅执行移液器枪头装载") - + if pipette.pickup_tip(): print("✅ 枪头装载成功") if pipette.xyz_connected: print("📍 Z 轴已移动到装载位置") else: print("❌ 枪头装载失败") - + elif choice == "3": # 弹出枪头 print("\n🗑️ 弹出枪头...") @@ -796,7 +830,7 @@ if __name__ == "__main__": print("✅ 枪头弹出成功") else: print("❌ 枪头弹出失败") - + elif choice == "4": # 吸液操作 try: @@ -810,7 +844,7 @@ if __name__ == "__main__": print("❌ 吸液失败") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "5": # 排液操作 try: @@ -824,7 +858,7 @@ if __name__ == "__main__": print("❌ 排液失败") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "6": # 混合操作 try: @@ -838,7 +872,7 @@ if __name__ == "__main__": print("❌ 混合失败") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "7": # 液体转移 try: @@ -846,7 +880,7 @@ if __name__ == "__main__": source = input("源孔位 (可选, 如A1): ").strip() or None dest = input("目标孔位 (可选, 如B1): ").strip() or None new_tip = input("是否使用新枪头? (y/n, 默认y): ").strip().lower() != 'n' - + print(f"\n🔄 执行液体转移 ({volume}ul)...") if pipette.transfer(volume=volume, source_well=source, dest_well=dest, new_tip=new_tip): print("✅ 液体转移完成") @@ -854,7 +888,7 @@ if __name__ == "__main__": print("❌ 液体转移失败") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "8": # 设置液体类型 print("\n🧪 可用液体类型:") @@ -864,16 +898,16 @@ if __name__ == "__main__": "3": (LiquidClass.VISCOUS, "粘稠液体"), "4": (LiquidClass.VOLATILE, "挥发性液体") } - + for key, (liquid_class, description) in liquid_options.items(): print(f" {key}. {description}") - + liquid_choice = input("请选择液体类型 (1-4): ").strip() if liquid_choice in liquid_options: liquid_class, description = liquid_options[liquid_choice] pipette.set_liquid_class(liquid_class) print(f"✅ 液体类型设置为: {description}") - + # 显示参数 params = pipette.liquid_params print(f"📋 参数设置:") @@ -883,7 +917,7 @@ if __name__ == "__main__": print(f" 💧 预润湿: {'是' if params.pre_wet else '否'}") else: print("❌ 无效选择") - + elif choice == "9": # 自定义参数 try: @@ -892,19 +926,19 @@ if __name__ == "__main__": dispense_speed = input("排液速度 (默认800): ").strip() air_gap = input("空气间隙 (ul, 默认10.0): ").strip() pre_wet = input("预润湿 (y/n, 默认n): ").strip().lower() == 'y' - + custom_params = LiquidParameters( aspirate_speed=int(aspirate_speed) if aspirate_speed else 500, dispense_speed=int(dispense_speed) if dispense_speed else 800, air_gap=float(air_gap) if air_gap else 10.0, pre_wet=pre_wet ) - + pipette.set_custom_parameters(custom_params) print("✅ 自定义参数设置完成") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "10": # 校准体积 try: @@ -914,12 +948,12 @@ if __name__ == "__main__": print(f"✅ 校准完成,校准系数: {actual/expected:.3f}") except ValueError: print("❌ 请输入有效的数字") - + elif choice == "11": # 重置统计 pipette.reset_statistics() print("✅ 统计信息已重置") - + elif choice == "12": # 液体类型测试 print("\n🧪 液体类型参数对比:") @@ -929,7 +963,7 @@ if __name__ == "__main__": (LiquidClass.VISCOUS, "粘稠液体"), (LiquidClass.VOLATILE, "挥发性液体") ] - + for liquid_class, description in liquid_tests: params = pipette.LIQUID_PARAMS[liquid_class] print(f"\n📋 {description} ({liquid_class.value}):") @@ -938,7 +972,7 @@ if __name__ == "__main__": print(f" 💨 空气间隙: {params.air_gap}ul") print(f" 💧 预润湿: {'是' if params.pre_wet else '否'}") print(f" ⏱️ 吸液后延时: {params.delay_after_aspirate}s") - + elif choice == "99": # 紧急停止 print("\n🚨 执行紧急停止...") @@ -949,19 +983,19 @@ if __name__ == "__main__": else: print("❌ 紧急停止执行失败") print("⚠️ 请手动检查设备状态并采取必要措施") - + # 紧急停止后询问是否继续 continue_choice = input("\n是否继续操作?(y/n): ").strip().lower() if continue_choice != 'y': print("🚪 退出程序") break - + else: print("❌ 无效选择,请重新输入") - + # 等待用户确认继续 input("\n按回车键继续...") - + except KeyboardInterrupt: print("\n\n⚠️ 用户中断操作") except Exception as e: @@ -974,19 +1008,19 @@ if __name__ == "__main__": print("✅ 连接已断开") except: print("⚠️ 断开连接时出现问题") - + def demo_test(): """演示测试模式 - 完整功能演示""" print("\n" + "=" * 60) print("🎬 移液控制器演示测试") print("=" * 60) - + try: # 创建移液控制器实例 print("1. 🔧 创建移液控制器实例...") pipette = PipetteController(port="/dev/ttyUSB0", address=4) print("✅ 移液控制器实例创建成功") - + # 连接设备 print("\n2. 📞 连接移液器设备...") if pipette.connect(): @@ -994,7 +1028,7 @@ if __name__ == "__main__": else: print("❌ 设备连接失败") return False - + # 初始化设备 print("\n3. 🚀 初始化设备...") if pipette.initialize(): @@ -1002,19 +1036,19 @@ if __name__ == "__main__": else: print("❌ 设备初始化失败") return False - + # 装载枪头 print("\n4. 🔧 装载枪头...") if pipette.pickup_tip(): print("✅ 枪头装载成功") else: print("❌ 枪头装载失败") - + # 设置液体类型 print("\n5. 🧪 设置液体类型为血清...") pipette.set_liquid_class(LiquidClass.SERUM) print("✅ 液体类型设置完成") - + # 吸液操作 print("\n6. 💧 执行吸液操作...") volume_to_aspirate = 100.0 @@ -1023,7 +1057,7 @@ if __name__ == "__main__": print(f"📊 当前体积: {pipette.current_volume}ul") else: print("❌ 吸液失败") - + # 排液操作 print("\n7. 💦 执行排液操作...") volume_to_dispense = 50.0 @@ -1032,14 +1066,14 @@ if __name__ == "__main__": print(f"📊 剩余体积: {pipette.current_volume}ul") else: print("❌ 排液失败") - + # 混合操作 print("\n8. 🌀 执行混合操作...") if pipette.mix(cycles=3, volume=30.0): print("✅ 混合完成") else: print("❌ 混合失败") - + # 获取状态信息 print("\n9. 📊 获取设备状态...") status = pipette.get_status() @@ -1052,30 +1086,30 @@ if __name__ == "__main__": # print(f" 🔧 枪头使用次数: {status['statistics']['tip_count']}") print(f" ⬆️ 吸液次数: {status['statistics']['aspirate_count']}") print(f" ⬇️ 排液次数: {status['statistics']['dispense_count']}") - + # 弹出枪头 print("\n10. 🗑️ 弹出枪头...") if pipette.eject_tip(): print("✅ 枪头弹出成功") else: print("❌ 枪头弹出失败") - + print("\n" + "=" * 60) print("✅ 移液控制器演示测试完成") print("=" * 60) - + return True - + except Exception as e: print(f"\n❌ 测试过程中发生异常: {e}") return False - + finally: # 断开连接 print("\n📞 断开连接...") pipette.disconnect() print("✅ 连接已断开") - + # 主程序入口 print("🧪 移液器控制器测试程序") print("=" * 40) @@ -1083,9 +1117,9 @@ if __name__ == "__main__": print("2. 🎬 演示测试") print("0. 🚪 退出") print("=" * 40) - + mode = input("请选择测试模式 (0-2): ").strip() - + if mode == "1": interactive_test() elif mode == "2": @@ -1094,7 +1128,7 @@ if __name__ == "__main__": print("👋 再见!") else: print("❌ 无效选择") - + print("\n🎉 程序结束!") print("\n💡 使用说明:") print("1. 确保移液器硬件已正确连接") diff --git a/unilabos/devices/liquid_handling/laiyu/laiyu.py b/unilabos/devices/liquid_handling/laiyu/laiyu.py index 0d7074a7..8591c888 100644 --- a/unilabos/devices/liquid_handling/laiyu/laiyu.py +++ b/unilabos/devices/liquid_handling/laiyu/laiyu.py @@ -13,7 +13,7 @@ from pylabrobot.liquid_handling import ( SingleChannelDispense, PickupTipRack, DropTipRack, - MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend, + MultiHeadAspirationPlate, ) from pylabrobot.liquid_handling.standard import ( MultiHeadAspirationContainer, @@ -41,12 +41,6 @@ class TransformXYZDeck(Deck): super().__init__(name, size_x, size_y, size_z) self.name = name -class TransformXYZBackend(LiquidHandlerBackend): - def __init__(self, name: str, host: str, port: int, timeout: float): - super().__init__() - self.host = host - self.port = port - self.timeout = timeout class TransformXYZRvizBackend(UniLiquidHandlerRvizBackend): def __init__(self, name: str, channel_num: int): @@ -86,7 +80,9 @@ class TransformXYZContainer(Plate, TipRack): class TransformXYZHandler(LiquidHandlerAbstract): support_touch_tip = False - def __init__(self, deck: Deck, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, channel_num=1, simulator=True, **backend_kwargs): + def __init__(self, deck: Deck, host: str = "127.0.0.1", port: int = 9999, timeout: float = 10.0, channel_num=1, simulator=True, + serial_port: str = "/dev/ttyUSB0", baudrate: int = 115200, pipette_address: int = 4, + total_height: float = 310, **backend_kwargs): # Handle case where deck is passed as a dict (from serialization) if isinstance(deck, dict): # Try to create a TransformXYZDeck from the dict @@ -102,11 +98,22 @@ class TransformXYZHandler(LiquidHandlerAbstract): deck = TransformXYZDeck(name='deck', size_x=100, size_y=100, size_z=100) if simulator: - self._unilabos_backend = TransformXYZRvizBackend(name="laiyu",channel_num=channel_num) + self._unilabos_backend = TransformXYZRvizBackend(name="laiyu", channel_num=channel_num) else: - self._unilabos_backend = TransformXYZBackend(name="laiyu",host=host, port=port, timeout=timeout) + self._unilabos_backend = UniLiquidHandlerLaiyuBackend( + num_channels=channel_num, + total_height=total_height, + port=serial_port, + baudrate=baudrate, + pipette_address=pipette_address, + ) super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num) + def post_init(self, ros_node): + super().post_init(ros_node) + if hasattr(self._unilabos_backend, 'post_init'): + self._unilabos_backend.post_init(ros_node) + async def add_liquid( self, asp_vols: Union[List[float], float], @@ -128,7 +135,25 @@ class TransformXYZHandler(LiquidHandlerAbstract): mix_liquid_height: Optional[float] = None, none_keys: List[str] = [], ): - pass + return await super().add_liquid( + asp_vols=asp_vols, + dis_vols=dis_vols, + reagent_sources=reagent_sources, + targets=targets, + use_channels=use_channels, + flow_rates=flow_rates, + offsets=offsets, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + is_96_well=is_96_well, + delays=delays, + mix_time=mix_time, + mix_vol=mix_vol, + mix_rate=mix_rate, + mix_liquid_height=mix_liquid_height, + none_keys=none_keys, + ) async def aspirate( self, @@ -142,7 +167,17 @@ class TransformXYZHandler(LiquidHandlerAbstract): spread: Literal["wide", "tight", "custom"] = "wide", **backend_kwargs, ): - pass + return await super().aspirate( + resources=resources, + vols=vols, + use_channels=use_channels, + flow_rates=flow_rates, + offsets=offsets, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + **backend_kwargs, + ) async def dispense( self, @@ -156,7 +191,17 @@ class TransformXYZHandler(LiquidHandlerAbstract): spread: Literal["wide", "tight", "custom"] = "wide", **backend_kwargs, ): - pass + return await super().dispense( + resources=resources, + vols=vols, + use_channels=use_channels, + flow_rates=flow_rates, + offsets=offsets, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + **backend_kwargs, + ) async def drop_tips( self, @@ -166,7 +211,13 @@ class TransformXYZHandler(LiquidHandlerAbstract): allow_nonzero_volume: bool = False, **backend_kwargs, ): - pass + return await super().drop_tips( + tip_spots=tip_spots, + use_channels=use_channels, + offsets=offsets, + allow_nonzero_volume=allow_nonzero_volume, + **backend_kwargs, + ) async def mix( self, @@ -178,7 +229,15 @@ class TransformXYZHandler(LiquidHandlerAbstract): mix_rate: Optional[float] = None, none_keys: List[str] = [], ): - pass + return await super().mix( + targets=targets, + mix_time=mix_time, + mix_vol=mix_vol, + height_to_bottom=height_to_bottom, + offsets=offsets, + mix_rate=mix_rate, + none_keys=none_keys, + ) async def pick_up_tips( self, @@ -187,7 +246,12 @@ class TransformXYZHandler(LiquidHandlerAbstract): offsets: Optional[List[Coordinate]] = None, **backend_kwargs, ): - pass + return await super().pick_up_tips( + tip_spots=tip_spots, + use_channels=use_channels, + offsets=offsets, + **backend_kwargs, + ) async def transfer_liquid( self, @@ -214,5 +278,26 @@ class TransformXYZHandler(LiquidHandlerAbstract): delays: Optional[List[int]] = None, none_keys: List[str] = [], ): - pass - \ No newline at end of file + return await super().transfer_liquid( + sources=sources, + targets=targets, + tip_racks=tip_racks, + use_channels=use_channels, + asp_vols=asp_vols, + dis_vols=dis_vols, + asp_flow_rates=asp_flow_rates, + dis_flow_rates=dis_flow_rates, + offsets=offsets, + touch_tip=touch_tip, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + is_96_well=is_96_well, + mix_stage=mix_stage, + mix_times=mix_times, + mix_vol=mix_vol, + mix_rate=mix_rate, + mix_liquid_height=mix_liquid_height, + delays=delays, + none_keys=none_keys, + ) diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index b04d6317..93a13c32 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -6945,7 +6945,7 @@ liquid_handler.laiyu: properties: channel_num: default: 1 - type: string + type: integer deck: type: object host: @@ -6956,10 +6956,25 @@ liquid_handler.laiyu: type: integer simulator: default: true - type: string + type: boolean timeout: default: 10.0 type: number + serial_port: + default: /dev/ttyUSB0 + description: 硬件串口端口(非 simulator 模式下使用) + type: string + baudrate: + default: 115200 + type: integer + pipette_address: + default: 4 + description: SOPA 移液器 RS485 地址 + type: integer + total_height: + default: 310 + description: 龙门架总高度 (mm),用于坐标转换 + type: number required: - deck type: object