|
| 1 | +""" |
| 2 | +watchdog_test.py: |
| 3 | +
|
| 4 | +Integration tests for the Watchdog component. |
| 5 | +""" |
| 6 | + |
| 7 | +import time |
| 8 | + |
| 9 | +import pytest |
| 10 | +from fprime_gds.common.data_types.ch_data import ChData |
| 11 | +from fprime_gds.common.testing_fw.api import IntegrationTestAPI |
| 12 | + |
| 13 | + |
| 14 | +@pytest.fixture(autouse=True) |
| 15 | +def ensure_watchdog_running(fprime_test_api: IntegrationTestAPI): |
| 16 | + """Fixture to ensure watchdog is started before and after each test""" |
| 17 | + start_watchdog(fprime_test_api) |
| 18 | + yield |
| 19 | + start_watchdog(fprime_test_api) |
| 20 | + |
| 21 | + |
| 22 | +def start_watchdog(fprime_test_api: IntegrationTestAPI): |
| 23 | + """Helper function to start the watchdog""" |
| 24 | + fprime_test_api.send_and_assert_command( |
| 25 | + "ReferenceDeployment.watchdog.START_WATCHDOG", max_delay=2 |
| 26 | + ) |
| 27 | + fprime_test_api.assert_event( |
| 28 | + "ReferenceDeployment.watchdog.WatchdogStart", timeout=2 |
| 29 | + ) |
| 30 | + |
| 31 | + |
| 32 | +def get_watchdog_transitions(fprime_test_api: IntegrationTestAPI) -> int: |
| 33 | + """Helper function to request packet and get fresh WatchdogTransitions telemetry""" |
| 34 | + fprime_test_api.clear_histories() |
| 35 | + fprime_test_api.send_and_assert_command( |
| 36 | + "CdhCore.tlmSend.SEND_PKT", ["5"], max_delay=2 |
| 37 | + ) |
| 38 | + result: ChData = fprime_test_api.assert_telemetry( |
| 39 | + "ReferenceDeployment.watchdog.WatchdogTransitions", start="NOW", timeout=3 |
| 40 | + ) |
| 41 | + return result.get_val() |
| 42 | + |
| 43 | + |
| 44 | +def test_01_watchdog_telemetry_basic(fprime_test_api: IntegrationTestAPI): |
| 45 | + """Test that we can read WatchdogTransitions telemetry""" |
| 46 | + value = get_watchdog_transitions(fprime_test_api) |
| 47 | + assert value >= 0, f"WatchdogTransitions should be >= 0, got {value}" |
| 48 | + |
| 49 | + |
| 50 | +def test_02_watchdog_increments(fprime_test_api: IntegrationTestAPI): |
| 51 | + """Test that WatchdogTransitions increments over time""" |
| 52 | + |
| 53 | + initial_value = get_watchdog_transitions(fprime_test_api) |
| 54 | + time.sleep(2.0) # Wait for watchdog to run more cycles |
| 55 | + updated_value = get_watchdog_transitions(fprime_test_api) |
| 56 | + |
| 57 | + assert updated_value > initial_value, ( |
| 58 | + f"WatchdogTransitions should increase. Initial: {initial_value}, Updated: {updated_value}" |
| 59 | + ) |
| 60 | + |
| 61 | + |
| 62 | +def test_03_stop_watchdog_command(fprime_test_api: IntegrationTestAPI): |
| 63 | + """ |
| 64 | + Test STOP_WATCHDOG command sends and emits WatchdogStop |
| 65 | + event and WatchdogTransitions stops incrementing |
| 66 | + """ |
| 67 | + fprime_test_api.clear_histories() |
| 68 | + |
| 69 | + # Send stop command |
| 70 | + fprime_test_api.send_and_assert_command( |
| 71 | + "ReferenceDeployment.watchdog.STOP_WATCHDOG", max_delay=2 |
| 72 | + ) |
| 73 | + |
| 74 | + # Check for watchdog stop event |
| 75 | + fprime_test_api.assert_event("ReferenceDeployment.watchdog.WatchdogStop", timeout=2) |
| 76 | + |
| 77 | + # Get watchdog transition count |
| 78 | + initial_value = get_watchdog_transitions(fprime_test_api) |
| 79 | + |
| 80 | + # Wait and check that it's no longer incrementing |
| 81 | + time.sleep(2.0) |
| 82 | + final_value = get_watchdog_transitions(fprime_test_api) |
| 83 | + |
| 84 | + assert final_value == initial_value, ( |
| 85 | + f"Watchdog should remain stopped. Initial: {initial_value}, Final: {final_value}" |
| 86 | + ) |
0 commit comments