|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | +Tests for FTP_OP class in backend_mavftp.py. |
| 5 | +
|
| 6 | +This file is part of Ardupilot methodic configurator. https://github.com/ArduPilot/MethodicConfigurator |
| 7 | +
|
| 8 | +SPDX-FileCopyrightText: 2024-2025 Amilcar do Carmo Lucas <[email protected]> |
| 9 | +
|
| 10 | +SPDX-License-Identifier: GPL-3.0-or-later |
| 11 | +""" |
| 12 | + |
| 13 | +import unittest |
| 14 | + |
| 15 | +import pytest |
| 16 | + |
| 17 | +from ardupilot_methodic_configurator.backend_mavftp import ( |
| 18 | + FTP_OP, |
| 19 | + ERR_EndOfFile, |
| 20 | + ERR_Fail, |
| 21 | + ERR_FailErrno, |
| 22 | + ERR_FailToOpenLocalFile, |
| 23 | + ERR_FileExists, |
| 24 | + ERR_FileNotFound, |
| 25 | + ERR_FileProtected, |
| 26 | + ERR_InvalidArguments, |
| 27 | + ERR_InvalidDataSize, |
| 28 | + ERR_InvalidErrorCode, |
| 29 | + ERR_InvalidOpcode, |
| 30 | + ERR_InvalidSession, |
| 31 | + ERR_NoErrorCodeInNack, |
| 32 | + ERR_NoErrorCodeInPayload, |
| 33 | + ERR_NoFilesystemErrorInPayload, |
| 34 | + ERR_None, |
| 35 | + ERR_NoSessionsAvailable, |
| 36 | + ERR_PayloadTooLarge, |
| 37 | + ERR_PutAlreadyInProgress, |
| 38 | + ERR_RemoteReplyTimeout, |
| 39 | + ERR_UnknownCommand, |
| 40 | + MAVFTPReturn, |
| 41 | + MAVFTPSetting, |
| 42 | + MAVFTPSettings, |
| 43 | + ParamData, |
| 44 | + WriteQueue, |
| 45 | +) |
| 46 | + |
| 47 | + |
| 48 | +class TestFTPOP(unittest.TestCase): |
| 49 | + """Test cases for FTP_OP class.""" |
| 50 | + |
| 51 | + def test_init(self) -> None: |
| 52 | + """Test initialization of FTP_OP.""" |
| 53 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=True, offset=6, payload=b"test") |
| 54 | + assert op.seq == 1 |
| 55 | + assert op.session == 2 |
| 56 | + assert op.opcode == 3 |
| 57 | + assert op.size == 4 |
| 58 | + assert op.req_opcode == 5 |
| 59 | + assert op.burst_complete |
| 60 | + assert op.offset == 6 |
| 61 | + assert op.payload == b"test" |
| 62 | + |
| 63 | + def test_pack_with_payload(self) -> None: |
| 64 | + """Test pack method with payload.""" |
| 65 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=True, offset=6, payload=b"test") |
| 66 | + packed = op.pack() |
| 67 | + assert isinstance(packed, bytearray) |
| 68 | + # Header should be 12 bytes (struct.pack("<HBBBBBBI")) |
| 69 | + # Plus 4 bytes for the "test" payload |
| 70 | + assert len(packed) == 16 |
| 71 | + # Check first bytes match expected values |
| 72 | + assert packed[0] == 1 # seq (low byte) |
| 73 | + assert packed[1] == 0 # seq (high byte) |
| 74 | + assert packed[2] == 2 # session |
| 75 | + assert packed[3] == 3 # opcode |
| 76 | + assert packed[4] == 4 # size |
| 77 | + assert packed[5] == 5 # req_opcode |
| 78 | + assert packed[6] == 1 # burst_complete (True = 1) |
| 79 | + assert packed[12:16] == b"test" # payload |
| 80 | + |
| 81 | + def test_pack_without_payload(self) -> None: |
| 82 | + """Test pack method without payload.""" |
| 83 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=False, offset=6, payload=None) |
| 84 | + packed = op.pack() |
| 85 | + assert isinstance(packed, bytearray) |
| 86 | + # Header should be 12 bytes (struct.pack("<HBBBBBBI")) |
| 87 | + assert len(packed) == 12 |
| 88 | + # Check first bytes match expected values |
| 89 | + assert packed[0] == 1 # seq (low byte) |
| 90 | + assert packed[1] == 0 # seq (high byte) |
| 91 | + assert packed[2] == 2 # session |
| 92 | + assert packed[3] == 3 # opcode |
| 93 | + assert packed[4] == 4 # size |
| 94 | + assert packed[5] == 5 # req_opcode |
| 95 | + assert packed[6] == 0 # burst_complete (False = 0) |
| 96 | + |
| 97 | + def test_str_with_payload(self) -> None: |
| 98 | + """Test string representation with payload.""" |
| 99 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=True, offset=6, payload=b"test") |
| 100 | + str_rep = str(op) |
| 101 | + assert "seq:1" in str_rep |
| 102 | + assert "sess:2" in str_rep |
| 103 | + assert "opcode:3" in str_rep |
| 104 | + assert "req_opcode:5" in str_rep |
| 105 | + assert "size:4" in str_rep |
| 106 | + assert "bc:True" in str_rep |
| 107 | + assert "ofs:6" in str_rep |
| 108 | + assert "plen=4" in str_rep |
| 109 | + assert "[116]" in str_rep # ASCII value of 't' |
| 110 | + |
| 111 | + def test_str_without_payload(self) -> None: |
| 112 | + """Test string representation without payload.""" |
| 113 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=False, offset=6, payload=None) |
| 114 | + str_rep = str(op) |
| 115 | + assert "seq:1" in str_rep |
| 116 | + assert "sess:2" in str_rep |
| 117 | + assert "opcode:3" in str_rep |
| 118 | + assert "req_opcode:5" in str_rep |
| 119 | + assert "size:4" in str_rep |
| 120 | + assert "bc:False" in str_rep |
| 121 | + assert "ofs:6" in str_rep |
| 122 | + assert "plen=0" in str_rep |
| 123 | + assert "[" not in str_rep # No payload byte representation |
| 124 | + |
| 125 | + def test_items(self) -> None: |
| 126 | + """Test items generator method.""" |
| 127 | + op = FTP_OP(seq=1, session=2, opcode=3, size=4, req_opcode=5, burst_complete=True, offset=6, payload=b"test") |
| 128 | + items = dict(op.items()) |
| 129 | + assert items["seq"] == 1 |
| 130 | + assert items["session"] == 2 |
| 131 | + assert items["opcode"] == 3 |
| 132 | + assert items["size"] == 4 |
| 133 | + assert items["req_opcode"] == 5 |
| 134 | + assert items["burst_complete"] |
| 135 | + assert items["offset"] == 6 |
| 136 | + assert items["payload"] == b"test" |
| 137 | + |
| 138 | + |
| 139 | +class TestWriteQueue(unittest.TestCase): |
| 140 | + """Test cases for WriteQueue class.""" |
| 141 | + |
| 142 | + def test_init(self) -> None: |
| 143 | + """Test initialization of WriteQueue.""" |
| 144 | + queue = WriteQueue(ofs=100, size=1024) |
| 145 | + assert queue.ofs == 100 |
| 146 | + assert queue.size == 1024 |
| 147 | + assert queue.last_send == 0 |
| 148 | + |
| 149 | + def test_attributes_types(self) -> None: |
| 150 | + """Test attribute types of WriteQueue.""" |
| 151 | + queue = WriteQueue(ofs=100, size=1024) |
| 152 | + assert isinstance(queue.ofs, int) |
| 153 | + assert isinstance(queue.size, int) |
| 154 | + assert isinstance(queue.last_send, (int, float)) |
| 155 | + |
| 156 | + def test_attribute_modification(self) -> None: |
| 157 | + """Test modifying WriteQueue attributes.""" |
| 158 | + queue = WriteQueue(ofs=100, size=1024) |
| 159 | + queue.ofs = 200 |
| 160 | + queue.size = 2048 |
| 161 | + queue.last_send = 1.5 |
| 162 | + |
| 163 | + assert queue.ofs == 200 |
| 164 | + assert queue.size == 2048 |
| 165 | + assert queue.last_send == 1.5 |
| 166 | + |
| 167 | + |
| 168 | +class TestParamData(unittest.TestCase): |
| 169 | + """Test cases for ParamData class.""" |
| 170 | + |
| 171 | + def test_init(self) -> None: |
| 172 | + """Test initialization of ParamData.""" |
| 173 | + param_data = ParamData() |
| 174 | + assert not param_data.params |
| 175 | + assert param_data.defaults is None |
| 176 | + |
| 177 | + def test_add_param(self) -> None: |
| 178 | + """Test adding parameters.""" |
| 179 | + param_data = ParamData() |
| 180 | + param_data.add_param(b"TEST_PARAM", 1.5, float) |
| 181 | + assert len(param_data.params) == 1 |
| 182 | + assert param_data.params[0] == (b"TEST_PARAM", 1.5, float) |
| 183 | + |
| 184 | + def test_add_default(self) -> None: |
| 185 | + """Test adding default parameters.""" |
| 186 | + param_data = ParamData() |
| 187 | + param_data.add_default(b"TEST_PARAM", 2.0, float) |
| 188 | + assert len(param_data.defaults) == 1 |
| 189 | + assert param_data.defaults[0] == (b"TEST_PARAM", 2.0, float) |
| 190 | + |
| 191 | + def test_multiple_params(self) -> None: |
| 192 | + """Test adding multiple parameters.""" |
| 193 | + param_data = ParamData() |
| 194 | + param_data.add_param(b"PARAM1", 1.0, float) |
| 195 | + param_data.add_param(b"PARAM2", 2, int) |
| 196 | + param_data.add_default(b"PARAM1", 1.5, float) |
| 197 | + param_data.add_default(b"PARAM2", 3, int) |
| 198 | + |
| 199 | + assert len(param_data.params) == 2 |
| 200 | + assert len(param_data.defaults) == 2 |
| 201 | + assert param_data.params[0] == (b"PARAM1", 1.0, float) |
| 202 | + assert param_data.params[1] == (b"PARAM2", 2, int) |
| 203 | + assert param_data.defaults[0] == (b"PARAM1", 1.5, float) |
| 204 | + assert param_data.defaults[1] == (b"PARAM2", 3, int) |
| 205 | + |
| 206 | + |
| 207 | +class TestMAVFTPSetting(unittest.TestCase): |
| 208 | + """Test cases for MAVFTPSetting class.""" |
| 209 | + |
| 210 | + def test_init(self) -> None: |
| 211 | + """Test initialization of MAVFTPSetting.""" |
| 212 | + setting = MAVFTPSetting("test_setting", int, 42) |
| 213 | + assert setting.name == "test_setting" |
| 214 | + assert setting.type is int |
| 215 | + assert setting.default == 42 |
| 216 | + assert setting.value == 42 |
| 217 | + |
| 218 | + def test_value_modification(self) -> None: |
| 219 | + """Test modifying setting value.""" |
| 220 | + setting = MAVFTPSetting("test_setting", int, 42) |
| 221 | + setting.value = 100 |
| 222 | + assert setting.value == 100 |
| 223 | + assert setting.default == 42 # Default should remain unchanged |
| 224 | + |
| 225 | + |
| 226 | +class TestMAVFTPSettings(unittest.TestCase): |
| 227 | + """Test cases for MAVFTPSettings class.""" |
| 228 | + |
| 229 | + def test_init(self) -> None: |
| 230 | + """Test initialization of MAVFTPSettings.""" |
| 231 | + settings_vars = [("setting1", int, 42), ("setting2", float, 3.14), ("setting3", bool, True)] |
| 232 | + settings = MAVFTPSettings(settings_vars) |
| 233 | + assert settings.setting1 == 42 |
| 234 | + assert settings.setting2 == 3.14 |
| 235 | + assert settings.setting3 is True |
| 236 | + |
| 237 | + def test_append_setting(self) -> None: |
| 238 | + """Test appending new settings.""" |
| 239 | + settings = MAVFTPSettings([]) |
| 240 | + settings.append(("new_setting", int, 100)) |
| 241 | + assert settings.new_setting == 100 |
| 242 | + |
| 243 | + def test_append_setting_object(self) -> None: |
| 244 | + """Test appending MAVFTPSetting object.""" |
| 245 | + settings = MAVFTPSettings([]) |
| 246 | + setting = MAVFTPSetting("test_setting", int, 42) |
| 247 | + settings.append(setting) |
| 248 | + assert settings.test_setting == 42 |
| 249 | + |
| 250 | + def test_modify_setting(self) -> None: |
| 251 | + """Test modifying setting values.""" |
| 252 | + settings = MAVFTPSettings([("test_setting", int, 42)]) |
| 253 | + settings.test_setting = 100 |
| 254 | + assert settings.test_setting == 100 |
| 255 | + |
| 256 | + def test_invalid_setting_access(self) -> None: |
| 257 | + """Test accessing non-existent setting.""" |
| 258 | + settings = MAVFTPSettings([]) |
| 259 | + with pytest.raises(AttributeError): |
| 260 | + _ = settings.nonexistent_setting |
| 261 | + |
| 262 | + def test_invalid_setting_modification(self) -> None: |
| 263 | + """Test modifying non-existent setting.""" |
| 264 | + settings = MAVFTPSettings([]) |
| 265 | + with pytest.raises(AttributeError): |
| 266 | + settings.nonexistent_setting = 42 |
| 267 | + |
| 268 | + |
| 269 | +class TestMAVFTPReturn(unittest.TestCase): |
| 270 | + """Test cases for MAVFTPReturn class.""" |
| 271 | + |
| 272 | + def test_init(self) -> None: |
| 273 | + """Test initialization with different parameters.""" |
| 274 | + ret = MAVFTPReturn("TestOp", ERR_None) |
| 275 | + assert ret.operation_name == "TestOp" |
| 276 | + assert ret.error_code == ERR_None |
| 277 | + assert ret.system_error == 0 |
| 278 | + assert ret.invalid_error_code == 0 |
| 279 | + assert ret.invalid_opcode == 0 |
| 280 | + assert ret.invalid_payload_size == 0 |
| 281 | + |
| 282 | + # Test with all parameters |
| 283 | + ret = MAVFTPReturn("TestOp", ERR_Fail, 1, 2, 3, 4) |
| 284 | + assert ret.operation_name == "TestOp" |
| 285 | + assert ret.error_code == ERR_Fail |
| 286 | + assert ret.system_error == 1 |
| 287 | + assert ret.invalid_error_code == 2 |
| 288 | + assert ret.invalid_opcode == 3 |
| 289 | + assert ret.invalid_payload_size == 4 |
| 290 | + |
| 291 | + def test_return_code(self) -> None: |
| 292 | + """Test return_code property.""" |
| 293 | + ret = MAVFTPReturn("TestOp", ERR_None) |
| 294 | + assert ret.return_code == ERR_None |
| 295 | + |
| 296 | + ret = MAVFTPReturn("TestOp", ERR_Fail) |
| 297 | + assert ret.return_code == ERR_Fail |
| 298 | + |
| 299 | + def test_display_message_success(self) -> None: |
| 300 | + """Test display_message for successful operations.""" |
| 301 | + ret = MAVFTPReturn("TestOp", ERR_None) |
| 302 | + with self.assertLogs(level="INFO") as cm: |
| 303 | + ret.display_message() |
| 304 | + assert "TestOp succeeded" in cm.output[0] |
| 305 | + |
| 306 | + def test_display_message_errors(self) -> None: |
| 307 | + """Test display_message for various error conditions.""" |
| 308 | + error_test_cases = [ |
| 309 | + # ERROR level messages |
| 310 | + (ERR_Fail, "TestOp failed, generic error", "ERROR"), |
| 311 | + (ERR_FailErrno, "TestOp failed, system error 42", "ERROR"), |
| 312 | + (ERR_InvalidDataSize, "TestOp failed, invalid data size", "ERROR"), |
| 313 | + (ERR_InvalidSession, "TestOp failed, session is not currently open", "ERROR"), |
| 314 | + (ERR_NoSessionsAvailable, "TestOp failed, no sessions available", "ERROR"), |
| 315 | + (ERR_EndOfFile, "TestOp failed, offset past end of file", "ERROR"), |
| 316 | + (ERR_UnknownCommand, "TestOp failed, unknown command", "ERROR"), |
| 317 | + (ERR_NoErrorCodeInPayload, "TestOp failed, payload contains no error code", "ERROR"), |
| 318 | + (ERR_NoErrorCodeInNack, "TestOp failed, no error code", "ERROR"), |
| 319 | + (ERR_NoFilesystemErrorInPayload, "TestOp failed, file-system error missing in payload", "ERROR"), |
| 320 | + (ERR_InvalidErrorCode, "TestOp failed, invalid error code 42", "ERROR"), |
| 321 | + (ERR_PayloadTooLarge, "TestOp failed, payload is too long 42", "ERROR"), |
| 322 | + (ERR_InvalidOpcode, "TestOp failed, invalid opcode 42", "ERROR"), |
| 323 | + (ERR_InvalidArguments, "TestOp failed, invalid arguments", "ERROR"), |
| 324 | + (ERR_PutAlreadyInProgress, "TestOp failed, put already in progress", "ERROR"), |
| 325 | + (ERR_FailToOpenLocalFile, "TestOp failed, failed to open local file", "ERROR"), |
| 326 | + (ERR_RemoteReplyTimeout, "TestOp failed, remote reply timeout", "ERROR"), |
| 327 | + # WARNING level messages |
| 328 | + (ERR_FileExists, "TestOp failed, file/directory already exists", "WARNING"), |
| 329 | + (ERR_FileProtected, "TestOp failed, file/directory is protected", "WARNING"), |
| 330 | + (ERR_FileNotFound, "TestOp failed, file/directory not found", "WARNING"), |
| 331 | + ] |
| 332 | + |
| 333 | + for error_code, expected_message, level in error_test_cases: |
| 334 | + ret = MAVFTPReturn( |
| 335 | + "TestOp", error_code, system_error=42, invalid_error_code=42, invalid_opcode=42, invalid_payload_size=42 |
| 336 | + ) |
| 337 | + with self.assertLogs(level=level) as cm: |
| 338 | + ret.display_message() |
| 339 | + assert expected_message in cm.output[0] |
| 340 | + |
| 341 | + def test_display_message_unknown_error(self) -> None: |
| 342 | + """Test display_message for unknown error code.""" |
| 343 | + ret = MAVFTPReturn("TestOp", 999) # Unknown error code |
| 344 | + with self.assertLogs(level="ERROR") as cm: |
| 345 | + ret.display_message() |
| 346 | + assert "TestOp failed, unknown error 999 in display_message()" in cm.output[0] |
| 347 | + |
| 348 | + |
| 349 | +if __name__ == "__main__": |
| 350 | + unittest.main() |
0 commit comments