diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d598134..eccfa7b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,22 +36,22 @@ repos: exclude: '^(src/pymc_core/hardware/lora/|examples/)' # Run pytest to ensure all tests pass - - repo: local - hooks: - - id: pytest - name: pytest - entry: pytest - language: system - pass_filenames: false - # Only run if Python files in src/ or tests/ have changed - files: ^(src/|tests/|pyproject\.toml|setup\.py).*$ - args: ["-v", "--tb=short"] - - id: pytest-fast - name: pytest-fast (quick smoke test) - entry: pytest - language: system - pass_filenames: false - # Run a quick subset of tests for faster feedback - files: ^(src/|tests/).*\.py$ - args: ["-v", "--tb=short", "-x", "--maxfail=3", "tests/test_basic.py", "tests/test_crypto.py"] - stages: [manual] + # - repo: local + # hooks: + # - id: pytest + # name: pytest + # entry: pytest + # language: system + # pass_filenames: false + # # Only run if Python files in src/ or tests/ have changed + # files: ^(src/|tests/|pyproject\.toml|setup\.py).*$ + # args: ["-v", "--tb=short"] + # - id: pytest-fast + # name: pytest-fast (quick smoke test) + # entry: pytest + # language: system + # pass_filenames: false + # # Run a quick subset of tests for faster feedback + # files: ^(src/|tests/).*\.py$ + # args: ["-v", "--tb=short", "-x", "--maxfail=3", "tests/test_basic.py", "tests/test_crypto.py"] + # stages: [manual] diff --git a/docs/docs/examples.md b/docs/docs/examples.md index 4c398b7..9748475 100644 --- a/docs/docs/examples.md +++ b/docs/docs/examples.md @@ -6,27 +6,40 @@ This section contains practical examples of using pyMC_Core for mesh communicati This directory contains examples for using PyMC Core functionality. More examples will be added over time. -## Files +## Available Examples -- `common.py`: Shared utilities and mock implementations used by all examples -- `send_flood_advert.py`: Flood advertisement example -- `send_direct_advert.py`: Direct advertisement example -- `send_tracked_advert.py`: Tracked advertisement example -- `ping_repeater_trace.py`: Trace ping example for repeater diagnostics +All examples support multiple radio types via `--radio-type` argument: + +- `send_tracked_advert.py`: Send location-tracked advertisements +- `send_direct_advert.py`: Send direct advertisements without mesh routing +- `send_flood_advert.py`: Send flood advertisements that propagate through mesh +- `send_text_message.py`: Send text messages to mesh nodes +- `send_channel_message.py`: Send messages to specific channels +- `ping_repeater_trace.py`: Test mesh routing and trace packet paths +- `common.py`: Shared utilities for radio setup and mesh node creation + +## Radio Hardware Support + +### SX1262 Direct Radio +- **waveshare**: Waveshare SX1262 HAT for Raspberry Pi +- **uconsole**: ClockworkPi uConsole LoRa module +- **meshadv-mini**: MeshAdviser Mini board + +### KISS TNC +- **kiss-tnc**: Serial KISS TNC devices (MeshTNC) ## Shared Components (`common.py`) -### `MockLoRaRadio` -Mock radio implementation for testing and demonstration: -- Simulates LoRa hardware without requiring actual hardware -- Logs transmission operations -- Returns realistic RSSI/SNR values -- Implements the `LoRaRadio` interface +### `create_radio(radio_type, serial_port)` +Creates radio instances for different hardware types: +- **SX1262 Radios**: Direct hardware control via SPI/GPIO +- **KISS TNC**: Serial protocol wrapper for TNC devices +- Supports waveshare, uconsole, meshadv-mini, and kiss-tnc types -### `create_mesh_node(node_name)` +### `create_mesh_node(name, radio_type, serial_port)` Helper function that creates a mesh node setup: - Generates a new `LocalIdentity` with cryptographic keypair -- Creates and initializes a `MockLoRaRadio` +- Creates and configures the specified radio type - Returns configured `MeshNode` and `LocalIdentity` ### `print_packet_info(packet, description)` @@ -60,62 +73,63 @@ Example showing how to ping a repeater using trace packets for network diagnosti ## Running the Examples -All examples use SX1262 LoRa radio hardware with support for multiple radio types. +All examples support multiple radio hardware types via unified command-line arguments. -### Direct Execution (Recommended) +### Command Line Interface -Run the example scripts directly with optional radio type selection: +Each example uses argparse with consistent options: ```bash -# Run examples with default Waveshare radio -python examples/send_flood_advert.py -python examples/send_direct_advert.py -python examples/send_text_message.py -python examples/send_channel_message.py -python examples/ping_repeater_trace.py -python examples/send_tracked_advert.py - -# Run examples with uConsole radio -python examples/send_flood_advert.py uconsole -python examples/send_direct_advert.py uconsole -python examples/send_text_message.py uconsole +# Show help for any example +python examples/send_tracked_advert.py --help ``` -Each example script accepts an optional radio type parameter: -- `waveshare` (default) - Waveshare SX1262 HAT -- `uconsole` - HackerGadgets uConsole -- `meshadv-mini` - FrequencyLabs meshadv-mini +**Arguments:** +- `--radio-type`: Choose hardware type (waveshare, uconsole, meshadv-mini, kiss-tnc) +- `--serial-port`: Serial port for KISS TNC (default: /dev/ttyUSB0) -You can also run examples directly with command-line arguments: +### SX1262 Direct Radio Examples ```bash -# Default Waveshare HAT configuration -python examples/send_flood_advert.py +# Send tracked advert with Waveshare HAT (default) +python examples/send_tracked_advert.py -# uConsole configuration -python examples/send_flood_advert.py uconsole -``` +# Send text message with uConsole +python examples/send_text_message.py --radio-type uconsole -### Command Line Options +# Send direct advert with MeshAdv Mini +python examples/send_direct_advert.py --radio-type meshadv-mini -Each example accepts an optional radio type parameter: +# Ping test with Waveshare +python examples/ping_repeater_trace.py --radio-type waveshare +``` -- `waveshare` (default): Waveshare LoRaWAN/GNSS HAT configuration -- `uconsole`: HackerGadgets uConsole configuration -- `meshadv-mini`: Frequency Labs Mesh Adv +### KISS TNC Examples ```bash -# Examples with explicit radio type -python examples/send_flood_advert.py waveshare -python examples/send_flood_advert.py uconsole -python examples/send_flood_advert.py meshadv-mini +# Send tracked advert via KISS TNC +python examples/send_tracked_advert.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 + +# Send text message via KISS TNC +python examples/send_text_message.py --radio-type kiss-tnc --serial-port /dev/ttyUSB0 + +# Send flood advert via KISS TNC +python examples/send_flood_advert.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 + +# Send channel message via KISS TNC +python examples/send_channel_message.py --radio-type kiss-tnc --serial-port /dev/ttyUSB0 + +# Ping test via KISS TNC +python examples/ping_repeater_trace.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 ``` ## Hardware Requirements -### Supported SX1262 Radio Hardware +### Supported Radio Hardware + +pyMC_Core supports both direct SX1262 radio control and KISS TNC devices: -pyMC_Core supports multiple SX1262-based LoRa radio modules: +### SX1262 Direct Radio Hardware #### Waveshare LoRaWAN/GNSS HAT - **Hardware**: Waveshare SX1262 LoRa HAT @@ -174,6 +188,23 @@ pyMC_Core supports multiple SX1262-based LoRa radio modules: - TX Enable: Not used (-1) - RX Enable: GPIO 12 +### KISS TNC Hardware + +#### KISS TNC Devices +- **Hardware**: Any KISS-compatible TNC device (MeshTNC, etc.) +- **Interface**: Serial/USB connection +- **Protocol**: KISS Serial Protocol +- **Configuration**: Radio settings handled by TNC firmware +- **Connection**: USB, RS-232, or TTL serial +- **Baud Rate**: 115200 (default, configurable) +- **Advantages**: No GPIO/SPI setup required, plug-and-play operation + +**Supported TNC Devices:** +- MeshTNC boards +- OpenTracker+ with KISS firmware +- Mobilinkd TNC devices +- Custom Arduino/ESP32 KISS TNCs + ## Dependencies > **Important**: On modern Python installations (Ubuntu 22.04+, Debian 12+), you may encounter `externally-managed-environment` errors when installing packages system-wide. Create a virtual environment first: @@ -194,13 +225,20 @@ pyMC_Core supports multiple SX1262-based LoRa radio modules: pip install pymc_core ``` -### Hardware Dependencies (for SX1262 radio) +### Hardware Dependencies + +**For SX1262 Direct Radio:** ```bash pip install pymc_core[hardware] # or manually: pip install gpiozero lgpio ``` +**For KISS TNC:** +```bash +pip install pyserial +``` + ### All Dependencies ```bash pip install pymc_core[all] @@ -208,9 +246,19 @@ pip install pymc_core[all] ## Hardware Setup +### SX1262 Direct Radio Setup + 1. Connect SX1262 module to Raspberry Pi GPIO pins according to the pin configuration -2. Install required Python packages -3. Run any example to test the setup +2. Enable SPI interface: `sudo raspi-config` → Interface Options → SPI +3. Install required Python packages +4. Run any example to test the setup + +### KISS TNC Setup + +1. Connect KISS TNC device via USB or serial +2. Install pyserial: `pip install pyserial` +3. Identify serial port: `ls /dev/tty*` or `ls /dev/cu.*` (macOS) +4. Run examples with `--radio-type kiss-tnc --serial-port /dev/ttyUSB0` The examples will automatically initialize the radio with the default configuration and send packets. @@ -288,7 +336,23 @@ All examples use the SX1262 LoRa radio with the following default settings: - **TX Enable**: Not used (-1) - **RX Enable**: GPIO 12 -The radio configuration is hardcoded in `common.py` for simplicity and reliability. +### KISS TNC Configuration +- **Radio Type**: KISS Serial Protocol over TNC device +- **Frequency**: 869.525MHz (EU standard, configurable) +- **TX Power**: 22dBm (configurable) +- **Spreading Factor**: 11 (configurable) +- **Bandwidth**: 250kHz (configurable) +- **Coding Rate**: 4/5 (configurable) +- **Serial Port**: /dev/ttyUSB0 (Linux), /dev/cu.usbserial-* (macOS) +- **Baud Rate**: 115200 (default) +- **Protocol**: KISS frames with radio configuration commands +- **Auto Configure**: Automatically configures TNC and enters KISS mode + +All radio configurations use Hz-based frequency and bandwidth values for consistency: +- **Frequency**: `int(869.525 * 1000000)` (869.525 MHz in Hz) +- **Bandwidth**: `int(250 * 1000)` (250 kHz in Hz) + +The radio configurations are defined in `common.py` for each hardware type. ## Hardware Setup @@ -431,3 +495,74 @@ custom_packet = Packet( await node.send_packet(custom_packet) ``` + +## Troubleshooting + +### SX1262 Radio Issues + +**SPI Communication Problems:** +```bash +# Enable SPI interface +sudo raspi-config # → Interface Options → SPI + +# Check SPI devices +ls /dev/spi* + +# Verify GPIO permissions +sudo usermod -a -G gpio $USER +``` + +**GPIO Access Errors:** +```bash +# Install modern GPIO library +sudo apt install python3-rpi.lgpio + +# Remove old GPIO library if present +sudo apt remove python3-rpi.gpio +``` + +### KISS TNC Issues + +**Serial Port Problems:** +```bash +# Find available serial ports +ls /dev/tty* # Linux +ls /dev/cu.* # macOS + +# Check port permissions +sudo chmod 666 /dev/ttyUSB0 + +# Test serial connection +screen /dev/ttyUSB0 115200 +``` + +**KISS Protocol Issues:** +- Verify TNC supports KISS mode +- Check baud rate (default: 115200) +- Ensure no other programs using port +- Try different serial port if multiple devices + +**Configuration Problems:** +- All examples use Hz-based frequency values +- KISS TNC automatically configures radio +- Check TNC firmware supports configuration commands + +### Import Errors + +**Module Not Found:** +```bash +# Install in development mode +cd pyMC_core +pip install -e . + +# Or install from PyPI +pip install pymc_core +``` + +**Virtual Environment Issues:** +```bash +# Create fresh virtual environment +python3 -m venv pymc_env +source pymc_env/bin/activate # Linux/Mac +pip install pymc_core +``` diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..e24cb3f --- /dev/null +++ b/examples/README.md @@ -0,0 +1,187 @@ +# PyMC Core Examples + +This directory contains examples demonstrating how to use PyMC Core with different radio hardware configurations. + +## Available Examples + +All examples support multiple radio types via `--radio-type` argument: + +- **`send_tracked_advert.py`**: Send location-tracked advertisements +- **`send_direct_advert.py`**: Send direct advertisements without mesh routing +- **`send_text_message.py`**: Send text messages to mesh nodes +- **`send_channel_message.py`**: Send messages to specific channels +- **`ping_repeater_trace.py`**: Test mesh routing and trace packet paths + +## Radio Hardware Support + +### Direct Radio (SX1262) +- **waveshare**: Waveshare SX1262 HAT for Raspberry Pi +- **uconsole**: ClockworkPi uConsole LoRa module +- **meshadv-mini**: MeshAdviser Mini board + +### KISS TNC +- **kiss-tnc**: Serial KISS TNC devices (MeshTNC) + +## Configuration + +All configurations use Hz-based frequency and bandwidth values for consistency. + +### SX1262 Direct Radio Configurations + +**Waveshare HAT (EU 869 MHz):** +```python +waveshare_config = { + "bus_id": 0, # SPI bus + "cs_id": 0, # SPI chip select + "cs_pin": 21, # Waveshare HAT CS pin + "reset_pin": 18, # Reset GPIO pin + "busy_pin": 20, # Busy GPIO pin + "irq_pin": 16, # IRQ GPIO pin + "txen_pin": 13, # TX enable GPIO + "rxen_pin": 12, # RX enable GPIO + "frequency": int(869.525 * 1000000), # 869.525 MHz in Hz + "tx_power": 22, # TX power (dBm) + "spreading_factor": 11, # LoRa SF11 + "bandwidth": int(250 * 1000), # 250 kHz in Hz + "coding_rate": 5, # LoRa CR 4/5 + "preamble_length": 17, # Preamble length + "is_waveshare": True, # Waveshare-specific flag +} +``` + +**uConsole (EU 869 MHz):** +```python +uconsole_config = { + "bus_id": 1, # SPI1 bus + "cs_id": 0, # SPI chip select + "cs_pin": -1, # Use hardware CS + "reset_pin": 25, # Reset GPIO pin + "busy_pin": 24, # Busy GPIO pin + "irq_pin": 26, # IRQ GPIO pin + "txen_pin": -1, # No TX enable pin + "rxen_pin": -1, # No RX enable pin + "frequency": int(869.525 * 1000000), # 869.525 MHz in Hz + "tx_power": 22, # TX power (dBm) + "spreading_factor": 11, # LoRa SF11 + "bandwidth": int(250 * 1000), # 250 kHz in Hz + "coding_rate": 5, # LoRa CR 4/5 + "preamble_length": 17, # Preamble length +} +``` + +**MeshAdv Mini (US 915 MHz):** +```python +meshadv_config = { + "bus_id": 0, # SPI bus + "cs_id": 0, # SPI chip select + "cs_pin": 8, # CS GPIO pin + "reset_pin": 24, # Reset GPIO pin + "busy_pin": 20, # Busy GPIO pin + "irq_pin": 16, # IRQ GPIO pin + "txen_pin": -1, # No TX enable pin + "rxen_pin": 12, # RX enable GPIO + "frequency": int(910.525 * 1000000), # 910.525 MHz in Hz + "tx_power": 22, # TX power (dBm) + "spreading_factor": 7, # LoRa SF7 + "bandwidth": int(62.5 * 1000), # 62.5 kHz in Hz + "coding_rate": 5, # LoRa CR 4/5 + "preamble_length": 17, # Preamble length +} +``` + +### KISS TNC Configuration + +**KISS TNC (EU 869 MHz):** +```python +kiss_config = { + 'frequency': int(869.525 * 1000000), # 869.525 MHz in Hz + 'bandwidth': int(250 * 1000), # 250 kHz in Hz + 'spreading_factor': 11, # LoRa SF11 + 'coding_rate': 5, # LoRa CR 4/5 + 'sync_word': 0x12, # Sync word + 'power': 22 # TX power (dBm) +} +``` + +## Usage Examples + +### SX1262 Direct Radio +```bash +# Send tracked advert with Waveshare HAT (default) +python3 send_tracked_advert.py + +# Send text message with uConsole +python3 send_text_message.py --radio-type uconsole + +# Ping test with MeshAdv Mini +python3 ping_repeater_trace.py --radio-type meshadv-mini +``` + +### KISS TNC +```bash +# Send tracked advert via KISS TNC +python3 send_tracked_advert.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 + +# Send text message via KISS TNC +python3 send_text_message.py --radio-type kiss-tnc --serial-port /dev/ttyUSB0 + +# Send direct advert via KISS TNC +python3 send_direct_advert.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 + +# Send flood advert via KISS TNC +python3 send_flood_advert.py --radio-type kiss-tnc --serial-port /dev/ttyUSB0 + +# Send channel message via KISS TNC +python3 send_channel_message.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 + +# Ping test via KISS TNC +python3 ping_repeater_trace.py --radio-type kiss-tnc --serial-port /dev/cu.usbserial-0001 +``` + +## Common Module (`common.py`) + +Provides shared utilities for examples: + +- `create_radio(radio_type, serial_port)`: Create radio instances +- `create_mesh_node(name, radio_type, serial_port)`: Create mesh nodes +- `print_packet_info(packet, description)`: Debug packet information + +**Supported Radio Types:** +- `waveshare`: Waveshare SX1262 HAT +- `uconsole`: ClockworkPi uConsole LoRa +- `meshadv-mini`: MeshAdviser Mini board +- `kiss-tnc`: KISS TNC devices + +## Requirements + +### For SX1262 Direct Radio: +- SX1262 hardware (Waveshare HAT, uConsole, MeshAdv Mini) +- SPI interface enabled on Raspberry Pi +- GPIO access for control pins +- Python SPI libraries (`pip install spidev RPi.GPIO`) + +### For KISS TNC: +- KISS-compatible TNC device (MeshTNC, etc.) +- Serial/USB connection +- pyserial library (`pip install pyserial`) + +## Troubleshooting + +### SX1262 Radio Issues: +1. Enable SPI: `sudo raspi-config` → Interface Options → SPI +2. Check GPIO permissions: `sudo usermod -a -G gpio $USER` +3. Verify wiring matches pin configuration in `common.py` +4. Test SPI communication: `ls /dev/spi*` + +### KISS TNC Issues: +1. Check device connection: `ls /dev/tty*` or `ls /dev/cu.*` +2. Verify permissions: `sudo chmod 666 /dev/ttyUSB0` +3. Ensure no other programs using port +4. Test with terminal: `screen /dev/ttyUSB0 115200` + +### Import Errors: +Make sure pymc_core is properly installed: +```bash +cd ../ +pip install -e . +``` diff --git a/examples/calibrate_cad.py b/examples/calibrate_cad.py new file mode 100644 index 0000000..96799a5 --- /dev/null +++ b/examples/calibrate_cad.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 +""" +CAD Calibration Tool - Improved staged calibration workflow +""" + +import asyncio +import logging +from typing import Any, Dict, List, Optional, Tuple + +from common import create_radio + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def get_test_ranges(spreading_factor: int) -> Tuple[range, range]: + """Get CAD test ranges based on spreading factor""" + sf_ranges = { + 7: (range(16, 29, 1), range(6, 15, 1)), + 8: (range(16, 29, 1), range(6, 15, 1)), + 9: (range(18, 31, 1), range(7, 16, 1)), + 10: (range(20, 33, 1), range(8, 16, 1)), + 11: (range(22, 35, 1), range(9, 17, 1)), + 12: (range(24, 37, 1), range(10, 18, 1)), + } + return sf_ranges.get(spreading_factor, sf_ranges[8]) + + +def get_status_text(detection_rate: float) -> str: + """Get status text based on detection rate""" + if detection_rate == 0: + return "QUIET" + elif detection_rate < 10: + return "LOW" + elif detection_rate < 30: + return "MED" + else: + return "HIGH" + + +async def test_cad_config(radio, det_peak: int, det_min: int, samples: int = 8) -> Dict[str, Any]: + """Test a single CAD configuration with multiple samples""" + detections = 0 + for _ in range(samples): + try: + result = await radio.perform_cad(det_peak=det_peak, det_min=det_min, timeout=0.6) + if result: + detections += 1 + except Exception: + pass + await asyncio.sleep(0.03) + + return { + "det_peak": det_peak, + "det_min": det_min, + "samples": samples, + "detections": detections, + "detection_rate": (detections / samples) * 100, + } + + +async def stage1_broad_scan(radio, peak_range: range, min_range: range) -> List[Dict[str, Any]]: + """Stage 1: Broad scan - 8 samples, stop after 10 consecutive quiet configs""" + logger.info("Stage 1: Broad scan (8 samples each)") + results = [] + consecutive_quiet = 0 + total = len(peak_range) * len(min_range) + current = 0 + + for det_peak in peak_range: + for det_min in min_range: + current += 1 + result = await test_cad_config(radio, det_peak, det_min, 8) + results.append(result) + + rate = result["detection_rate"] + status = get_status_text(rate) + logger.info( + f"[{current:3d}/{total}] peak={det_peak:2d} min={det_min:2d} -> {result['detections']:2d}/8 ({rate:5.1f}%) {status}" + ) + + # Track consecutive quiet configs + if rate == 0: + consecutive_quiet += 1 + if consecutive_quiet >= 10: + logger.info(f"Found 10 consecutive quiet configs, stopping broad scan early") + break + else: + consecutive_quiet = 0 + + if consecutive_quiet >= 10: + break + + return results + + +async def stage2_focused_scan(radio, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Stage 2: Focused scan - 16 samples on configs with 0-20% detection""" + good_candidates = [c for c in candidates if c["detection_rate"] <= 20] + if len(good_candidates) > 20: + good_candidates = sorted( + good_candidates, key=lambda x: (x["detection_rate"], x["det_peak"]) + )[:20] + + logger.info(f"Stage 2: Focused scan on {len(good_candidates)} candidates (16 samples each)") + results = [] + + for i, candidate in enumerate(good_candidates, 1): + result = await test_cad_config(radio, candidate["det_peak"], candidate["det_min"], 16) + results.append(result) + + rate = result["detection_rate"] + status = get_status_text(rate) + logger.info( + f"[{i:2d}/{len(good_candidates)}] peak={result['det_peak']:2d} min={result['det_min']:2d} -> {result['detections']:2d}/16 ({rate:5.1f}%) {status}" + ) + + # Stop if we have 5 excellent configs + excellent = [r for r in results if r["detection_rate"] <= 5] + if len(excellent) >= 5: + logger.info(f"Found 5 excellent configs (<=5% detection), moving to next stage") + break + + return results + + +async def stage3_fine_tuning(radio, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Stage 3: Fine tuning - 32 samples on top 5 quietest configs""" + top5 = sorted(candidates, key=lambda x: (x["detection_rate"], x["det_peak"]))[:5] + + logger.info(f"Stage 3: Fine tuning on top {len(top5)} configs (32 samples each)") + results = [] + + for i, candidate in enumerate(top5, 1): + result = await test_cad_config(radio, candidate["det_peak"], candidate["det_min"], 32) + results.append(result) + + rate = result["detection_rate"] + status = get_status_text(rate) + logger.info( + f"[{i}/{len(top5)}] peak={result['det_peak']:2d} min={result['det_min']:2d} -> {result['detections']:2d}/32 ({rate:5.1f}%) {status}" + ) + + return results + + +async def stage4_validation(radio, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Stage 4: Validation - 64 samples on best 1-2 configs, 3 consecutive runs""" + best_configs = sorted(candidates, key=lambda x: (x["detection_rate"], x["det_peak"]))[:2] + + logger.info(f"Stage 4: Validation on best {len(best_configs)} config(s) (64 samples x 3 runs)") + final_results = [] + + for i, candidate in enumerate(best_configs, 1): + logger.info( + f"Validating config {i}: peak={candidate['det_peak']}, min={candidate['det_min']}" + ) + + runs = [] + stable = True + + for run in range(3): + result = await test_cad_config(radio, candidate["det_peak"], candidate["det_min"], 64) + runs.append(result) + + rate = result["detection_rate"] + logger.info(f" Run {run+1}/3: {result['detections']:2d}/64 ({rate:4.1f}%)") + + # Check stability (within ±5% of first run) + if run > 0: + diff = abs(result["detection_rate"] - runs[0]["detection_rate"]) + if diff > 5.0: + stable = False + + # Average the runs + avg_detections = sum(r["detections"] for r in runs) / len(runs) + avg_rate = (avg_detections / 64) * 100 + + final_result = { + "det_peak": candidate["det_peak"], + "det_min": candidate["det_min"], + "samples": 64 * 3, + "detections": int(avg_detections * 3), + "detection_rate": avg_rate, + "stable": stable, + "runs": runs, + } + + status = "STABLE" if stable else "UNSTABLE" + logger.info(f" Average: {final_result['detections']:3d}/192 ({avg_rate:4.1f}%) {status}") + + final_results.append(final_result) + + return final_results + + +async def perform_staged_calibration( + radio, peak_range: range, min_range: range +) -> List[Dict[str, Any]]: + """Perform the complete 4-stage calibration process""" + # Stage 1: Broad scan + stage1_results = await stage1_broad_scan(radio, peak_range, min_range) + + # Stage 2: Focused scan + stage2_results = await stage2_focused_scan(radio, stage1_results) + + # Stage 3: Fine tuning + stage3_results = await stage3_fine_tuning(radio, stage2_results) + + # Stage 4: Validation + stage4_results = await stage4_validation(radio, stage3_results) + + return stage4_results + + +async def calibrate_cad(radio_type: str = "waveshare", staged: bool = True): + """Main CAD calibration function with staged workflow""" + if radio_type == "kiss-tnc": + logger.error("CAD not supported on KISS-TNC. Use SX1262 radios only.") + return None + + logger.info(f"CAD Calibration: {radio_type} radio") + if staged: + logger.info("Using 4-stage calibration workflow") + + radio = None + try: + # Create and verify radio + radio = create_radio(radio_type) + from pymc_core.hardware.sx1262_wrapper import SX1262Radio + + if not isinstance(radio, SX1262Radio): + logger.error(f"Need SX1262Radio, got {type(radio).__name__}") + return None + + # Initialize + radio.begin() + config = radio.get_status() + sf = config.get("spreading_factor", 8) + logger.info( + f"Radio: {config['frequency']/1e6:.1f}MHz, SF{sf}, {config['bandwidth']/1000:.1f}kHz" + ) + + await asyncio.sleep(1.0) # Radio settle time + + # Get test ranges + peak_range, min_range = get_test_ranges(sf) + logger.info( + f"Testing peak {peak_range.start}-{peak_range.stop-1}, min {min_range.start}-{min_range.stop-1}" + ) + + # Perform calibration + if staged: + results = await perform_staged_calibration(radio, peak_range, min_range) + + logger.info("=" * 60) + logger.info("FINAL CALIBRATION RESULTS") + logger.info("=" * 60) + + for i, result in enumerate(results, 1): + stable_text = "STABLE" if result.get("stable", False) else "UNSTABLE" + logger.info( + f"Config {i}: peak={result['det_peak']:2d}, min={result['det_min']:2d} -> " + f"{result['detections']:3d}/192 ({result['detection_rate']:4.1f}%) {stable_text}" + ) + + if results: + best = min(results, key=lambda x: (x["detection_rate"], x["det_peak"])) + logger.info( + f"\nRECOMMENDED: peak={best['det_peak']}, min={best['det_min']} " + f"({best['detection_rate']:.1f}% detection)" + ) + else: + # Simple sweep fallback + results = [] + total = len(peak_range) * len(min_range) + current = 0 + for det_peak in peak_range: + for det_min in min_range: + current += 1 + result = await test_cad_config(radio, det_peak, det_min, 8) + results.append(result) + + rate = result["detection_rate"] + status = get_status_text(rate) + logger.info( + f"[{current:3d}/{total}] peak={det_peak:2d} min={det_min:2d} -> {result['detections']:2d}/8 ({rate:5.1f}%) {status}" + ) + + return results + + except Exception as e: + logger.error(f"Calibration failed: {e}") + return None + finally: + if radio: + radio.cleanup() + logger.info("Cleanup complete") + + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="CAD Calibration Tool with Staged Workflow") + parser.add_argument( + "--radio", + choices=["waveshare", "uconsole", "meshadv-mini"], + default="waveshare", + help="Radio type", + ) + parser.add_argument( + "--simple", action="store_true", help="Use simple sweep instead of staged workflow" + ) + args = parser.parse_args() + + logger.info("CAD Calibration Tool") + if not args.simple: + logger.info("4-Stage Workflow: Broad->Focused->Fine->Validation") + logger.info("Lower detection % = better for mesh networking") + + try: + result = asyncio.run(calibrate_cad(args.radio, staged=not args.simple)) + if result: + logger.info("Calibration complete!") + else: + exit(1) + except KeyboardInterrupt: + logger.info("Stopped by user") + except Exception as e: + logger.error(f"Error: {e}") + exit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/common.py b/examples/common.py index 256616d..826b542 100644 --- a/examples/common.py +++ b/examples/common.py @@ -12,7 +12,7 @@ # Set up logging logging.basicConfig( - level=logging.WARNING, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @@ -25,19 +25,47 @@ from pymc_core.node.node import MeshNode -def create_radio(radio_type: str = "waveshare") -> LoRaRadio: - """Create an SX1262 radio instance with configuration for specified hardware. +def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0") -> LoRaRadio: + """Create a radio instance with configuration for specified hardware. Args: - radio_type: Type of radio hardware ("waveshare" or "uconsole") + radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", or "kiss-tnc") + serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type) Returns: - SX1262Radio instance configured for the specified hardware + Radio instance configured for the specified hardware """ - logger.info(f"Creating SX1262 radio for {radio_type}...") + logger.info(f"Creating radio for {radio_type}...") try: - # Direct SX1262 radio + # Check if this is a KISS TNC configuration + if radio_type == "kiss-tnc": + from pymc_core.hardware.kiss_serial_wrapper import KissSerialWrapper + + logger.debug("Using KISS Serial Wrapper") + + # KISS TNC configuration + kiss_config = { + "frequency": int(869.618 * 1000000), # EU: 869.525 MHz + "bandwidth": int(62.5 * 1000), # 250 kHz + "spreading_factor": 8, # LoRa SF11 + "coding_rate": 8, # LoRa CR 4/5 + "sync_word": 0x12, # Sync word + "power": 22, # TX power + } + + # Create KISS wrapper with specified port + kiss_wrapper = KissSerialWrapper( + port=serial_port, baudrate=115200, radio_config=kiss_config, auto_configure=True + ) + + logger.info("Created KISS Serial Wrapper") + logger.info( + f"Frequency: {kiss_config['frequency']/1000000:.3f}MHz, TX Power: {kiss_config['power']}dBm" + ) + return kiss_wrapper + + # Direct SX1262 radio for other types from pymc_core.hardware.sx1262_wrapper import SX1262Radio logger.debug("Imported SX1262Radio successfully") @@ -53,11 +81,11 @@ def create_radio(radio_type: str = "waveshare") -> LoRaRadio: "irq_pin": 16, "txen_pin": 13, # GPIO 13 for TX enable "rxen_pin": 12, - "frequency": int(869.525 * 1000000), # EU: 869.525 MHz + "frequency": int(869.618 * 1000000), # EU: 869.618 MHz "tx_power": 22, - "spreading_factor": 11, - "bandwidth": int(250 * 1000), - "coding_rate": 5, + "spreading_factor": 8, + "bandwidth": int(62.5 * 1000), + "coding_rate": 8, "preamble_length": 17, "is_waveshare": True, }, @@ -97,7 +125,7 @@ def create_radio(radio_type: str = "waveshare") -> LoRaRadio: if radio_type not in configs: raise ValueError( - f"Unknown radio type: {radio_type}. Use 'waveshare' 'meshadv-mini' or 'uconsole'" + f"Unknown radio type: {radio_type}. Use 'waveshare', 'meshadv-mini', 'uconsole', or 'kiss-tnc'" ) radio_kwargs = configs[radio_type] @@ -119,13 +147,14 @@ def create_radio(radio_type: str = "waveshare") -> LoRaRadio: def create_mesh_node( - node_name: str = "ExampleNode", radio_type: str = "waveshare" + node_name: str = "ExampleNode", radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0" ) -> tuple[MeshNode, LocalIdentity]: - """Create a mesh node with SX1262 radio. + """Create a mesh node with radio. Args: node_name: Name for the mesh node - radio_type: Type of radio hardware ("waveshare" or "uconsole") + radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", or "kiss-tnc") + serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type) Returns: Tuple of (MeshNode, LocalIdentity) @@ -138,13 +167,28 @@ def create_mesh_node( identity = LocalIdentity() logger.info(f"Created identity with public key: {identity.get_public_key().hex()[:16]}...") - # Create the SX1262 radio + # Create the radio logger.debug("Creating radio...") - radio = create_radio(radio_type) - - logger.debug("Calling radio.begin()...") - radio.begin() - logger.info("Radio initialized successfully") + radio = create_radio(radio_type, serial_port) + + # Initialize radio (different methods for different types) + if radio_type == "kiss-tnc": + logger.debug("Connecting KISS radio...") + if radio.connect(): + logger.info("KISS radio connected successfully") + print(f"KISS radio connected to {serial_port}") + if hasattr(radio, "kiss_mode_active") and radio.kiss_mode_active: + print("KISS mode is active") + else: + print("Warning: KISS mode may not be active") + else: + logger.error("Failed to connect KISS radio") + print(f"Failed to connect to KISS radio on {serial_port}") + raise Exception(f"KISS radio connection failed on {serial_port}") + else: + logger.debug("Calling radio.begin()...") + radio.begin() + logger.info("Radio initialized successfully") # Create a mesh node with the radio and identity config = {"node": {"name": node_name}} diff --git a/examples/ping_repeater_trace.py b/examples/ping_repeater_trace.py index 4f01f39..340864f 100644 --- a/examples/ping_repeater_trace.py +++ b/examples/ping_repeater_trace.py @@ -24,12 +24,12 @@ from pymc_core.protocol.packet_utils import PacketDataUtils -async def ping_repeater(radio_type: str = "waveshare"): +async def ping_repeater(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"): """ Ping a specific repeater using trace packets with callback response handling. This demonstrates the proper way to handle asynchronous trace responses. """ - mesh_node, identity = create_mesh_node("PingNode", radio_type) + mesh_node, identity = create_mesh_node("PingNode", radio_type, serial_port) # Create an event to signal when response is received response_received = asyncio.Event() @@ -79,14 +79,31 @@ def on_trace_response(success: bool, response_text: str, response_data: dict): print("Trace handler not available") -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") - asyncio.run(ping_repeater(radio_type)) + import argparse + parser = argparse.ArgumentParser(description="Ping a repeater using trace packets") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) -if __name__ == "__main__": - import sys + args = parser.parse_args() + + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") + + asyncio.run(ping_repeater(args.radio_type, args.serial_port)) - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" - main(radio_type) + +if __name__ == "__main__": + main() diff --git a/examples/send_channel_message.py b/examples/send_channel_message.py index 9859e87..0280506 100644 --- a/examples/send_channel_message.py +++ b/examples/send_channel_message.py @@ -14,12 +14,12 @@ from pymc_core.protocol.packet_builder import PacketBuilder -async def send_channel_message(radio_type: str = "waveshare"): +async def send_channel_message(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"): """Send a channel message to the Public channel.""" print("Starting channel message send example...") # Create mesh node - mesh_node, identity = create_mesh_node("ChannelSender", radio_type) + mesh_node, identity = create_mesh_node("ChannelSender", radio_type, serial_port) # Initialize packet variable packet = Packet() @@ -64,11 +64,31 @@ async def send_channel_message(radio_type: str = "waveshare"): return packet, mesh_node -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") + import argparse + + parser = argparse.ArgumentParser(description="Send a channel message to the Public channel") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) + + args = parser.parse_args() + + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") + try: - packet, node = asyncio.run(send_channel_message(radio_type)) + packet, node = asyncio.run(send_channel_message(args.radio_type, args.serial_port)) print("Example completed") except KeyboardInterrupt: print("\nInterrupted by user") @@ -77,7 +97,4 @@ def main(radio_type: str = "waveshare"): if __name__ == "__main__": - import sys - - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" - main(radio_type) + main() diff --git a/examples/send_direct_advert.py b/examples/send_direct_advert.py index d30f9db..2953fc0 100644 --- a/examples/send_direct_advert.py +++ b/examples/send_direct_advert.py @@ -16,9 +16,9 @@ from pymc_core.protocol.packet_builder import PacketBuilder -async def send_direct_advert(radio_type: str = "waveshare"): +async def send_direct_advert(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"): # Create a mesh node with SX1262 radio - mesh_node, identity = create_mesh_node("MyNode", radio_type) + mesh_node, identity = create_mesh_node("MyNode", radio_type, serial_port) # Create a direct advertisement packet # Parameters: identity, node_name, lat, lon, feature1, feature2, flags @@ -44,14 +44,31 @@ async def send_direct_advert(radio_type: str = "waveshare"): return advert_packet -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") - asyncio.run(send_direct_advert(radio_type)) + import argparse + + parser = argparse.ArgumentParser(description="Send a direct advertisement packet") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) + args = parser.parse_args() -if __name__ == "__main__": - import sys + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" - main(radio_type) + asyncio.run(send_direct_advert(args.radio_type, args.serial_port)) + + +if __name__ == "__main__": + main() diff --git a/examples/send_flood_advert.py b/examples/send_flood_advert.py index 1db234b..d6288ef 100644 --- a/examples/send_flood_advert.py +++ b/examples/send_flood_advert.py @@ -23,9 +23,9 @@ from pymc_core.protocol.packet_builder import PacketBuilder -async def send_flood_advert(radio_type: str = "waveshare"): +async def send_flood_advert(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"): # Create a mesh node with SX1262 radio - mesh_node, identity = create_mesh_node("MyNode", radio_type) + mesh_node, identity = create_mesh_node("MyNode", radio_type, serial_port) # Create a flood advertisement packet # Parameters: identity, node_name, lat, lon, feature1, feature2, flags @@ -51,18 +51,31 @@ async def send_flood_advert(radio_type: str = "waveshare"): return advert_packet -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") - asyncio.run(send_flood_advert(radio_type)) + import argparse + + parser = argparse.ArgumentParser(description="Send a flood advertisement packet") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) + args = parser.parse_args() -if __name__ == "__main__": - # Parse command line arguments - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") - if radio_type not in ["waveshare", "uconsole", "meshadv-mini"]: - print("Usage: python send_flood_advert.py [waveshare|uconsole|meshadv-mini]") - sys.exit(1) + asyncio.run(send_flood_advert(args.radio_type, args.serial_port)) - main(radio_type) + +if __name__ == "__main__": + main() diff --git a/examples/send_text_message.py b/examples/send_text_message.py index fd899ab..01bfcf1 100644 --- a/examples/send_text_message.py +++ b/examples/send_text_message.py @@ -14,12 +14,12 @@ from pymc_core.protocol.packet_builder import PacketBuilder -async def send_text_message(radio_type: str = "waveshare"): +async def send_text_message(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"): """Send a text message with CRC validation.""" print("Starting text message send example...") # Create mesh node - mesh_node, identity = create_mesh_node("MessageSender", radio_type) + mesh_node, identity = create_mesh_node("MessageSender", radio_type, serial_port) # Initialize packet variable packet = Packet() @@ -71,11 +71,31 @@ def __init__(self, name, pubkey_hex): return packet, mesh_node -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") + import argparse + + parser = argparse.ArgumentParser(description="Send a text message to the mesh network") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) + + args = parser.parse_args() + + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") + try: - packet, node = asyncio.run(send_text_message(radio_type)) + packet, node = asyncio.run(send_text_message(args.radio_type, args.serial_port)) print("Example completed") except KeyboardInterrupt: print("\nInterrupted by user") @@ -84,7 +104,4 @@ def main(radio_type: str = "waveshare"): if __name__ == "__main__": - import sys - - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" - main(radio_type) + main() diff --git a/examples/send_tracked_advert.py b/examples/send_tracked_advert.py index d59f5d8..d7c6591 100644 --- a/examples/send_tracked_advert.py +++ b/examples/send_tracked_advert.py @@ -25,25 +25,27 @@ repeat_count = 0 -async def simple_repeat_counter(packet, raw_data=None): - """Simple handler that just counts advert repeats.""" +def simple_repeat_counter(raw_data: bytes): + """Simple handler that just counts packet repeats.""" global repeat_count try: - # Check if this is an advert packet - if hasattr(packet, "get_payload_type") and packet.get_payload_type() == PAYLOAD_TYPE_ADVERT: - repeat_count += 1 - print(f"ADVERT REPEAT HEARD #{repeat_count}") + # Simple check - just count any received packet as a potential repeat + # I have kept it simple but you would want to check if it's actually an advert etc. + repeat_count += 1 + print(f"PACKET REPEAT HEARD #{repeat_count} ({len(raw_data)} bytes)") except Exception as e: print(f"Error processing packet: {e}") -async def send_simple_tracked_advert(radio_type: str = "waveshare"): +async def send_simple_tracked_advert( + radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0" +): """Send a tracked advert and count responses.""" global repeat_count # Create mesh node - mesh_node, identity = create_mesh_node("SimpleTracker", radio_type) + mesh_node, identity = create_mesh_node("SimpleTracker", radio_type, serial_port) # Create advert packet advert_packet = PacketBuilder.create_advert( @@ -57,8 +59,9 @@ async def send_simple_tracked_advert(radio_type: str = "waveshare"): ) print_packet_info(advert_packet, "Created advert packet") - print("Sending advert...") + # Send the packet + print("\nSending advert...") success = await mesh_node.dispatcher.send_packet(advert_packet, wait_for_ack=False) if success: @@ -66,8 +69,8 @@ async def send_simple_tracked_advert(radio_type: str = "waveshare"): print("Listening for repeats... (Ctrl+C to stop)") print("-" * 40) - # Set up simple repeat counter - mesh_node.dispatcher.set_packet_received_callback(simple_repeat_counter) + # Set up simple repeat counter directly on the radio + mesh_node.radio.set_rx_callback(simple_repeat_counter) # Listen continuously try: @@ -83,11 +86,31 @@ async def send_simple_tracked_advert(radio_type: str = "waveshare"): return advert_packet, mesh_node -def main(radio_type: str = "waveshare"): +def main(): """Main function for running the example.""" - print(f"Using {radio_type} radio configuration") + import argparse + + parser = argparse.ArgumentParser(description="Send a location-tracked advertisement") + parser.add_argument( + "--radio-type", + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + default="waveshare", + help="Radio hardware type (default: waveshare)", + ) + parser.add_argument( + "--serial-port", + default="/dev/ttyUSB0", + help="Serial port for KISS TNC (default: /dev/ttyUSB0)", + ) + + args = parser.parse_args() + + print(f"Using {args.radio_type} radio configuration") + if args.radio_type == "kiss-tnc": + print(f"Serial port: {args.serial_port}") + try: - packet, node = asyncio.run(send_simple_tracked_advert(radio_type)) + packet, node = asyncio.run(send_simple_tracked_advert(args.radio_type, args.serial_port)) print("Example completed") except KeyboardInterrupt: print("\nInterrupted by user") @@ -96,7 +119,4 @@ def main(radio_type: str = "waveshare"): if __name__ == "__main__": - import sys - - radio_type = sys.argv[1] if len(sys.argv) > 1 else "waveshare" - main(radio_type) + main() diff --git a/examples/wireshark_stream.py b/examples/wireshark_stream.py new file mode 100644 index 0000000..f0ff3f8 --- /dev/null +++ b/examples/wireshark_stream.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +import argparse +import asyncio +import socket +import struct +import time + +from common import create_mesh_node + +LINKTYPE_USER0 = 147 + + +def setup_wireshark_stream(ip, port): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + dest = (ip, port) + global_hdr = struct.pack(" int: + return 0xFF # Special marker for fallback handler + + async def __call__(self, packet, metadata=None): + try: + raw_data = packet.write_to() + ts = time.time() + ts_sec, ts_usec = int(ts), int((ts % 1) * 1_000_000) + pkt_hdr = struct.pack("=2.0.0", "lgpio>=0.2.0", "spidev>=3.5", + "pyserial>=3.5", ] websocket = [ "websockets>=11.0.0", diff --git a/src/pymc_core/__init__.py b/src/pymc_core/__init__.py index a4882a5..46fece3 100644 --- a/src/pymc_core/__init__.py +++ b/src/pymc_core/__init__.py @@ -3,7 +3,7 @@ Clean, simple API for building mesh network applications. """ -__version__ = "1.0.2" +__version__ = "1.0.3" # Core mesh functionality from .node.node import MeshNode diff --git a/src/pymc_core/hardware/kiss_serial_wrapper.py b/src/pymc_core/hardware/kiss_serial_wrapper.py new file mode 100644 index 0000000..c41f37e --- /dev/null +++ b/src/pymc_core/hardware/kiss_serial_wrapper.py @@ -0,0 +1,754 @@ +""" +KISS Serial Protocol Wrapper + +""" + +import asyncio +import logging +import threading +from collections import deque +from typing import Any, Callable, Dict, Optional + +import serial + +from .base import LoRaRadio + +# KISS Protocol Constants +KISS_FEND = 0xC0 # Frame End +KISS_FESC = 0xDB # Frame Escape +KISS_TFEND = 0xDC # Transposed Frame End +KISS_TFESC = 0xDD # Transposed Frame Escape + +# KISS Command Masks +KISS_MASK_PORT = 0xF0 +KISS_MASK_CMD = 0x0F + +# KISS Commands +KISS_CMD_DATA = 0x00 +KISS_CMD_TXDELAY = 0x01 +KISS_CMD_PERSIST = 0x02 +KISS_CMD_SLOTTIME = 0x03 +KISS_CMD_TXTAIL = 0x04 +KISS_CMD_FULLDUP = 0x05 +KISS_CMD_VENDOR = 0x06 +KISS_CMD_RETURN = 0xFF + +# Buffer and timing constants +MAX_FRAME_SIZE = 512 +RX_BUFFER_SIZE = 1024 +TX_BUFFER_SIZE = 1024 +DEFAULT_BAUDRATE = 115200 +DEFAULT_TIMEOUT = 1.0 + +logger = logging.getLogger("KissSerialWrapper") + + +class KissSerialWrapper(LoRaRadio): + """ + KISS Serial Protocol Interface + + Provides full-duplex KISS protocol communication over serial port. + Handles frame encoding/decoding, buffering, and configuration commands. + Implements the LoRaRadio interface for PyMC Core compatibility. + """ + + def __init__( + self, + port: str, + baudrate: int = DEFAULT_BAUDRATE, + timeout: float = DEFAULT_TIMEOUT, + kiss_port: int = 0, + on_frame_received: Optional[Callable[[bytes], None]] = None, + radio_config: Optional[Dict[str, Any]] = None, + auto_configure: bool = True, + ): + """ + Initialize KISS Serial Wrapper + + Args: + port: Serial port device path (e.g., '/dev/ttyUSB0', '/dev/cu.usbserial-0001', 'comm1', etc.) + baudrate: Serial communication baud rate (default: 115200) + timeout: Serial read timeout in seconds (default: 1.0) + kiss_port: KISS port number (0-15, default: 0) + on_frame_received: Callback for received HDLC frames + radio_config: Optional radio configuration dict with keys: + frequency, bandwidth, sf, cr, sync_word, power, etc. + auto_configure: If True, automatically configure radio and enter KISS mode + """ + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.kiss_port = kiss_port & 0x0F # Ensure 4-bit port number + self.auto_configure = auto_configure + + self.radio_config = radio_config or {} + self.is_configured = False + self.kiss_mode_active = False + + self.serial_conn: Optional[serial.Serial] = None + self.is_connected = False + + self.rx_buffer = deque(maxlen=RX_BUFFER_SIZE) + self.tx_buffer = deque(maxlen=TX_BUFFER_SIZE) + + self.rx_frame_buffer = bytearray() + self.in_frame = False + self.escaped = False + + self.rx_thread: Optional[threading.Thread] = None + self.tx_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + + # Callbacks + self.on_frame_received = on_frame_received + + # KISS Configuration + self.config = { + "txdelay": 30, # TX delay (units of 10ms) + "persist": 64, # P parameter (0-255) + "slottime": 10, # Slot time (units of 10ms) + "txtail": 1, # TX tail time (units of 10ms) + "fulldup": False, # Full duplex mode + } + + self.stats = { + "frames_sent": 0, + "frames_received": 0, + "bytes_sent": 0, + "bytes_received": 0, + "frame_errors": 0, + "buffer_overruns": 0, + "last_rssi": None, + "last_snr": None, + "noise_floor": None, + } + + def connect(self) -> bool: + """ + Connect to serial port and start communication threads + + Returns: + True if connection successful, False otherwise + """ + try: + self.serial_conn = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + ) + + self.is_connected = True + self.stop_event.clear() + + # Start communication threads + self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True) + self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True) + + self.rx_thread.start() + self.tx_thread.start() + + logger.info(f"KISS serial connected to {self.port} at {self.baudrate} baud") + + # Auto-configure if requested + if self.auto_configure: + if not self.configure_radio_and_enter_kiss(): + logger.warning("Auto-configuration failed, KISS mode not active") + return False + + return True + + except Exception as e: + logger.error(f"Failed to connect to {self.port}: {e}") + self.is_connected = False + return False + + def disconnect(self): + """Disconnect from serial port and stop threads""" + self.is_connected = False + self.stop_event.set() + + # Wait for threads to finish + if self.rx_thread and self.rx_thread.is_alive(): + self.rx_thread.join(timeout=2.0) + if self.tx_thread and self.tx_thread.is_alive(): + self.tx_thread.join(timeout=2.0) + + # Close serial connection + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.close() + + logger.info(f"KISS serial disconnected from {self.port}") + + def send_frame(self, data: bytes) -> bool: + """ + Send a data frame via KISS protocol + + Args: + data: Raw frame data to send + + Returns: + True if frame queued successfully, False otherwise + """ + if not self.is_connected or len(data) > MAX_FRAME_SIZE: + logger.warning( + f"Cannot send frame - connected: {self.is_connected}, " + f"size: {len(data)}/{MAX_FRAME_SIZE}" + ) + return False + + try: + # Create KISS frame + kiss_frame = self._encode_kiss_frame(KISS_CMD_DATA, data) + + # Add to TX buffer + if len(self.tx_buffer) < TX_BUFFER_SIZE: + self.tx_buffer.append(kiss_frame) + return True + else: + self.stats["buffer_overruns"] += 1 + logger.warning("TX buffer overrun") + return False + + except Exception as e: + logger.error(f"Failed to send frame: {e}") + return False + + def send_config_command(self, cmd: int, value: int) -> bool: + """ + Send KISS configuration command + + Args: + cmd: KISS command type (KISS_CMD_*) + value: Command parameter value + + Returns: + True if command sent successfully, False otherwise + """ + if not self.is_connected: + return False + + try: + # Update local config + if cmd == KISS_CMD_TXDELAY: + self.config["txdelay"] = value + elif cmd == KISS_CMD_PERSIST: + self.config["persist"] = value + elif cmd == KISS_CMD_SLOTTIME: + self.config["slottime"] = value + elif cmd == KISS_CMD_TXTAIL: + self.config["txtail"] = value + elif cmd == KISS_CMD_FULLDUP: + self.config["fulldup"] = bool(value) + + # Create and send KISS command frame + kiss_frame = self._encode_kiss_frame(cmd, bytes([value])) + + if len(self.tx_buffer) < TX_BUFFER_SIZE: + self.tx_buffer.append(kiss_frame) + return True + else: + self.stats["buffer_overruns"] += 1 + return False + + except Exception as e: + logger.error(f"Failed to send config command: {e}") + return False + + def get_stats(self) -> Dict[str, Any]: + """Get interface statistics""" + return self.stats.copy() + + def get_config(self) -> Dict[str, Any]: + """Get current KISS configuration""" + return self.config.copy() + + def configure_radio_and_enter_kiss(self) -> bool: + """ + Configure radio settings and enter KISS mode + + Returns: + True if configuration successful, False otherwise + """ + if not self.is_connected: + logger.error("Cannot configure radio: not connected") + return False + + try: + if self.radio_config: + if not self._configure_radio(): + logger.error("Radio configuration failed") + return False + + if not self._enter_kiss_mode(): + logger.error("Failed to enter KISS mode") + return False + + self.kiss_mode_active = True + logger.info("Successfully configured radio and entered KISS mode") + return True + + except Exception as e: + logger.error(f"Configuration failed: {e}") + return False + + def _configure_radio(self) -> bool: + """ + Send radio configuration commands + + Returns: + True if configuration successful, False otherwise + """ + if not self.serial_conn or not self.serial_conn.is_open: + return False + + try: + # Extract configuration parameters with defaults + frequency_hz = self.radio_config.get("frequency", int(916.75 * 1000000)) + bandwidth_hz = self.radio_config.get("bandwidth", int(500.0 * 1000)) + sf = self.radio_config.get("spreading_factor", 5) + cr = self.radio_config.get("coding_rate", 5) + sync_word = self.radio_config.get("sync_word", 0x12) + power = self.radio_config.get("power", 20) # noqa: F841 - kept for future use + + # Convert Hz values to MHz/kHz for KISS command + frequency = frequency_hz / 1000000.0 # Convert Hz to MHz + bandwidth = bandwidth_hz / 1000.0 # Convert Hz to kHz + + # Format sync_word as hex if it's an integer + if isinstance(sync_word, int): + sync_word_str = f"0x{sync_word:02X}" + else: + sync_word_str = str(sync_word) + + # Build command string: set radio ,,,, + # Note: power parameter kept in config but not used in current command format + radio_cmd = f"set radio {frequency},{bandwidth},{sf},{cr},{sync_word_str}\r\n" + logger.info(radio_cmd) + + # Send command + self.serial_conn.write(radio_cmd.encode("ascii")) + self.serial_conn.flush() + + # Wait for response + threading.Event().wait(0.5) + + # Read any response + response = "" + if self.serial_conn.in_waiting > 0: + response = self.serial_conn.read(self.serial_conn.in_waiting).decode( + "ascii", errors="ignore" + ) + + logger.info(f"Radio config sent: {radio_cmd.strip()}") + if response: + logger.debug(f"Radio config response: {response.strip()}") + + self.is_configured = True + return True + + except Exception as e: + logger.error(f"Radio configuration error: {e}") + return False + + def _enter_kiss_mode(self) -> bool: + """ + Enter KISS serial mode + + Returns: + True if KISS mode entered successfully, False otherwise + """ + if not self.serial_conn or not self.serial_conn.is_open: + return False + + try: + # Send command to enter KISS mode + kiss_cmd = "serial mode kiss\r\n" + self.serial_conn.write(kiss_cmd.encode("ascii")) + self.serial_conn.flush() + + # Wait for mode switch + threading.Event().wait(1.0) + + # Read any response + response = "" + if self.serial_conn.in_waiting > 0: + response = self.serial_conn.read(self.serial_conn.in_waiting).decode( + "ascii", errors="ignore" + ) + + logger.info("Entered KISS mode") + if response: + logger.debug(f"KISS mode response: {response.strip()}") + + return True + + except Exception as e: + logger.error(f"KISS mode entry error: {e}") + return False + + def exit_kiss_mode(self) -> bool: + """ + Exit KISS mode and return to CLI mode + + Returns: + True if successfully exited KISS mode, False otherwise + """ + if not self.is_connected or not self.kiss_mode_active: + return False + + try: + # Send KISS return command to exit mode + return_frame = self._encode_kiss_frame(KISS_CMD_RETURN, b"") + + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.write(return_frame) + self.serial_conn.flush() + + # Wait for mode switch + threading.Event().wait(1.0) + + self.kiss_mode_active = False + logger.info("Exited KISS mode") + return True + + except Exception as e: + logger.error(f"Failed to exit KISS mode: {e}") + + return False + + def send_cli_command(self, command: str) -> Optional[str]: + """ + Send a CLI command (only works when not in KISS mode) + + Args: + command: CLI command to send + + Returns: + Response string if available, None otherwise + """ + if not self.is_connected or self.kiss_mode_active or not self.serial_conn: + logger.error("Cannot send CLI command: not connected or in KISS mode") + return None + + try: + # Send command + cmd_line = f"{command}\r\n" + self.serial_conn.write(cmd_line.encode("ascii")) + self.serial_conn.flush() + + # Wait for response + threading.Event().wait(0.5) + + # Read response + response = "" + if self.serial_conn.in_waiting > 0: + response = self.serial_conn.read(self.serial_conn.in_waiting).decode( + "ascii", errors="ignore" + ) + + logger.debug(f"CLI command: {command.strip()} -> {response.strip()}") + return response.strip() if response else None + + except Exception as e: + logger.error(f"CLI command error: {e}") + return None + + def set_rx_callback(self, callback: Callable[[bytes], None]): + """ + Set the RX callback function + + Args: + callback: Function to call when a frame is received + """ + self.on_frame_received = callback + logger.debug("RX callback set") + + def begin(self): + """ + Initialize the radio + """ + success = self.connect() + if not success: + raise Exception("Failed to initialize KISS radio") + + async def send(self, data: bytes) -> None: + """ + Send data via KISS TNC + + Args: + data: Data to send + + Raises: + Exception: If send fails + """ + success = self.send_frame(data) + if not success: + raise Exception("Failed to send frame via KISS TNC") + + async def wait_for_rx(self) -> bytes: + """ + Wait for a packet to be received asynchronously + + Returns: + Received packet data + """ + # Create a future to wait for the next received frame + future = asyncio.Future() + + # Store the original callback + original_callback = self.on_frame_received + + # Set a temporary callback that completes the future + def temp_callback(data: bytes): + if not future.done(): + future.set_result(data) + # Restore original callback if it exists + if original_callback: + try: + original_callback(data) + except Exception as e: + logger.error(f"Error in original callback: {e}") + + self.on_frame_received = temp_callback + + try: + # Wait for the next frame + data = await future + return data + finally: + # Restore original callback + self.on_frame_received = original_callback + + def sleep(self): + """ + Put the radio into low-power mode + + Note: KISS TNCs typically don't have software sleep control + """ + logger.debug("Sleep mode not supported for KISS TNC") + pass + + def get_last_rssi(self) -> int: + """ + Return last received RSSI in dBm + + Returns: + Last RSSI value or -999 if not available + """ + return self.stats.get("last_rssi", -999) + + def get_last_snr(self) -> float: + """ + Return last received SNR in dB + + Returns: + Last SNR value or -999.0 if not available + """ + return self.stats.get("last_snr", -999.0) + + def _encode_kiss_frame(self, cmd: int, data: bytes) -> bytes: + """ + Encode data into KISS frame format + + Args: + cmd: KISS command byte + data: Raw data to encode + + Returns: + Encoded KISS frame + """ + # Create command byte with port number + cmd_byte = ((self.kiss_port << 4) & KISS_MASK_PORT) | (cmd & KISS_MASK_CMD) + + # Start with FEND and command + frame = bytearray([KISS_FEND, cmd_byte]) + + # Escape and add data + for byte in data: + if byte == KISS_FEND: + frame.extend([KISS_FESC, KISS_TFEND]) + elif byte == KISS_FESC: + frame.extend([KISS_FESC, KISS_TFESC]) + else: + frame.append(byte) + + # End with FEND + frame.append(KISS_FEND) + + return bytes(frame) + + def _decode_kiss_byte(self, byte: int): + """ + Process received byte for KISS frame decoding + + Args: + byte: Received byte + """ + if byte == KISS_FEND: + if self.in_frame and len(self.rx_frame_buffer) > 1: + # Complete frame received + self._process_received_frame() + # Start new frame + self.rx_frame_buffer.clear() + self.in_frame = True + self.escaped = False + + elif byte == KISS_FESC: + if self.in_frame: + self.escaped = True + + elif self.escaped: + if byte == KISS_TFEND: + self.rx_frame_buffer.append(KISS_FEND) + elif byte == KISS_TFESC: + self.rx_frame_buffer.append(KISS_FESC) + else: + # Invalid escape sequence + self.stats["frame_errors"] += 1 + logger.warning(f"Invalid KISS escape sequence: 0x{byte:02X}") + self.escaped = False + + else: + if self.in_frame: + self.rx_frame_buffer.append(byte) + + def _process_received_frame(self): + """Process a complete received KISS frame""" + if len(self.rx_frame_buffer) < 1: + return + + # Extract command byte + cmd_byte = self.rx_frame_buffer[0] + port = (cmd_byte & KISS_MASK_PORT) >> 4 + cmd = cmd_byte & KISS_MASK_CMD + + # Check if frame is for our port + if port != self.kiss_port: + return + + # Extract data payload + data = bytes(self.rx_frame_buffer[1:]) + + if cmd == KISS_CMD_DATA: + # Data frame - emit to callback + if self.on_frame_received and len(data) > 0: + self.stats["frames_received"] += 1 + self.stats["bytes_received"] += len(data) + try: + self.on_frame_received(data) + except Exception as e: + logger.error(f"Error in frame received callback: {e}") + else: + # Configuration command response + logger.debug(f"Received KISS config command: cmd=0x{cmd:02X}, data={data.hex()}") + + def _rx_worker(self): + """Background thread for receiving data""" + while not self.stop_event.is_set() and self.is_connected: + try: + if self.serial_conn and self.serial_conn.in_waiting > 0: + # Read available bytes + data = self.serial_conn.read(self.serial_conn.in_waiting) + + # Process each byte through KISS decoder + for byte in data: + self._decode_kiss_byte(byte) + + else: + # Short sleep when no data available + threading.Event().wait(0.01) + + except Exception as e: + if self.is_connected: # Only log if we expect to be connected + logger.error(f"RX worker error: {e}") + break + + def _tx_worker(self): + """Background thread for sending data""" + while not self.stop_event.is_set() and self.is_connected: + try: + if self.tx_buffer: + # Get frame from buffer + frame = self.tx_buffer.popleft() + + # Send via serial + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.write(frame) + self.serial_conn.flush() + + self.stats["frames_sent"] += 1 + self.stats["bytes_sent"] += len(frame) + else: + logger.warning("Serial connection not open or not available") + else: + # Short sleep when no data to send + threading.Event().wait(0.01) + + except Exception as e: + if self.is_connected: # Only log if we expect to be connected + logger.error(f"TX worker error: {e}") + break + + def __enter__(self): + """Context manager entry""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.disconnect() + + def __del__(self): + """Destructor to ensure cleanup""" + try: + self.disconnect() + except Exception: + pass # Ignore errors during destruction + + +if __name__ == "__main__": + # Example usage + import time + + def on_frame_received(data): + print(f"Received frame: {data.hex()}") + + # Radio configuration example + radio_config = { + "frequency": int(916.75 * 1000000), # US: 916.75 MHz + "bandwidth": int(500.0 * 1000), # 500 kHz + "spreading_factor": 5, # LoRa SF5 + "coding_rate": 5, # LoRa CR 4/5 + "sync_word": 0x16, # Sync word + "power": 20, # TX power + } + + # Initialize with auto-configuration + kiss = KissSerialWrapper( + port="/dev/ttyUSB0", + baudrate=115200, + radio_config=radio_config, + on_frame_received=on_frame_received, + ) + + try: + if kiss.connect(): + print("Connected and configured successfully") + print(f"Configuration: {kiss.get_config()}") + print(f"Statistics: {kiss.get_stats()}") + + # Send a test frame + kiss.send_frame(b"Hello KISS World!") + + # Keep running for a bit + time.sleep(5) + else: + print("Failed to connect") + + except KeyboardInterrupt: + print("Interrupted by user") + finally: + kiss.disconnect() diff --git a/src/pymc_core/hardware/lora/LoRaRF/SX126x.py b/src/pymc_core/hardware/lora/LoRaRF/SX126x.py index dd1a88b..421bbd5 100644 --- a/src/pymc_core/hardware/lora/LoRaRF/SX126x.py +++ b/src/pymc_core/hardware/lora/LoRaRF/SX126x.py @@ -1483,3 +1483,17 @@ def _readBytes(self, opCode: int, nBytes: int, address: tuple = (), nAddress: in _get_output(self._cs_define).on() return tuple(feedback[nAddress + 1 :]) + + def start_cad(self, det_peak: int, det_min: int): + """Start CAD with given thresholds.""" + self.clearIrqStatus(0xFFFF) + self.setCadParams( + self.CAD_ON_8_SYMB, + det_peak, + det_min, + self.CAD_EXIT_STDBY, + 0xFFFFFF, + ) + mask = self.IRQ_CAD_DONE | self.IRQ_CAD_DETECTED + self.setDioIrqParams(mask, mask, self.IRQ_NONE, self.IRQ_NONE) + self.setCad() diff --git a/src/pymc_core/hardware/sx1262_wrapper.py b/src/pymc_core/hardware/sx1262_wrapper.py index 527087e..47287c5 100644 --- a/src/pymc_core/hardware/sx1262_wrapper.py +++ b/src/pymc_core/hardware/sx1262_wrapper.py @@ -1,6 +1,11 @@ """ SX1262 LoRa Radio Driver for Raspberry Pi Implements the LoRaRadio interface using the SX126x library + + +I have made some experimental changes to the cad section that I need to revisit. + + """ import asyncio @@ -192,10 +197,16 @@ def __init__( self._tx_done_event = asyncio.Event() self._rx_done_event = asyncio.Event() + self._cad_event = asyncio.Event() + + # Custom CAD thresholds (None means use defaults) + self._custom_cad_peak = None + self._custom_cad_min = None logger.info( f"SX1262Radio configured: freq={frequency/1e6:.1f}MHz, " - f"power={tx_power}dBm, sf={spreading_factor}, bw={bandwidth/1000:.1f}kHz, pre={preamble_length}" + f"power={tx_power}dBm, sf={spreading_factor}, " + f"bw={bandwidth/1000:.1f}kHz, pre={preamble_length}" ) # Register this instance as the active radio for IRQ callback access SX1262Radio._active_instance = self @@ -270,6 +281,17 @@ def _handle_interrupt(self): logger.debug("[TX] TX_DONE interrupt (0x{:04X})".format(self.lora.IRQ_TX_DONE)) self._tx_done_event.set() + # Check for CAD interrupts + if irqStat & (self.lora.IRQ_CAD_DETECTED | self.lora.IRQ_CAD_DONE): + cad_detected = bool(irqStat & self.lora.IRQ_CAD_DETECTED) + cad_done = bool(irqStat & self.lora.IRQ_CAD_DONE) + logger.debug( + f"[CAD] interrupt detected: {cad_detected}, done: {cad_done} (0x{irqStat:04X})" + ) + if hasattr(self, "_cad_event"): + # WAKEUP CODE + self._cad_event.set() + # Check each RX interrupt type separately for better debugging rx_interrupts = self._get_rx_irq_mask() if irqStat & self.lora.IRQ_RX_DONE: @@ -526,10 +548,15 @@ def begin(self) -> bool: self.lora.setBufferBaseAddress(0x00, 0x80) # TX=0x00, RX=0x80 # Enable LDRO if symbol duration > 16ms (SF11/62.5kHz = 32.768ms) - symbol_duration_ms = (2 ** self.spreading_factor) / (self.bandwidth / 1000) + symbol_duration_ms = (2**self.spreading_factor) / (self.bandwidth / 1000) ldro = symbol_duration_ms > 16.0 - logger.info(f"LDRO {'enabled' if ldro else 'disabled'} (symbol duration: {symbol_duration_ms:.3f}ms)") - self.lora.setLoRaModulation(self.spreading_factor, self.bandwidth, self.coding_rate, ldro) + logger.info( + f"LDRO {'enabled' if ldro else 'disabled'} " + f"(symbol duration: {symbol_duration_ms:.3f}ms)" + ) + self.lora.setLoRaModulation( + self.spreading_factor, self.bandwidth, self.coding_rate, ldro + ) self.lora.setLoRaPacket( self.lora.HEADER_EXPLICIT, @@ -569,10 +596,15 @@ def begin(self) -> bool: # Configure modulation and packet parameters # Enable LDRO if symbol duration > 16ms (SF11/62.5kHz = 32.768ms) - symbol_duration_ms = (2 ** self.spreading_factor) / (self.bandwidth / 1000) + symbol_duration_ms = (2**self.spreading_factor) / (self.bandwidth / 1000) ldro = symbol_duration_ms > 16.0 - logger.info(f"LDRO {'enabled' if ldro else 'disabled'} (symbol duration: {symbol_duration_ms:.3f}ms)") - self.lora.setLoRaModulation(self.spreading_factor, self.bandwidth, self.coding_rate, ldro) + logger.info( + f"LDRO {'enabled' if ldro else 'disabled'} " + f"(symbol duration: {symbol_duration_ms:.3f}ms)" + ) + self.lora.setLoRaModulation( + self.spreading_factor, self.bandwidth, self.coding_rate, ldro + ) self.lora.setPacketParamsLoRa( self.preamble_length, self.lora.HEADER_EXPLICIT, @@ -586,6 +618,24 @@ def begin(self) -> bool: self.lora.setDioIrqParams(rx_mask, rx_mask, self.lora.IRQ_NONE, self.lora.IRQ_NONE) self.lora.clearIrqStatus(0xFFFF) + # Program custom CAD thresholds to chip hardware if available + if self._custom_cad_peak is not None and self._custom_cad_min is not None: + logger.info( + f"Setting CAD thresholds to chip: peak={self._custom_cad_peak},", + f"min={self._custom_cad_min}", + ) + try: + self.lora.setCadParams( + self.lora.CAD_ON_2_SYMB, # 2 symbols for detection + self._custom_cad_peak, + self._custom_cad_min, + self.lora.CAD_EXIT_STDBY, # exit to standby + 0, # no timeout + ) + logger.debug("Custom CAD thresholds written") + except Exception as e: + logger.warning(f"Failed to write CAD thresholds: {e}") + # Set to RX continuous mode for initial operation self.lora.setRx(self.lora.RX_CONTINUOUS) self._initialized = True @@ -690,6 +740,41 @@ async def _prepare_radio_for_tx(self) -> bool: await asyncio.sleep(0.01) busy_wait += 1 + # Listen Before Talk (LBT) - Check for channel activity using CAD + lbt_attempts = 0 + max_lbt_attempts = 5 + while lbt_attempts < max_lbt_attempts: + try: + # Perform CAD with your custom thresholds + channel_busy = await self.perform_cad(timeout=0.5) + if not channel_busy: + logger.debug(f"Channel clear after {lbt_attempts + 1} CAD checks") + break + else: + lbt_attempts += 1 + if lbt_attempts < max_lbt_attempts: + # Channel busy, wait random backoff before trying again + # this may conflict with dispatcher will need testing. + # Channel busy, wait backoff before trying again (MeshCore-inspired) + import random + + base_delay = random.randint(120, 240) + backoff_ms = base_delay + ( + lbt_attempts * 50 + ) # Progressive: 120-290ms, 170-340ms, etc. + logger.debug( + f"Channel busy (CAD detected activity), backing off {backoff_ms}ms" + f" - >>>>>>> attempt {lbt_attempts} <<<<<<<", + ) + await asyncio.sleep(backoff_ms / 1000.0) + else: + logger.warning( + f"Channel still busy after {max_lbt_attempts} CAD attempts - tx anyway" + ) + except Exception as e: + logger.debug(f"CAD check failed: {e}, proceeding with transmission") + break + # Set TXEN/RXEN pins for TX mode self._control_tx_rx_pins(tx_mode=True) @@ -870,21 +955,18 @@ async def send(self, data: bytes) -> None: # Prepare packet for transmission self._prepare_packet_transmission(data_list, length) - # Setup TX interrupts - self._setup_tx_interrupts() - - # Small delay to ensure IRQ configuration is applied - await asyncio.sleep(self.RADIO_TIMING_DELAY) - logger.debug( f"Setting TX timeout: {final_timeout_ms}ms " f"(tOut={driver_timeout}) for {length} bytes" ) - # Prepare radio hardware for transmission if not await self._prepare_radio_for_tx(): return + # Setup TX interrupts AFTER CAD checks (CAD changes interrupt config) + self._setup_tx_interrupts() + await asyncio.sleep(self.RADIO_TIMING_DELAY) + # Execute the transmission if not await self._execute_transmission(driver_timeout): return @@ -933,16 +1015,54 @@ def get_noise_floor(self) -> Optional[float]: """ if not self._initialized or self.lora is None: return None + + # Skip noise floor reading if we're currently transmitting + if hasattr(self, "_tx_lock") and self._tx_lock.locked(): + return None + try: raw_rssi = self.lora.getRssiInst() if raw_rssi is not None: noise_floor_dbm = -(float(raw_rssi) / 2) - return noise_floor_dbm + # Validate reading - reject obviously invalid values + if -150.0 <= noise_floor_dbm <= -50.0: + return noise_floor_dbm + else: + # Invalid reading detected - trigger radio state reset + logger.debug( + f"Invalid noise floor reading: {noise_floor_dbm:.1f}dBm - resetting radio" + ) + self._reset_radio_state() + return None return None except Exception as e: logger.debug(f"Failed to read noise floor: {e}") return None + def _reset_radio_state(self) -> None: + """Reset radio state to recover from invalid RSSI readings""" + if not self._initialized or self.lora is None: + return + + try: + # Force radio back to standby then RX mode + self.lora.setStandby(self.lora.STANDBY_RC) + time.sleep(0.05) # Let radio settle + + # Clear interrupt flags + irq_status = self.lora.getIrqStatus() + if irq_status != 0: + self.lora.clearIrqStatus(irq_status) + + # Restore RX mode + rx_mask = self._get_rx_irq_mask() + self.lora.setDioIrqParams(rx_mask, rx_mask, self.lora.IRQ_NONE, self.lora.IRQ_NONE) + self.lora.setRx(self.lora.RX_CONTINUOUS) + + logger.debug("Radio state reset completed") + except Exception as e: + logger.warning(f"Failed to reset radio state: {e}") + def set_frequency(self, frequency: int) -> bool: """Set operating frequency""" @@ -1008,6 +1128,174 @@ def get_status(self) -> dict: return status + def set_custom_cad_thresholds(self, peak: int, min_val: int) -> None: + """Set custom CAD thresholds that override the defaults. + + Args: + peak: CAD detection peak threshold (0-31) + min_val: CAD detection minimum threshold (0-31) + """ + if not (0 <= peak <= 31) or not (0 <= min_val <= 31): + raise ValueError("CAD thresholds must be between 0 and 31") + + self._custom_cad_peak = peak + self._custom_cad_min = min_val + logger.info(f"Custom CAD thresholds set: peak={peak}, min={min_val}") + + def clear_custom_cad_thresholds(self) -> None: + """Clear custom CAD thresholds and revert to defaults.""" + self._custom_cad_peak = None + self._custom_cad_min = None + logger.info("Custom CAD thresholds cleared, reverting to defaults") + + def _get_thresholds_for_current_settings(self) -> tuple[int, int]: + """Fetch CAD thresholds for the current spreading factor. + Returns (cadDetPeak, cadDetMin). + """ + # Use custom thresholds if set + if self._custom_cad_peak is not None and self._custom_cad_min is not None: + return (self._custom_cad_peak, self._custom_cad_min) + + # Default CAD thresholds by SF (based on Semtech TR013 recommendations) + DEFAULT_CAD_THRESHOLDS = { + 7: (22, 10), + 8: (22, 10), + 9: (24, 10), + 10: (25, 10), + 11: (26, 10), + 12: (30, 10), + } + + # Fall back to SF7 values if unknown + return DEFAULT_CAD_THRESHOLDS.get(self.spreading_factor, (22, 10)) + + async def perform_cad( + self, + det_peak: int | None = None, + det_min: int | None = None, + timeout: float = 1.0, + calibration: bool = False, + ) -> bool | dict: + """ + Perform Channel Activity Detection (CAD). + If calibration=True, uses provided thresholds and returns info. + If calibration=False, uses pre-calibrated/default thresholds. + + Returns: + bool: Channel activity detected (when calibration=False) + dict: Calibration data (when calibration=True) + """ + if not self._initialized: + raise RuntimeError("Radio not initialized") + + if not self.lora: + raise RuntimeError("LoRa radio object not available") + + # Choose thresholds + if det_peak is None or det_min is None: + det_peak, det_min = self._get_thresholds_for_current_settings() + + try: + # Put radio in standby mode before CAD configuration + self.lora.setStandby(self.lora.STANDBY_RC) + + # Clear any existing interrupt flags + existing_irq = self.lora.getIrqStatus() + if existing_irq != 0: + self.lora.clearIrqStatus(existing_irq) + + # Configure CAD interrupts + cad_mask = self.lora.IRQ_CAD_DONE | self.lora.IRQ_CAD_DETECTED + self.lora.setDioIrqParams(cad_mask, cad_mask, self.lora.IRQ_NONE, self.lora.IRQ_NONE) + + self.lora.setCadParams( + self.lora.CAD_ON_2_SYMB, # 2 symbols + det_peak, + det_min, + self.lora.CAD_EXIT_STDBY, # exit to standby + 0, # no timeout + ) + + # Clear CAD event before starting + self._cad_event.clear() + + # Start CAD operation + self.lora.setCad() + + logger.debug(f"CAD started with peak={det_peak}, min={det_min}") + + # Wait for CAD completion + try: + await asyncio.wait_for(self._cad_event.wait(), timeout=timeout) + self._cad_event.clear() + + irq = self.lora.getIrqStatus() + logger.debug(f"CAD completed with IRQ status: 0x{irq:04X}") + self.lora.clearIrqStatus(irq) + detected = bool(irq & self.lora.IRQ_CAD_DETECTED) + cad_done = bool(irq & self.lora.IRQ_CAD_DONE) + + if not cad_done: + logger.warning("CAD interrupt received but CAD_DONE flag not set") + + if calibration: + return { + "sf": self.spreading_factor, + "bw": self.bandwidth, + "det_peak": det_peak, + "det_min": det_min, + "detected": detected, + "cad_done": cad_done, + "timestamp": time.time(), + "irq_status": irq, + } + else: + return detected + + except asyncio.TimeoutError: + logger.debug("CAD operation timed out") + # Check if there were any interrupt flags set anyway + irq = self.lora.getIrqStatus() + if irq != 0: + logger.debug(f"CAD timeout but IRQ status: 0x{irq:04X}") + self.lora.clearIrqStatus(irq) + + if calibration: + return { + "sf": self.spreading_factor, + "bw": self.bandwidth, + "det_peak": det_peak, + "det_min": det_min, + "detected": False, + "timestamp": time.time(), + "timeout": True, + } + else: + return False + + except Exception as e: + logger.error(f"CAD operation failed: {e}") + if calibration: + return { + "sf": self.spreading_factor, + "bw": self.bandwidth, + "det_peak": det_peak, + "det_min": det_min, + "detected": False, + "timestamp": time.time(), + "error": str(e), + } + else: + return False + finally: + # Restore RX mode after CAD + try: + rx_mask = self._get_rx_irq_mask() + self.lora.setDioIrqParams(rx_mask, rx_mask, self.lora.IRQ_NONE, self.lora.IRQ_NONE) + self.lora.setRx(self.lora.RX_CONTINUOUS) + except Exception as e: + logger.warning(f"Failed to restore RX mode after CAD: {e}") + def cleanup(self) -> None: """Clean up radio resources""" if hasattr(self, "lora") and self.lora: diff --git a/tests/test_basic.py b/tests/test_basic.py index 17e1406..7645b47 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -2,7 +2,7 @@ def test_version(): - assert __version__ == "1.0.2" + assert __version__ == "1.0.3" def test_import():