A modular robotics control system with a 4-layer architecture for flexible hardware abstraction and visual programming. This platform provides a comprehensive framework for building and controlling robotic systems with safety monitoring, state management, and real-time communication capabilities.
- Overview
- Architecture
- Project Structure
- Quick Start
- Installation
- Core Components
- Development Guide
- Testing
- Demos and Examples
- Configuration
- Documentation
- Contributing
- Troubleshooting
The Orchestrator Platform is designed to provide a robust, scalable foundation for robotics applications. It implements a layered architecture that separates hardware concerns from application logic, enabling flexible deployment across different hardware configurations while maintaining consistent interfaces and safety guarantees.
- Hardware Abstraction Layer (HAL): Unified interface for sensors, actuators, and communication
- Safety System: Multi-layered safety monitoring with hardware and software checks
- State Management: Centralized state persistence and synchronization
- MQTT Communication: Real-time messaging and control
- Visual Programming: Node-RED integration for flow-based programming
- Comprehensive Testing: Unit, integration, and functional test coverage
- Modular Design: Pluggable components and extensible architecture
The system implements a 4-layer architecture designed for flexibility and maintainability:
┌─────────────────────────────────────────┐
│ Control & UI Layer │
│ (Node-RED, Web Interface) │
├─────────────────────────────────────────┤
│ Communication Layer │
│ (MQTT Broker) │
├─────────────────────────────────────────┤
│ Hardware Abstraction Layer │
│ (Python HAL Service, Safety, │
│ State Management, Logging) │
├─────────────────────────────────────────┤
│ Physical Hardware │
│ (Raspberry Pi, Sensors, Actuators) │
└─────────────────────────────────────────┘
- Physical Hardware: Raspberry Pi, sensors (encoders, LIDAR), actuators (motors)
- Hardware Abstraction Layer: Python services providing unified interfaces
- Communication Layer: MQTT broker for real-time messaging
- Control & UI Layer: Node-RED flows and web-based dashboards
The codebase is organized into logical directories for maintainability and ease of navigation:
orchestrator-platform/
├── hal_service/ # Core HAL implementation
│ ├── __init__.py # Package initialization
│ ├── base.py # Base classes and interfaces
│ ├── config.py # Configuration management
│ ├── encoder_sensor.py # Encoder sensor interface
│ ├── lidar_sensor.py # LIDAR sensor interface
│ ├── logging_service.py # Logging service implementation
│ ├── motor_controller.py # Motor control interface
│ ├── mqtt_client.py # MQTT communication client
│ ├── safety_monitor.py # Safety monitoring implementation
│ ├── state_manager.py # State management implementation
│ └── requirements.txt # HAL-specific dependencies
├── docs/ # Comprehensive documentation
│ ├── README.md # Documentation index
│ ├── node_red_config.md # Node-RED setup guide
│ └── hal_service/ # Component-specific documentation
│ ├── README_ENCODER.md
│ ├── README_LIDAR.md
│ ├── README_LOGGING.md
│ ├── README_MOTOR.md
│ ├── README_MQTT.md
│ ├── README_SAFETY.md
│ └── README_STATE_MANAGER.md
├── tests/ # Complete test suite
│ ├── conftest.py # Test configuration and fixtures
│ ├── test_basic.py # Basic functionality tests
│ ├── test_config.py # Configuration tests
│ ├── test_*_sensor.py # Sensor-specific tests
│ ├── test_*_controller.py # Controller tests
│ ├── test_*_service.py # Service tests
│ └── test_*_integration.py # Integration tests
├── demos/ # Examples and validation scripts
│ ├── config_example.py # Configuration examples
│ ├── demo_*.py # Component demonstrations
│ ├── *_example.py # Usage examples
│ ├── validate_*.py # Validation scripts
│ └── demo_logs/ # Demo execution logs
├── configs/ # Configuration files
│ ├── config.yaml # Main configuration
│ ├── example_config.yaml # Configuration template
│ ├── systemd/ # Service definitions
│ └── node_red_config/ # Node-RED configurations
├── logs/ # Runtime logs
├── .kiro/specs/ # Feature specifications
│ └── orchestrator-platform/
│ ├── requirements.md # Feature requirements
│ ├── design.md # System design
│ └── tasks.md # Implementation tasks
├── .github/workflows/ # CI/CD configuration
├── orchestrator_hal.py # Main HAL service
├── safety_monitor_service.py # Safety service
├── state_manager_service.py # State management service
├── requirements.txt # Project dependencies
├── run_tests.py # Test runner script
└── run_demo.py # Demo runner script
- Python 3.8+
- Raspberry Pi (for hardware components)
- MQTT broker (Mosquitto recommended)
- Node-RED (for visual programming interface)
-
Clone and Install
git clone <repository-url> cd orchestrator-platform # Create virtual environment (recommended) python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate # Install dependencies pip install -r requirements.txt pip install -r hal_service/requirements.txt
-
Configure System Dependencies
# On Raspberry Pi sudo apt update sudo apt install mosquitto mosquitto-clients nodejs npm sudo systemctl enable mosquitto sudo systemctl start mosquitto # Install Node-RED sudo npm install -g --unsafe-perm node-red sudo npm install -g --unsafe-perm node-red-dashboard
-
Configure Hardware
cp configs/example_config.yaml configs/config.yaml # Edit configs/config.yaml with your hardware settings # Enable I2C and SPI (on Raspberry Pi) sudo raspi-config # Navigate to Interface Options -> I2C -> Enable # Navigate to Interface Options -> SPI -> Enable # Add user to required groups sudo usermod -a -G gpio,i2c,spi $USER # Logout and login again for group changes to take effect
-
Verify Installation
# Test MQTT broker mosquitto_pub -h localhost -t test -m "hello" mosquitto_sub -h localhost -t test # Run basic tests python run_tests.py # Run demo to verify setup python run_demo.py demo_mock_hal
-
Start Core Services
# Terminal 1: Start HAL service python orchestrator_hal.py # Terminal 2: Start safety monitor python safety_monitor_service.py # Terminal 3: Start state manager python state_manager_service.py # Terminal 4: Start Node-RED (optional) node-red
-
Access Web Interface
# Node-RED editor: http://localhost:1880 # Dashboard: http://localhost:1880/ui
Raspberry Pi:
sudo apt update
sudo apt install python3-pip mosquitto mosquitto-clients nodejs npm
sudo pip3 install RPi.GPIO
Development Machine:
# Install Python dependencies
pip install -r requirements.txt
# Install Node-RED (optional)
npm install -g node-red
Using Virtual Environment (Recommended):
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
- GPIO Configuration: Ensure proper GPIO pin assignments in
configs/config.yaml
- I2C/SPI: Enable I2C and SPI interfaces via
raspi-config
- Permissions: Add user to
gpio
andi2c
groups - MQTT Broker: Configure and start Mosquitto broker
The HAL provides unified interfaces for all hardware components:
- Encoder Sensor (
encoder_sensor.py
): Rotary encoder interface with position tracking - LIDAR Sensor (
lidar_sensor.py
): Distance measurement and obstacle detection
- Motor Controller (
motor_controller.py
): PWM motor control with safety limits
- MQTT Client (
mqtt_client.py
): Pub/sub messaging with automatic reconnection
- Logging Service (
logging_service.py
): Centralized logging with rotation - Safety Monitor (
safety_monitor.py
): Real-time safety checking - State Manager (
state_manager.py
): State persistence and synchronization
Multi-layered safety architecture:
- Hardware Safety: GPIO-based emergency stops and limit switches
- Software Safety: Runtime monitoring of system parameters
- Communication Safety: Watchdog timers and heartbeat monitoring
- State Safety: Validation of state transitions and persistence
Centralized state management with:
- Persistence: State saved to disk with atomic writes
- Synchronization: MQTT-based state distribution
- Validation: Schema-based state validation
- Recovery: Automatic state recovery on startup
MQTT-based communication providing:
- Real-time Messaging: Low-latency command and telemetry
- Topic Organization: Hierarchical topic structure
- Quality of Service: Configurable QoS levels
- Retained Messages: State persistence across connections
- Python 3.8+ with pip
- Git
- Code editor (VS Code recommended)
- Hardware (Raspberry Pi 4 recommended for full testing)
-
Clone Repository
git clone <repository-url> cd orchestrator-platform
-
Setup Python Environment
# Create and activate virtual environment python -m venv venv source venv/bin/activate # Linux/Mac # OR venv\Scripts\activate # Windows # Upgrade pip and install dependencies pip install --upgrade pip pip install -r requirements.txt pip install -r hal_service/requirements.txt # Install development dependencies pip install pytest pytest-cov flake8 black mypy
-
Configure Development Environment
# Copy example configuration cp configs/example_config.yaml configs/config.yaml # Set Python path for imports export PYTHONPATH="${PYTHONPATH}:$(pwd)" # Add to ~/.bashrc or ~/.zshrc for persistence echo 'export PYTHONPATH="${PYTHONPATH}:'$(pwd)'"' >> ~/.bashrc
-
Install System Dependencies (Development Machine)
# Ubuntu/Debian sudo apt install mosquitto mosquitto-clients # macOS brew install mosquitto # Start MQTT broker sudo systemctl start mosquitto # Linux brew services start mosquitto # macOS
-
Setup Node-RED (Optional)
# Install Node.js and npm if not already installed curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash - sudo apt-get install -y nodejs # Install Node-RED sudo npm install -g --unsafe-perm node-red sudo npm install -g --unsafe-perm node-red-dashboard # Configure Node-RED mkdir -p ~/.node-red cp configs/node_red_config/settings.js ~/.node-red/
-
Verify Development Setup
# Run all tests python run_tests.py # Run linting flake8 hal_service/ tests/ # Run type checking mypy hal_service/ # Run demo python run_demo.py demo_mock_hal
VS Code Setup (recommended):
-
Install Python extension
-
Configure settings.json:
{ "python.defaultInterpreterPath": "./venv/bin/python", "python.linting.enabled": true, "python.linting.flake8Enabled": true, "python.formatting.provider": "black", "python.testing.pytestEnabled": true, "python.testing.pytestArgs": ["tests/"] }
-
Install recommended extensions:
- Python
- Python Docstring Generator
- GitLens
- YAML
- Base Classes: Extend
hal_service.base.BaseComponent
for new components - Configuration: Use
hal_service.config
for settings management - Logging: Integrate with
hal_service.logging_service
- Testing: Follow existing test patterns in
tests/
-
Create Component Class:
from hal_service.base import BaseComponent class NewSensor(BaseComponent): def __init__(self, config): super().__init__(config) # Initialize hardware def read_data(self): # Implement data reading pass
-
Add Configuration Schema:
# In hal_service/config.py class NewSensorConfig(BaseModel): pin: int sample_rate: float = 10.0
-
Create Tests:
# In tests/test_new_sensor.py def test_new_sensor_initialization(): # Test component initialization pass
-
Add Documentation:
# In docs/hal_service/README_NEW_SENSOR.md # New Sensor Component ## Overview Description of the new sensor...
-
Feature Development:
- Create feature branch
- Implement component in
hal_service/
- Add comprehensive tests in
tests/
- Update documentation in
docs/
-
Testing:
# Run all tests python run_tests.py # Run specific test python -m pytest tests/test_new_component.py -v
-
Validation:
# Create demo script python run_demo.py new_component_demo # Validate functionality python run_demo.py validate_new_component
The test suite is organized by component and test type:
- Unit Tests: Individual component testing
- Integration Tests: Multi-component interaction testing
- Functional Tests: End-to-end workflow testing
- Hardware Tests: Hardware-in-the-loop testing (when available)
All Tests:
python run_tests.py
Specific Test Categories:
# Unit tests only
python -m pytest tests/test_*_sensor.py tests/test_*_controller.py
# Integration tests
python -m pytest tests/test_*_integration.py
# Configuration tests
python -m pytest tests/test_config.py
Test Coverage:
python -m pytest --cov=hal_service tests/
Tests use pytest
with configuration in tests/conftest.py
:
- Fixtures: Common test setup and teardown
- Mocking: Hardware simulation for unit tests
- Parametrization: Multiple test scenarios
- Markers: Test categorization and selection
The demos/
directory contains practical examples:
demo_logging.py
- Logging service demonstrationdemo_safety_monitor.py
- Safety system showcaseencoder_example.py
- Encoder sensor usagelidar_example.py
- LIDAR sensor integrationmotor_example.py
- Motor control examplesmqtt_example.py
- MQTT communication patterns
config_example.py
- Configuration managementlogging_example.py
- Logging setup and usage
validate_mqtt.py
- MQTT connectivity validationvalidate_safety_monitor.py
- Safety system validation
List Available Demos:
python run_demo.py
Run Specific Demo:
python run_demo.py demo_safety_monitor
python run_demo.py validate_mqtt
Demo with Arguments:
python demos/motor_example.py --speed 50 --duration 10
The primary configuration file is configs/config.yaml
:
# Hardware Configuration
gpio:
motor_pins: [18, 19]
encoder_pins: [20, 21]
safety_pin: 22
# MQTT Configuration
mqtt:
broker: "localhost"
port: 1883
topics:
commands: "orchestrator/commands"
telemetry: "orchestrator/telemetry"
# Safety Configuration
safety:
max_speed: 100
emergency_stop_pin: 22
watchdog_timeout: 5.0
# Logging Configuration
logging:
level: "INFO"
file: "logs/orchestrator.log"
max_size: "10MB"
backup_count: 5
configs/config.yaml
- Production configurationconfigs/example_config.yaml
- Template and development settings- Environment variables override file settings
SystemD service files in configs/systemd/
:
orchestrator-safety.service
- Safety monitor servicestate-manager.service
- State management service
The docs/
directory contains comprehensive documentation:
- User Guide - Complete guide for operating the robot via dashboard
- MQTT Reference - Complete MQTT communication protocol reference
- System Access Guide - System configuration and access
- Deployment Guide - Installation and deployment instructions
- HAL Service Documentation - Hardware abstraction layer components
- Encoder sensor API and usage
- LiDAR sensor integration guide
- Logging service configuration
- Motor controller interface
- MQTT client usage patterns
- Safety system architecture
- State management guide
- Node-RED Documentation - Control interface implementation
- Command flow implementation
- Telemetry flow implementation
- Dashboard UI implementation
- Mission sequencer implementation
- Mock HAL Documentation - Development and testing tools
- Node-RED Configuration - Node-RED integration setup
# View main documentation index
cat docs/README.md
# Access user guide
cat docs/USER_GUIDE.md
# Review MQTT protocol
cat docs/MQTT_REFERENCE.md
# Check component documentation
ls docs/hal_service/
-
Fork and Clone:
git clone <your-fork-url> cd orchestrator-platform
-
Create Development Environment:
python -m venv venv source venv/bin/activate pip install -r requirements.txt
-
Install Development Tools:
pip install flake8 black pytest-cov
- Style: Follow PEP 8, use
black
for formatting - Linting: Use
flake8
for code quality - Testing: Maintain >90% test coverage
- Documentation: Document all public APIs
- Create feature branch from
main
- Implement changes with tests
- Update documentation
- Run full test suite
- Submit pull request with description
- Tests pass and coverage maintained
- Code follows style guidelines
- Documentation updated
- No breaking changes (or properly documented)
- Security considerations addressed
Import Errors:
# Ensure Python path includes project root
export PYTHONPATH="${PYTHONPATH}:$(pwd)"
GPIO Permission Errors:
# Add user to gpio group
sudo usermod -a -G gpio $USER
# Logout and login again
MQTT Connection Issues:
# Check broker status
sudo systemctl status mosquitto
# Test connectivity
mosquitto_pub -h localhost -t test -m "hello"
mosquitto_sub -h localhost -t test
Hardware Not Detected:
# Check I2C devices
sudo i2cdetect -y 1
# Check GPIO status
gpio readall
Enable debug logging:
# In config.yaml
logging:
level: "DEBUG"
Or via environment variable:
export ORCHESTRATOR_LOG_LEVEL=DEBUG
python orchestrator_hal.py
Check logs for issues:
# View recent logs
tail -f logs/orchestrator.log
# Search for errors
grep ERROR logs/orchestrator.log
# Analyze demo logs
cat demos/demo_logs/orchestrator.log
Monitor system performance:
# CPU and memory usage
htop
# GPIO activity
watch -n 1 gpio readall
# MQTT traffic
mosquitto_sub -h localhost -t "orchestrator/#" -v
This project is licensed under the MIT License - see the LICENSE file for details.
For support and questions:
- Check the documentation in
docs/
- Review existing issues and demos
- Create an issue for bugs or feature requests
- Refer to component-specific README files for detailed usage
The Orchestrator Platform provides a robust foundation for robotics applications with safety, reliability, and extensibility at its core.