diff --git a/circuitpython-workspaces/flight-software/examples/README.md b/circuitpython-workspaces/flight-software/examples/README.md new file mode 100644 index 00000000..fc8dece6 --- /dev/null +++ b/circuitpython-workspaces/flight-software/examples/README.md @@ -0,0 +1,33 @@ +# Flight Software Examples + +This directory contains example code files demonstrating various PySquared features. + +## watchdog_demo.py + +A simple demonstration of the `set_next_code_file` command functionality. This file can be used to test the supervisor module's ability to switch execution to a different code file after a reset. + +### What it does: +- Prints status messages to the console every 1 second +- Demonstrates that the new code file is running after reset +- Runs in an infinite loop (simulating continuous operation) + +### How to use: +1. Upload this file to your satellite's filesystem +2. Send a `set_next_code_file` command with this file as the argument: + ```json + { + "password": "your_password", + "name": "your_satellite_name", + "command": "set_next_code_file", + "args": ["watchdog_demo.py"] + } + ``` +3. The satellite will reset and begin executing this file instead of the default `code.py` +4. You should see messages like `[1] Running at 1.23s` printed every second + +### Reverting back to normal operation: +To return to the default code file, you would need to: +- Either send another `set_next_code_file` command with `code.py` as the argument +- Or manually reset the satellite and ensure `code.py` is present + +**Note:** This is a minimal demonstration. In production, you should include mechanisms to safely revert to the original code if the new code fails to execute properly. diff --git a/circuitpython-workspaces/flight-software/examples/watchdog_demo.py b/circuitpython-workspaces/flight-software/examples/watchdog_demo.py new file mode 100644 index 00000000..4b015978 --- /dev/null +++ b/circuitpython-workspaces/flight-software/examples/watchdog_demo.py @@ -0,0 +1,40 @@ +"""Demo code file for testing supervisor.set_next_code_file functionality. + +This file demonstrates the set_next_code_file command with a simple loop that +prints messages at a 1-second interval. This is a minimal example to verify that +the supervisor module can successfully switch to a new code file after a reset. + +To use this demo: +1. Upload this file to the satellite's filesystem (e.g., as 'watchdog_demo.py') +2. Send a command to set this as the next code file: + {"password": "", "name": "", "command": "set_next_code_file", "args": ["watchdog_demo.py"]} +3. The satellite will reset and begin executing this file instead of the default code.py + +Note: This is a minimal demonstration. In a real scenario, you would want to +include more robust error handling and a way to revert to the original code. +""" + +import time + +print("=== Watchdog Demo Started ===") +print("This demonstrates the set_next_code_file command functionality.") +print("Printing a message every 1 second to show the code is running.") +print("Press Ctrl+C to stop.") + +iteration = 0 +try: + while True: + iteration += 1 + current_time = time.monotonic() + print(f"[{iteration}] Running at {current_time:.2f}s") + + # Sleep for 1 second + time.sleep(1.0) + +except KeyboardInterrupt: + print("\n=== Demo stopped by user ===") +except Exception as e: + print(f"Error in demo: {e}") + import microcontroller + + microcontroller.reset() diff --git a/circuitpython-workspaces/flight-software/src/pysquared/cdh.py b/circuitpython-workspaces/flight-software/src/pysquared/cdh.py index 099fc2b2..6256586a 100644 --- a/circuitpython-workspaces/flight-software/src/pysquared/cdh.py +++ b/circuitpython-workspaces/flight-software/src/pysquared/cdh.py @@ -21,6 +21,7 @@ import traceback import microcontroller +import supervisor from .config.config import Config from .hardware.radio.packetizer.packet_manager import PacketManager @@ -33,6 +34,7 @@ class CommandDataHandler: command_reset: str = "reset" command_change_radio_modulation: str = "change_radio_modulation" command_send_joke: str = "send_joke" + command_set_next_code_file: str = "set_next_code_file" oscar_password: str = "Hello World!" # Default password for OSCAR commands @@ -137,6 +139,8 @@ def listen_for_commands(self, timeout: int) -> None: self.change_radio_modulation(args) elif cmd == self.command_send_joke: self.send_joke() + elif cmd == self.command_set_next_code_file: + self.set_next_code_file(args) else: self._log.warning("Unknown command received", cmd=cmd) self._packet_manager.send( @@ -196,6 +200,40 @@ def reset(self) -> None: microcontroller.on_next_reset(microcontroller.RunMode.NORMAL) microcontroller.reset() + def set_next_code_file(self, args: list[str]) -> None: + """Sets the next code file to execute after reset. + + Args: + args: A list of arguments, the first item must be the filename. All other items in the args list are ignored. + """ + if len(args) < 1: + self._log.warning("No filename specified") + self._packet_manager.send( + "No filename specified. Please provide a code file name.".encode( + "utf-8" + ) + ) + return + + filename = args[0] + + try: + supervisor.set_next_code_file(filename) + self._log.info("Next code file set", filename=filename) + self._packet_manager.send( + f"Next code file set to: {filename}. Resetting satellite...".encode( + "utf-8" + ) + ) + # Reset the satellite to execute the new code file + microcontroller.on_next_reset(microcontroller.RunMode.NORMAL) + microcontroller.reset() + except Exception as e: + self._log.error("Failed to set next code file", err=e) + self._packet_manager.send( + f"Failed to set next code file: {e}".encode("utf-8") + ) + def oscar_command(self, command: str, args: list[str]) -> None: """Handles OSCAR commands. diff --git a/cpython-workspaces/flight-software-mocks/src/mocks/circuitpython/supervisor.py b/cpython-workspaces/flight-software-mocks/src/mocks/circuitpython/supervisor.py new file mode 100644 index 00000000..0376ac18 --- /dev/null +++ b/cpython-workspaces/flight-software-mocks/src/mocks/circuitpython/supervisor.py @@ -0,0 +1,15 @@ +"""Mock for the CircuitPython supervisor module. + +This module provides a mock implementation of the CircuitPython supervisor module +for testing purposes. It allows for simulating the behavior of the supervisor +module without the need for actual CircuitPython hardware. +""" + + +def set_next_code_file(filename: str) -> None: + """Mock implementation of supervisor.set_next_code_file. + + Args: + filename: The name of the code file to execute on next reset. + """ + pass diff --git a/cpython-workspaces/flight-software-unit-tests/src/unit-tests/test_cdh.py b/cpython-workspaces/flight-software-unit-tests/src/unit-tests/test_cdh.py index f8f1dcb6..e15a4bbd 100644 --- a/cpython-workspaces/flight-software-unit-tests/src/unit-tests/test_cdh.py +++ b/cpython-workspaces/flight-software-unit-tests/src/unit-tests/test_cdh.py @@ -6,9 +6,15 @@ """ import json +import sys from unittest.mock import MagicMock, patch import pytest + +from mocks.circuitpython import supervisor + +sys.modules["supervisor"] = supervisor + from pysquared.cdh import CommandDataHandler from pysquared.config.config import Config from pysquared.hardware.radio.packetizer.packet_manager import PacketManager @@ -608,3 +614,140 @@ def test_listen_for_commands_oscar_ping_integration( # Verify ping response was sent mock_packet_manager.send.assert_called_once_with("Pong! -82".encode("utf-8")) + + +def test_set_next_code_file_success(cdh, mock_logger, mock_packet_manager): + """Tests set_next_code_file with valid filename. + + Args: + cdh: CommandDataHandler instance. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + with patch("pysquared.cdh.supervisor") as mock_supervisor: + with patch("pysquared.cdh.microcontroller") as mock_microcontroller: + mock_supervisor.set_next_code_file = MagicMock() + mock_microcontroller.reset = MagicMock() + mock_microcontroller.on_next_reset = MagicMock() + mock_microcontroller.RunMode = MagicMock() + mock_microcontroller.RunMode.NORMAL = MagicMock() + + filename = ["test_code.py"] + + cdh.set_next_code_file(filename) + + # Verify supervisor.set_next_code_file was called with correct filename + mock_supervisor.set_next_code_file.assert_called_once_with(filename[0]) + + # Verify info log + mock_logger.info.assert_called_once_with( + "Next code file set", filename=filename[0] + ) + + # Verify success message was sent + expected_message = f"Next code file set to: {filename[0]}. Resetting satellite..." + mock_packet_manager.send.assert_called_once_with( + expected_message.encode("utf-8") + ) + + # Verify reset was called + mock_microcontroller.on_next_reset.assert_called_once_with( + mock_microcontroller.RunMode.NORMAL + ) + mock_microcontroller.reset.assert_called_once() + + +def test_set_next_code_file_no_filename(cdh, mock_logger, mock_packet_manager): + """Tests set_next_code_file when no filename is specified. + + Args: + cdh: CommandDataHandler instance. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + # Call the method with an empty list + cdh.set_next_code_file([]) + + # Verify warning was logged + mock_logger.warning.assert_called_once_with("No filename specified") + + # Verify error message was sent + expected_message = "No filename specified. Please provide a code file name." + mock_packet_manager.send.assert_called_once_with(expected_message.encode("utf-8")) + + +def test_set_next_code_file_failure(cdh, mock_logger, mock_packet_manager): + """Tests set_next_code_file with an error case. + + Args: + cdh: CommandDataHandler instance. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + with patch("pysquared.cdh.supervisor") as mock_supervisor: + mock_supervisor.set_next_code_file = MagicMock( + side_effect=ValueError("Invalid filename") + ) + + filename = ["invalid_file.py"] + + cdh.set_next_code_file(filename) + + # Verify error was logged + mock_logger.error.assert_called_once() + + # Verify error message was sent + expected_message = "Failed to set next code file: Invalid filename" + mock_packet_manager.send.assert_called_once_with( + expected_message.encode("utf-8") + ) + + +@patch("time.sleep") +def test_listen_for_commands_set_next_code_file( + mock_sleep, cdh, mock_packet_manager, mock_logger +): + """Tests listen_for_commands with set_next_code_file command. + + Args: + mock_sleep: Mocked time.sleep function. + cdh: CommandDataHandler instance. + mock_packet_manager: Mocked PacketManager instance. + mock_logger: Mocked Logger instance. + """ + with patch("pysquared.cdh.supervisor") as mock_supervisor: + with patch("pysquared.cdh.microcontroller") as mock_microcontroller: + mock_supervisor.set_next_code_file = MagicMock() + mock_microcontroller.reset = MagicMock() + mock_microcontroller.on_next_reset = MagicMock() + + message = { + "password": "test_password", + "name": "test_satellite", + "command": "set_next_code_file", + "args": ["new_code.py"], + } + mock_packet_manager.listen.return_value = json.dumps(message).encode( + "utf-8" + ) + + cdh.listen_for_commands(30) + + # Verify supervisor.set_next_code_file was called + mock_supervisor.set_next_code_file.assert_called_once_with("new_code.py") + + # Verify acknowledgement was sent + mock_packet_manager.send_acknowledgement.assert_called_once() + + # Verify success message was sent + expected_message = ( + "Next code file set to: new_code.py. Resetting satellite..." + ) + mock_packet_manager.send.assert_called_once_with( + expected_message.encode("utf-8") + ) + + # Verify reset was called + mock_microcontroller.on_next_reset.assert_called_once() + mock_microcontroller.reset.assert_called_once() +