|
10 | 10 | import subprocess |
11 | 11 | import time |
12 | 12 | from typing import Callable |
| 13 | +from typing import Optional |
13 | 14 |
|
14 | 15 | import pexpect |
15 | 16 | import pytest |
@@ -79,17 +80,19 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> |
79 | 80 |
|
80 | 81 | def start_chunked_server(ota_image_dir: str, server_port: int) -> subprocess.Popen: |
81 | 82 | os.chdir(ota_image_dir) |
82 | | - chunked_server = subprocess.Popen([ |
83 | | - 'openssl', |
84 | | - 's_server', |
85 | | - '-WWW', |
86 | | - '-key', |
87 | | - key_file, |
88 | | - '-cert', |
89 | | - server_file, |
90 | | - '-port', |
91 | | - str(server_port), |
92 | | - ]) |
| 83 | + chunked_server = subprocess.Popen( |
| 84 | + [ |
| 85 | + 'openssl', |
| 86 | + 's_server', |
| 87 | + '-WWW', |
| 88 | + '-key', |
| 89 | + key_file, |
| 90 | + '-cert', |
| 91 | + server_file, |
| 92 | + '-port', |
| 93 | + str(server_port), |
| 94 | + ] |
| 95 | + ) |
93 | 96 | return chunked_server |
94 | 97 |
|
95 | 98 |
|
@@ -129,6 +132,48 @@ def start_redirect_server(ota_image_dir: str, server_ip: str, server_port: int, |
129 | 132 | httpd.serve_forever() |
130 | 133 |
|
131 | 134 |
|
| 135 | +# Function to modify chip revisions in the app header |
| 136 | +def modify_chip_revision( |
| 137 | + app_path: str, min_rev: Optional[int] = None, max_rev: Optional[int] = None, increment_min: bool = False |
| 138 | +) -> None: |
| 139 | + """ |
| 140 | + Modify min_chip_rev_full and max_chip_rev_full in the app header. |
| 141 | +
|
| 142 | + :param app_path: Path to the app binary. |
| 143 | + :param min_rev: Value to set min_chip_rev_full (if provided). |
| 144 | + :param max_rev: Value to set max_chip_rev_full (if provided). |
| 145 | + :param increment_min: If True, increments min_chip_rev_full. |
| 146 | + """ |
| 147 | + |
| 148 | + HEADER_SIZE = 512 |
| 149 | + TARGET_OFFSET_MIN_REV = 0x0F |
| 150 | + TARGET_OFFSET_MAX_REV = 0x11 |
| 151 | + |
| 152 | + if not os.path.exists(app_path): |
| 153 | + raise FileNotFoundError(f"App binary file '{app_path}' not found") |
| 154 | + |
| 155 | + try: |
| 156 | + with open(app_path, 'rb') as f: |
| 157 | + header = bytearray(f.read(HEADER_SIZE)) |
| 158 | + |
| 159 | + # Increment or set min revision value |
| 160 | + if increment_min: |
| 161 | + header[TARGET_OFFSET_MIN_REV] = (header[TARGET_OFFSET_MIN_REV] + 1) & 0xFF |
| 162 | + elif min_rev is not None: |
| 163 | + header[TARGET_OFFSET_MIN_REV] = min_rev & 0xFF |
| 164 | + |
| 165 | + # Set max revision value |
| 166 | + if max_rev is not None: |
| 167 | + header[TARGET_OFFSET_MAX_REV] = max_rev & 0xFF |
| 168 | + |
| 169 | + # Write back the modified header to the binary file |
| 170 | + with open(app_path, 'r+b') as f: |
| 171 | + f.write(header) |
| 172 | + |
| 173 | + except IOError as e: |
| 174 | + raise RuntimeError(f'Failed to modify app header: {e}') |
| 175 | + |
| 176 | + |
132 | 177 | @pytest.mark.ethernet_ota |
133 | 178 | @idf_parametrize('target', ['esp32'], indirect=['target']) |
134 | 179 | def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None: |
@@ -253,7 +298,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> |
253 | 298 | bin_name = 'advanced_https_ota.bin' |
254 | 299 | # Truncated binary file to be generated from original binary file |
255 | 300 | truncated_bin_name = 'truncated.bin' |
256 | | - # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file |
| 301 | + # Size of truncated file to be grnerated. |
| 302 | + # This value can range from 288 bytes (Image header size) to size of original binary file |
257 | 303 | # truncated_bin_size is set to 64000 to reduce consumed by the test case |
258 | 304 | truncated_bin_size = 64000 |
259 | 305 | binary_file = os.path.join(dut.app.binary_path, bin_name) |
@@ -757,7 +803,8 @@ def test_examples_protocol_advanced_https_ota_example_ota_resumption_partial_dow |
757 | 803 | @idf_parametrize('target', ['esp32', 'esp32c3', 'esp32s3'], indirect=['target']) |
758 | 804 | def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) -> None: |
759 | 805 | """ |
760 | | - Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using NimBLE Host stack. |
| 806 | + Run an OTA image update while a BLE GATT Server is running in background. |
| 807 | + This GATT server will be using NimBLE Host stack. |
761 | 808 | steps: | |
762 | 809 | 1. join AP/Ethernet |
763 | 810 | 2. Run BLE advertise and then GATT server. |
@@ -812,7 +859,8 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) -> |
812 | 859 | @idf_parametrize('target', ['esp32', 'esp32c3', 'esp32s3'], indirect=['target']) |
813 | 860 | def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut) -> None: |
814 | 861 | """ |
815 | | - Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using Bluedroid Host stack. |
| 862 | + Run an OTA image update while a BLE GATT Server is running in background. |
| 863 | + This GATT server will be using Bluedroid Host stack. |
816 | 864 | steps: | |
817 | 865 | 1. join AP/Ethernet |
818 | 866 | 2. Run BLE advertise and then GATT server. |
@@ -907,3 +955,115 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D |
907 | 955 | pass |
908 | 956 | finally: |
909 | 957 | chunked_server.kill() |
| 958 | + |
| 959 | + |
| 960 | +@pytest.mark.qemu |
| 961 | +@pytest.mark.nightly_run |
| 962 | +@pytest.mark.host_test |
| 963 | +@pytest.mark.parametrize( |
| 964 | + 'qemu_extra_args', |
| 965 | + [ |
| 966 | + f'-drive file={os.path.join(os.path.dirname(__file__), "efuse_esp32c3.bin")},if=none,format=raw,id=efuse ' |
| 967 | + '-global driver=nvram.esp32c3.efuse,property=drive,value=efuse ' |
| 968 | + '-global driver=timer.esp32c3.timg,property=wdt_disable,value=true', |
| 969 | + ], |
| 970 | + indirect=True, |
| 971 | +) |
| 972 | +@idf_parametrize('target', ['esp32c3'], indirect=['target']) |
| 973 | +@pytest.mark.parametrize('config', ['verify_revision'], indirect=True) |
| 974 | +def test_examples_protocol_advanced_https_ota_example_verify_min_chip_revision(dut: Dut) -> None: |
| 975 | + """ |
| 976 | + This is a QEMU test case that verifies the chip revision value in the application header. |
| 977 | + steps: | |
| 978 | + 1. join AP/Ethernet |
| 979 | + 2. Fetch OTA image over HTTPS |
| 980 | + 3. Reboot with the new OTA image |
| 981 | + """ |
| 982 | + |
| 983 | + # Update the min full revision field in the app header |
| 984 | + app_path = os.path.join(dut.app.binary_path, 'advanced_https_ota.bin') |
| 985 | + # Increment min_chip_rev_full |
| 986 | + modify_chip_revision(app_path, increment_min=True) |
| 987 | + |
| 988 | + server_port = 8001 |
| 989 | + bin_name = 'advanced_https_ota.bin' |
| 990 | + # Start server |
| 991 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 992 | + thread1.daemon = True |
| 993 | + thread1.start() |
| 994 | + try: |
| 995 | + # start test |
| 996 | + dut.expect('Loaded app from partition at offset', timeout=30) |
| 997 | + |
| 998 | + try: |
| 999 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 1000 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 1001 | + except pexpect.exceptions.TIMEOUT: |
| 1002 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') |
| 1003 | + |
| 1004 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 1005 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 1006 | + |
| 1007 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 1008 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 1009 | + dut.expect('Starting OTA...', timeout=60) |
| 1010 | + dut.expect('chip revision check failed.', timeout=150) |
| 1011 | + |
| 1012 | + finally: |
| 1013 | + thread1.terminate() |
| 1014 | + |
| 1015 | + |
| 1016 | +@pytest.mark.qemu |
| 1017 | +@pytest.mark.nightly_run |
| 1018 | +@pytest.mark.host_test |
| 1019 | +@pytest.mark.parametrize( |
| 1020 | + 'qemu_extra_args', |
| 1021 | + [ |
| 1022 | + f'-drive file={os.path.join(os.path.dirname(__file__), "efuse_esp32c3.bin")},if=none,format=raw,id=efuse ' |
| 1023 | + '-global driver=nvram.esp32c3.efuse,property=drive,value=efuse ' |
| 1024 | + '-global driver=timer.esp32c3.timg,property=wdt_disable,value=true', |
| 1025 | + ], |
| 1026 | + indirect=True, |
| 1027 | +) |
| 1028 | +@idf_parametrize('target', ['esp32c3'], indirect=['target']) |
| 1029 | +@pytest.mark.parametrize('config', ['verify_revision'], indirect=True) |
| 1030 | +def test_examples_protocol_advanced_https_ota_example_verify_max_chip_revision(dut: Dut) -> None: |
| 1031 | + """ |
| 1032 | + This is a QEMU test case that verifies the chip revision value in the application header. |
| 1033 | + steps: | |
| 1034 | + 1. join AP/Ethernet |
| 1035 | + 2. Fetch OTA image over HTTPS |
| 1036 | + 3. Reboot with the new OTA image |
| 1037 | + """ |
| 1038 | + |
| 1039 | + # Update the min full revision field in the app header |
| 1040 | + app_path = os.path.join(dut.app.binary_path, 'advanced_https_ota.bin') |
| 1041 | + # Set min_chip_rev_full to 0.0 and max_chip_rev_full to 0.2 |
| 1042 | + modify_chip_revision(app_path, min_rev=0x00, max_rev=0x02) |
| 1043 | + |
| 1044 | + server_port = 8001 |
| 1045 | + bin_name = 'advanced_https_ota.bin' |
| 1046 | + # Start server |
| 1047 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 1048 | + thread1.daemon = True |
| 1049 | + thread1.start() |
| 1050 | + try: |
| 1051 | + # start test |
| 1052 | + dut.expect('Loaded app from partition at offset', timeout=30) |
| 1053 | + |
| 1054 | + try: |
| 1055 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 1056 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 1057 | + except pexpect.exceptions.TIMEOUT: |
| 1058 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') |
| 1059 | + |
| 1060 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 1061 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 1062 | + |
| 1063 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 1064 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 1065 | + dut.expect('Starting OTA...', timeout=60) |
| 1066 | + dut.expect('chip revision check failed.', timeout=150) |
| 1067 | + |
| 1068 | + finally: |
| 1069 | + thread1.terminate() |
0 commit comments