launch file ROS2

This commit is contained in:
acki
2025-11-12 18:31:38 +01:00
parent c549321bac
commit 3d7bfca61e

View File

@@ -1,12 +1,8 @@
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, OpaqueFunction, Shutdown, RegisterEventHandler, TimerAction from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown, RegisterEventHandler, TimerAction, LogInfo
from launch.conditions import IfCondition
from launch.event_handlers import OnProcessExit from launch.event_handlers import OnProcessExit
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description(): def generate_launch_description():
@@ -16,7 +12,7 @@ def generate_launch_description():
with_com_arg = DeclareLaunchArgument('with_com', default_value='true', description='Launch COM nodes?') with_com_arg = DeclareLaunchArgument('with_com', default_value='true', description='Launch COM nodes?')
with_strat_arg = DeclareLaunchArgument('with_strat', default_value='true', description='Launch strategy nodes?') with_strat_arg = DeclareLaunchArgument('with_strat', default_value='true', description='Launch strategy nodes?')
channel_type = LaunchConfiguration('channel_type', default='serial') channel_type = LaunchConfiguration('channel_type', default='serial')
serial_port = LaunchConfiguration('serial_port', default='/dev/rplidar') serial_port = LaunchConfiguration('serial_port', default='/dev/rplidar')
serial_baudrate = LaunchConfiguration('serial_baudrate', default='115200') serial_baudrate = LaunchConfiguration('serial_baudrate', default='115200')
frame_id = LaunchConfiguration('frame_id', default='laser') frame_id = LaunchConfiguration('frame_id', default='laser')
@@ -24,49 +20,57 @@ def generate_launch_description():
angle_compensate = LaunchConfiguration('angle_compensate', default='true') angle_compensate = LaunchConfiguration('angle_compensate', default='true')
scan_mode = LaunchConfiguration('scan_mode', default='Sensitivity') scan_mode = LaunchConfiguration('scan_mode', default='Sensitivity')
# Get launch configs # ----------------------------
with_gui = LaunchConfiguration('with_gui') # Infinite LIDAR restart logic
with_rplidar = LaunchConfiguration('with_rplidar') # ----------------------------
with_com = LaunchConfiguration('with_com') def create_lidar_with_restart():
with_strat = LaunchConfiguration('with_strat') """Create a lidar node and its auto-restart handler."""
lidar_node = Node(
package='rplidar_ros',
executable='rplidar_node',
name='rplidar_node',
parameters=[{
'channel_type': channel_type,
'serial_port': serial_port,
'serial_baudrate': serial_baudrate,
'frame_id': frame_id,
'inverted': inverted,
'angle_compensate': angle_compensate,
'scan_mode': scan_mode,
}],
output='screen'
)
# Register a restart handler that re-creates both node + handler
restart_handler = RegisterEventHandler(
OnProcessExit(
target_action=lidar_node,
on_exit=[
LogInfo(msg="[Launch] RPLIDAR crashed — restarting in 2s..."),
TimerAction(
period=2.0,
actions=create_lidar_with_restart() # recursive: spawns new node + handler
)
]
)
)
return [lidar_node, restart_handler]
def launch_rplidar_restart_if_needed(context, *args, **kwargs): def launch_rplidar_restart_if_needed(context, *args, **kwargs):
if context.launch_configurations.get('with_rplidar') == 'true': if context.launch_configurations.get('with_rplidar') == 'true':
def create_rplidar_node(): # Start after 3 seconds
return Node( return [
package='rplidar_ros', TimerAction(
executable='rplidar_node', period=3.0,
name='rplidar_node', actions=create_lidar_with_restart()
parameters=[{
'channel_type': channel_type,
'serial_port': serial_port,
'serial_baudrate': serial_baudrate,
'frame_id': frame_id,
'inverted': inverted,
'angle_compensate': angle_compensate
}],
output='screen'
) )
]
rplidar_node = create_rplidar_node()
restart_handler = RegisterEventHandler(
OnProcessExit(
target_action=rplidar_node,
on_exit=[
TimerAction(
period=2.0,
actions=[create_rplidar_node()]
)
]
)
)
return [rplidar_node, restart_handler]
return [] return []
# ----------------------------
# Function to launch GUI and shutdown handler # GUI node
# ----------------------------
def launch_gui(context, *args, **kwargs): def launch_gui(context, *args, **kwargs):
if context.launch_configurations.get('with_gui') == 'true': if context.launch_configurations.get('with_gui') == 'true':
gui = Node( gui = Node(
@@ -83,45 +87,39 @@ def generate_launch_description():
return [gui, shutdown] return [gui, shutdown]
return [] return []
# Function to launch COM nodes # ----------------------------
# COM nodes
# ----------------------------
def launch_com(context, *args, **kwargs): def launch_com(context, *args, **kwargs):
if context.launch_configurations.get('with_com') == 'true': if context.launch_configurations.get('with_com') == 'true':
serial_listener = Node(
package='modelec_com',
executable='serial_listener',
name='serial_listener'
)
pcb_odo_interface = Node(
package='modelec_com',
executable='pcb_odo_interface',
name='pcb_odo_interface',
parameters=[{
'serial_port': "/dev/USB_ODO",
'baudrate': 115200,
'name': "pcb_odo",
}]
)
pcb_action_interface = Node(
package='modelec_com',
executable='pcb_action_interface',
name='pcb_action_interface',
parameters=[{
'serial_port': "/dev/USB_ACTION",
'baudrate': 115200,
'name': "pcb_action",
}]
)
return [ return [
serial_listener, Node(package='modelec_com', executable='serial_listener', name='serial_listener'),
pcb_odo_interface, Node(
pcb_action_interface, package='modelec_com',
executable='pcb_odo_interface',
name='pcb_odo_interface',
parameters=[{
'serial_port': "/dev/USB_ODO",
'baudrate': 115200,
'name': "pcb_odo",
}]
),
Node(
package='modelec_com',
executable='pcb_action_interface',
name='pcb_action_interface',
parameters=[{
'serial_port': "/dev/USB_ACTION",
'baudrate': 115200,
'name': "pcb_action",
}]
),
] ]
return [] return []
# Function to launch strategy nodes # ----------------------------
# Strategy nodes
# ----------------------------
def launch_strat(context, *args, **kwargs): def launch_strat(context, *args, **kwargs):
if context.launch_configurations.get('with_strat') == 'true': if context.launch_configurations.get('with_strat') == 'true':
return [ return [
@@ -131,41 +129,17 @@ def generate_launch_description():
] ]
return [] return []
# ----------------------------
# Final launch description
# ----------------------------
return LaunchDescription([ return LaunchDescription([
DeclareLaunchArgument('channel_type', default_value=channel_type, description='Specifying channel type of lidar'),
DeclareLaunchArgument( DeclareLaunchArgument('serial_port', default_value=serial_port, description='Specifying usb port to connected lidar'),
'channel_type', DeclareLaunchArgument('serial_baudrate', default_value=serial_baudrate, description='Specifying usb port baudrate to connected lidar'),
default_value=channel_type, DeclareLaunchArgument('frame_id', default_value=frame_id, description='Specifying frame_id of lidar'),
description='Specifying channel type of lidar'), DeclareLaunchArgument('inverted', default_value=inverted, description='Specifying whether or not to invert scan data'),
DeclareLaunchArgument('angle_compensate', default_value=angle_compensate, description='Specifying whether or not to enable angle_compensate of scan data'),
DeclareLaunchArgument( DeclareLaunchArgument('scan_mode', default_value=scan_mode, description='Specifying scan mode of lidar'),
'serial_port',
default_value=serial_port,
description='Specifying usb port to connected lidar'),
DeclareLaunchArgument(
'serial_baudrate',
default_value=serial_baudrate,
description='Specifying usb port baudrate to connected lidar'),
DeclareLaunchArgument(
'frame_id',
default_value=frame_id,
description='Specifying frame_id of lidar'),
DeclareLaunchArgument(
'inverted',
default_value=inverted,
description='Specifying whether or not to invert scan data'),
DeclareLaunchArgument(
'angle_compensate',
default_value=angle_compensate,
description='Specifying whether or not to enable angle_compensate of scan data'),
DeclareLaunchArgument(
'scan_mode',
default_value=scan_mode,
description='Specifying scan mode of lidar'),
with_gui_arg, with_gui_arg,
with_rplidar_arg, with_rplidar_arg,