Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/industrial-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- cron: '0 1 * * *' # Runs daily to check for dependency issues or flaking tests
jobs:
call_reusable_workflow:
uses: vortexntnu/vortex-ci/.github/workflows/reusable-industrial-ci.yml@main
uses: vortexntnu/vortex-ci/.github/workflows/reusable-industrial-ci.yml@debs/ament-pytest
with:
upstream_workspace: './dependencies.repos'
before_install_upstream_dependencies: './scripts/ci_install_dependencies.sh'
Expand Down
Empty file added filtering/COLCON_IGNORE
Empty file.
Empty file added mission/FSM/COLCON_IGNORE
Empty file.
6 changes: 6 additions & 0 deletions mission/keyboard_joy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# keyboard_joy

`keyboard_joy` is a ROS 2 Python node that publishes `sensor_msgs/Joy` messages based on keyboard input, acting as a simple joystick replacement.
Key-to-axis and key-to-button mappings are configurable via YAML and support both hold and sticky axis modes.

TODO: Currently **only works on Xorg** (not Wayland) due to limitations in global keyboard capture.
32 changes: 32 additions & 0 deletions mission/keyboard_joy/config/key_mappings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
axes:
# Left stick (surge/sway)
w: [1, 1.0, 'hold']
s: [1, -1.0, 'hold']
a: [0, 1.0, 'hold']
d: [0, -1.0, 'hold']

# Heave
Key.space: [2, 1.0, 'hold'] # Up (RT)
Key.shift: [2, -1.0, 'hold'] # Down (LT)

# Rotation (pitch/yaw)
Key.up: [4, 1.0, 'hold']
Comment on lines +1 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a comment or something explaining what the three values in the list represent.

Key.down: [4, -1.0, 'hold']
Key.left: [3, 1.0, 'hold']
Key.right: [3, -1.0, 'hold']

parameters:
axis_increment_rate: 0.02 # update interval (higher = slower response)
Comment on lines +18 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name says rate but the value is a period, as the bot mentioned. Choose one (50 Hz or 0.02 s). Also publish rate/period should probably be a param too

axis_increment_step: 0.1 # axis change per update (higher = faster movement)


buttons:
# Roll
q: 4 # LB
e: 5 # RB

# Modes
'1': 0 # A - Xbox mode
'2': 1 # B - Killswitch
'3': 2 # X - Auto
'4': 3 # Y - Reference
Empty file.
163 changes: 163 additions & 0 deletions mission/keyboard_joy/keyboard_joy/keyboard_joy_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#!/usr/bin/env python3
import os
import threading

import rclpy
import yaml
from ament_index_python.packages import get_package_share_directory
from pynput import keyboard
from rclpy.node import Node
from sensor_msgs.msg import Joy

start_message = r"""
██ ▄█▀▓█████▓██ ██▓ ▄▄▄▄ ▒█████ ▄▄▄ ██▀███ ▓█████▄ ▄▄▄██▀▀▀▒█████ ▓██ ██▓
██▄█▒ ▓█ ▀ ▒██ ██▒▓█████▄ ▒██▒ ██▒▒████▄ ▓██ ▒ ██▒▒██▀ ██▌ ▒██ ▒██▒ ██▒▒██ ██▒
▓███▄░ ▒███ ▒██ ██░▒██▒ ▄██▒██░ ██▒▒██ ▀█▄ ▓██ ░▄█ ▒░██ █▌ ░██ ▒██░ ██▒ ▒██ ██░
▓██ █▄ ▒▓█ ▄ ░ ▐██▓░▒██░█▀ ▒██ ██░░██▄▄▄▄██ ▒██▀▀█▄ ░▓█▄ ▌▓██▄██▓ ▒██ ██░ ░ ▐██▓░
▒██▒ █▄░▒████▒ ░ ██▒▓░░▓█ ▀█▓░ ████▓▒░ ▓█ ▓██▒░██▓ ▒██▒░▒████▓ ▓███▒ ░ ████▓▒░ ░ ██▒▓░
▒ ▒▒ ▓▒░░ ▒░ ░ ██▒▒▒ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ▓▒█░░ ▒▓ ░▒▓░ ▒▒▓ ▒ ▒▓▒▒░ ░ ▒░▒░▒░ ██▒▒▒
░ ░▒ ▒░ ░ ░ ░▓██ ░▒░ ▒░▒ ░ ░ ▒ ▒░ ▒ ▒▒ ░ ░▒ ░ ▒░ ░ ▒ ▒ ▒ ░▒░ ░ ▒ ▒░ ▓██ ░▒░
░ ░░ ░ ░ ▒ ▒ ░░ ░ ░ ░ ░ ░ ▒ ░ ▒ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ▒ ▒ ░░
░ ░ ░ ░░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░
░ ░ ░ ░ ░ ░
"""
Comment on lines +12 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes



class KeyboardJoy(Node):
def __init__(self):
super().__init__('keyboard_joy')

self.declare_parameter('config', '')
Comment on lines +29 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer

self.declare_parameter('config', Parameter.Type.STRING)

self.load_key_mappings()

self.joy_pub = self.create_publisher(Joy, 'joy', 10)
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topic name parameter in orca.yaml?


max_axis_index = max((v[0] for v in self.axis_mappings.values()), default=-1)
max_button_index = max(self.button_mappings.values(), default=-1)

self.joy_msg = Joy()
self.joy_msg.axes = [0.0] * (max_axis_index + 1)
self.joy_msg.buttons = [0] * (max_button_index + 1)
Comment on lines +38 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea, but would it make sense to utilize the frame id of the joy message, e.g. "Joystick" and "Keyboard"? Wouldnt make a difference unless handled by the subscriber, but maybe nice for logs/bags?

Comment on lines +35 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super clear what is going on here (especially the +1), would be nice with a comment or abstraction. Ties back to the comment in config file perhaps


self.active_axes = {}
self.sticky_axes = {}

self.lock = threading.Lock()

self.listener = keyboard.Listener(
on_press=self.on_press, on_release=self.on_release
)
self.listener.start()

self.create_timer(0.05, self.publish_joy)
self.create_timer(self.axis_increment_rate, self.update_active_axes)

self.get_logger().info(start_message)

def load_key_mappings(self):
config_file = self.get_parameter(
'config'
).get_parameter_value().string_value or os.path.join(
get_package_share_directory('keyboard_joy'), 'config', 'key_mappings.yaml'
)
Comment on lines +58 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default case here seems redundant if the node is launched anyway?


try:
with open(config_file) as f:
keymap = yaml.safe_load(f) or {}
except Exception as e:
self.get_logger().error(f"Failed to load config '{config_file}': {e}")
keymap = {}

self.axis_mappings = keymap.get('axes', {})
self.button_mappings = keymap.get('buttons', {})
params = keymap.get('parameters', {})

self.axis_increment_rate = float(params.get('axis_increment_rate', 0.02))
self.axis_increment_step = float(params.get('axis_increment_step', 0.05))

def on_press(self, key):
key_str = self.key_to_string(key)
if not key_str:
return
with self.lock:
if key_str in self.axis_mappings:
axis, val, mode = self.axis_mappings[key_str]
if mode == 'sticky':
self.sticky_axes[axis] = max(
min(
self.sticky_axes.get(axis, 0.0)
+ val * self.axis_increment_step,
1.0,
),
-1.0,
)
self.joy_msg.axes[axis] = round(self.sticky_axes[axis], 3)
else:
self.active_axes[axis] = val
elif key_str in self.button_mappings:
self.joy_msg.buttons[self.button_mappings[key_str]] = 1

def on_release(self, key):
key_str = self.key_to_string(key)
if not key_str:
return
with self.lock:
if key_str in self.axis_mappings:
axis, _, mode = self.axis_mappings[key_str]
self.active_axes.pop(axis, None)
if mode != 'sticky':
self.joy_msg.axes[axis] = 0.0
elif key_str in self.button_mappings:
self.joy_msg.buttons[self.button_mappings[key_str]] = 0

@staticmethod
def key_to_string(key):
"""Convert pynput key event to a normalized string."""
if hasattr(key, 'char') and key.char:
return key.char.lower()
if hasattr(key, 'name') and key.name:
return f'Key.{key.name}'
return None

def update_active_axes(self):
"""Gradually update active axes towards their target values."""
with self.lock:
for axis, target in self.active_axes.items():
current = self.joy_msg.axes[axis]
delta = (
self.axis_increment_step
if target > 0
else -self.axis_increment_step
)
next_val = current + delta
if (delta > 0 and next_val > target) or (
delta < 0 and next_val < target
):
next_val = target
self.joy_msg.axes[axis] = round(next_val, 3)

def publish_joy(self):
with self.lock:
self.joy_msg.header.stamp = self.get_clock().now().to_msg()
self.joy_pub.publish(self.joy_msg)

def destroy_node(self):
if self.listener:
self.listener.stop()
super().destroy_node()


def main(args=None):
rclpy.init(args=args)
node = KeyboardJoy()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
35 changes: 35 additions & 0 deletions mission/keyboard_joy/launch/keyboard_joy_node.launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description():
keyboard_config = os.path.join(
get_package_share_directory('keyboard_joy'), 'config', 'key_mappings.yaml'
)

orca_params = os.path.join(
get_package_share_directory('auv_setup'), 'config', 'robots', 'orca.yaml'
)

return LaunchDescription(
[
DeclareLaunchArgument(
'config',
default_value=keyboard_config,
description='Path to key mappings YAML file',
),
Node(
package='keyboard_joy',
executable='keyboard_joy_node',
name='keyboard_joy',
namespace='orca',
output='screen',
parameters=[{'config': LaunchConfiguration('config')}, orca_params],
),
]
)
20 changes: 20 additions & 0 deletions mission/keyboard_joy/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>keyboard_joy</name>
<version>0.0.0</version>
<description>Keyboard teleop node that publishes sensor_msgs/Joy messages</description>
<maintainer email="[email protected]">kluge7</maintainer>
<license>MIT</license>

<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>python3-pynput</depend>

<test_depend>ament_pytest</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions mission/keyboard_joy/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/keyboard_joy
[install]
install_scripts=$base/lib/keyboard_joy
30 changes: 30 additions & 0 deletions mission/keyboard_joy/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
from glob import glob

from setuptools import setup

package_name = 'keyboard_joy'

setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
tests_require=['pytest'],
zip_safe=True,
maintainer='kluge7',
maintainer_email='[email protected]',
description='Keyboard teleop node that publishes sensor_msgs/Joy messages',
license='MIT',
entry_points={
'console_scripts': [
'keyboard_joy_node = keyboard_joy.keyboard_joy_node:main',
],
},
)