|
1 | | -# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD |
| 1 | +# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD |
2 | 2 | # SPDX-License-Identifier: Unlicense OR CC0-1.0 |
3 | 3 | import http.server |
4 | 4 | import multiprocessing |
|
18 | 18 | from pytest_embedded import Dut |
19 | 19 | from RangeHTTPServer import RangeRequestHandler |
20 | 20 |
|
| 21 | +NVS_PARTITION = 'nvs' |
| 22 | + |
21 | 23 | server_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_certs/server_cert.pem') |
22 | 24 | key_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_certs/server_key.pem') |
23 | 25 |
|
24 | 26 |
|
| 27 | +def restart_device_with_random_delay(dut: Dut, min_delay: int = 10, max_delay: int = 30) -> None: |
| 28 | + """ |
| 29 | + Restarts the device after a random delay. |
| 30 | +
|
| 31 | + Parameters: |
| 32 | + - dut: The device under test (DUT) instance. |
| 33 | + - min_delay: Minimum delay in seconds before restarting. |
| 34 | + - max_delay: Maximum delay in seconds before restarting. |
| 35 | + """ |
| 36 | + delay = random.randint(min_delay, max_delay) |
| 37 | + print(f'Waiting for {delay} seconds before restarting the device...') |
| 38 | + time.sleep(delay) |
| 39 | + dut.serial.hard_reset() # Restart the ESP32 device |
| 40 | + print('Device restarted after random delay.') |
| 41 | + |
| 42 | + |
25 | 43 | def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]: |
26 | 44 | """ |
27 | 45 | Returns a request handler class that handles broken pipe exception |
@@ -134,6 +152,89 @@ def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None: |
134 | 152 | thread1.terminate() |
135 | 153 |
|
136 | 154 |
|
| 155 | +@pytest.mark.esp32 |
| 156 | +@pytest.mark.wifi_router |
| 157 | +@pytest.mark.parametrize('config', ['ota_resumption'], indirect=True) |
| 158 | +def test_examples_protocol_advanced_https_ota_example_ota_resumption(dut: Dut) -> None: |
| 159 | + """ |
| 160 | + This is a positive test case, which stops the download midway and resumes downloading again. |
| 161 | + steps: | |
| 162 | + 1. join AP/Ethernet |
| 163 | + 2. Fetch OTA image over HTTPS |
| 164 | + 3. Reboot with the new OTA image |
| 165 | + """ |
| 166 | + # Number of iterations to validate OTA |
| 167 | + server_port = 8001 |
| 168 | + bin_name = 'advanced_https_ota.bin' |
| 169 | + |
| 170 | + # Erase NVS partition |
| 171 | + dut.serial.erase_partition(NVS_PARTITION) |
| 172 | + |
| 173 | + # Start server |
| 174 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 175 | + thread1.daemon = True |
| 176 | + thread1.start() |
| 177 | + try: |
| 178 | + # start test |
| 179 | + dut.expect('Loaded app from partition at offset', timeout=30) |
| 180 | + |
| 181 | + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: |
| 182 | + dut.expect('Please input ssid password:') |
| 183 | + env_name = 'wifi_router' |
| 184 | + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') |
| 185 | + ap_password = get_env_config_variable(env_name, 'ap_password') |
| 186 | + dut.write(f'{ap_ssid} {ap_password}') |
| 187 | + |
| 188 | + try: |
| 189 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 190 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 191 | + except pexpect.exceptions.TIMEOUT: |
| 192 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') |
| 193 | + |
| 194 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 195 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 196 | + |
| 197 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 198 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 199 | + dut.expect('Starting OTA...', timeout=60) |
| 200 | + |
| 201 | + restart_device_with_random_delay(dut, 10, 30) |
| 202 | + thread1.terminate() |
| 203 | + |
| 204 | + # Start server |
| 205 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 206 | + thread1.daemon = True |
| 207 | + thread1.start() |
| 208 | + |
| 209 | + # Validate that the device restarts correctly |
| 210 | + dut.expect('Loaded app from partition at offset', timeout=180) |
| 211 | + |
| 212 | + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: |
| 213 | + dut.expect('Please input ssid password:') |
| 214 | + env_name = 'wifi_router' |
| 215 | + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') |
| 216 | + ap_password = get_env_config_variable(env_name, 'ap_password') |
| 217 | + dut.write(f'{ap_ssid} {ap_password}') |
| 218 | + |
| 219 | + try: |
| 220 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 221 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 222 | + except pexpect.exceptions.TIMEOUT: |
| 223 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') |
| 224 | + |
| 225 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 226 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 227 | + |
| 228 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 229 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 230 | + dut.expect('Starting OTA...', timeout=60) |
| 231 | + |
| 232 | + dut.expect('upgrade successful. Rebooting ...', timeout=150) |
| 233 | + |
| 234 | + finally: |
| 235 | + thread1.terminate() |
| 236 | + |
| 237 | + |
137 | 238 | @pytest.mark.esp32 |
138 | 239 | @pytest.mark.ethernet_ota |
139 | 240 | def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> None: |
@@ -544,6 +645,93 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut) |
544 | 645 | thread1.terminate() |
545 | 646 |
|
546 | 647 |
|
| 648 | +@pytest.mark.esp32 |
| 649 | +@pytest.mark.wifi_router |
| 650 | +@pytest.mark.parametrize('config', ['ota_resumption_partial_download',], indirect=True) |
| 651 | +def test_examples_protocol_advanced_https_ota_example_ota_resumption_partial_download_request(dut: Dut) -> None: |
| 652 | + """ |
| 653 | + This is a positive test case, to test OTA workflow with Range HTTP header. |
| 654 | + steps: | |
| 655 | + 1. join AP/Ethernet |
| 656 | + 2. Fetch OTA image over HTTPS |
| 657 | + 3. Reboot with the new OTA image |
| 658 | + """ |
| 659 | + server_port = 8001 |
| 660 | + # Size of partial HTTP request |
| 661 | + request_size = int(dut.app.sdkconfig.get('EXAMPLE_HTTP_REQUEST_SIZE')) |
| 662 | + # File to be downloaded. This file is generated after compilation |
| 663 | + bin_name = 'advanced_https_ota.bin' |
| 664 | + binary_file = os.path.join(dut.app.binary_path, bin_name) |
| 665 | + bin_size = os.path.getsize(binary_file) |
| 666 | + http_requests = int((bin_size / request_size) - 1) |
| 667 | + assert http_requests > 1 |
| 668 | + |
| 669 | + # Erase NVS partition |
| 670 | + dut.serial.erase_partition(NVS_PARTITION) |
| 671 | + |
| 672 | + # Start server |
| 673 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 674 | + thread1.daemon = True |
| 675 | + thread1.start() |
| 676 | + try: |
| 677 | + # start test |
| 678 | + dut.expect('Loaded app from partition at offset', timeout=30) |
| 679 | + |
| 680 | + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: |
| 681 | + dut.expect('Please input ssid password:') |
| 682 | + env_name = 'wifi_router' |
| 683 | + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') |
| 684 | + ap_password = get_env_config_variable(env_name, 'ap_password') |
| 685 | + dut.write(f'{ap_ssid} {ap_password}') |
| 686 | + |
| 687 | + try: |
| 688 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 689 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 690 | + except pexpect.exceptions.TIMEOUT: |
| 691 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') |
| 692 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 693 | + |
| 694 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 695 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 696 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 697 | + |
| 698 | + restart_device_with_random_delay(dut, 10, 30) |
| 699 | + thread1.terminate() |
| 700 | + |
| 701 | + # Start server |
| 702 | + thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) |
| 703 | + thread1.daemon = True |
| 704 | + thread1.start() |
| 705 | + |
| 706 | + # Validate that the device restarts correctly |
| 707 | + dut.expect('Loaded app from partition at offset', timeout=180) |
| 708 | + |
| 709 | + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: |
| 710 | + dut.expect('Please input ssid password:') |
| 711 | + env_name = 'wifi_router' |
| 712 | + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') |
| 713 | + ap_password = get_env_config_variable(env_name, 'ap_password') |
| 714 | + dut.write(f'{ap_ssid} {ap_password}') |
| 715 | + |
| 716 | + try: |
| 717 | + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() |
| 718 | + print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) |
| 719 | + except pexpect.exceptions.TIMEOUT: |
| 720 | + raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') |
| 721 | + |
| 722 | + dut.expect('Starting Advanced OTA example', timeout=30) |
| 723 | + host_ip = get_host_ip4_by_dest_ip(ip_address) |
| 724 | + |
| 725 | + print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) |
| 726 | + dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) |
| 727 | + dut.expect('Starting OTA...', timeout=60) |
| 728 | + |
| 729 | + dut.expect('upgrade successful. Rebooting ...', timeout=150) |
| 730 | + |
| 731 | + finally: |
| 732 | + thread1.terminate() |
| 733 | + |
| 734 | + |
547 | 735 | @pytest.mark.esp32 |
548 | 736 | @pytest.mark.esp32c3 |
549 | 737 | @pytest.mark.esp32s3 |
|
0 commit comments