66import ssl
77import subprocess
88import sys
9+ from typing import Any
910from typing import Optional
1011from typing import Tuple
1112
7071 'vSXnRLaxQhooWm+IuX9SuBQ=\n ' \
7172 '-----END PRIVATE KEY-----\n '
7273
73- OTA1_ADDRESS = '0x20000'
74- OTA2_ADDRESS = '0x1d0000'
74+ OTA_0_ADDRESS = '0x20000'
75+ OTA_1_ADDRESS = '0x1d0000'
7576
7677
7778def start_https_server (ota_image_dir : str , server_ip : str , server_port : int , server_file : Optional [str ] = None , key_file : Optional [str ] = None ) -> None :
@@ -133,6 +134,20 @@ def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
133134 return str (sha256_bootloader ), str (sha256_app )
134135
135136
137+ def setting_connection (dut : Dut , env_name : Optional [str ] = None ) -> Any :
138+ if env_name is not None and dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True :
139+ dut .expect ('Please input ssid password:' )
140+ ap_ssid = get_env_config_variable (env_name , 'ap_ssid' )
141+ ap_password = get_env_config_variable (env_name , 'ap_password' )
142+ dut .write (f'{ ap_ssid } { ap_password } ' )
143+ try :
144+ ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
145+ print (f'Connected to AP/Ethernet with IP: { ip_address } ' )
146+ except pexpect .exceptions .TIMEOUT :
147+ raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
148+ return get_host_ip4_by_dest_ip (ip_address )
149+
150+
136151@pytest .mark .esp32
137152@pytest .mark .esp32c3
138153@pytest .mark .esp32s3
@@ -151,29 +166,18 @@ def test_examples_protocol_simple_ota_example(dut: Dut) -> None:
151166 thread1 .start ()
152167 try :
153168 # start test
154- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
169+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
155170 check_sha256 (sha256_bootloader , str (dut .expect (r'SHA-256 for bootloader:\s+([a-f0-9]){64}' )[0 ]))
156171 check_sha256 (sha256_app , str (dut .expect (r'SHA-256 for current firmware:\s+([a-f0-9]){64}' )[0 ]))
157172 # Parse IP address of STA
158- if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True :
159- env_name = 'wifi_high_traffic'
160- dut .expect ('Please input ssid password:' )
161- ap_ssid = get_env_config_variable (env_name , 'ap_ssid' )
162- ap_password = get_env_config_variable (env_name , 'ap_password' )
163- dut .write (f'{ ap_ssid } { ap_password } ' )
164- try :
165- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
166- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
167- except pexpect .exceptions .TIMEOUT :
168- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
169- host_ip = get_host_ip4_by_dest_ip (ip_address )
170-
173+ env_name = 'wifi_high_traffic' if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True else None
174+ host_ip = setting_connection (dut , env_name )
171175 dut .expect ('Starting OTA example task' , timeout = 30 )
172- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
173- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
176+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
177+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
174178 dut .expect ('OTA Succeed, Rebooting...' , timeout = 60 )
175179 # after reboot
176- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 30 )
180+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 30 )
177181 dut .expect ('OTA example app_main start' , timeout = 10 )
178182 finally :
179183 thread1 .terminate ()
@@ -195,20 +199,14 @@ def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(dut: D
195199 thread1 .start ()
196200 try :
197201 # start test
198- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
199- try :
200- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
201- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
202- except pexpect .exceptions .TIMEOUT :
203- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
204- host_ip = get_host_ip4_by_dest_ip (ip_address )
205-
202+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
203+ host_ip = setting_connection (dut )
206204 dut .expect ('Starting OTA example task' , timeout = 30 )
207- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
208- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
205+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
206+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
209207 dut .expect ('OTA Succeed, Rebooting...' , timeout = 60 )
210208 # after reboot
211- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 30 )
209+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 30 )
212210 dut .expect ('OTA example app_main start' , timeout = 10 )
213211 finally :
214212 thread1 .terminate ()
@@ -236,28 +234,18 @@ def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(dut: Du
236234 thread1 .daemon = True
237235 thread1 .start ()
238236 try :
239- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
237+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
240238 dut .expect ('Flash encryption mode is DEVELOPMENT' , timeout = 10 )
241239 # Parse IP address of STA
242- if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True :
243- env_name = 'flash_encryption_wifi_high_traffic'
244- dut .expect ('Please input ssid password:' )
245- ap_ssid = get_env_config_variable (env_name , 'ap_ssid' )
246- ap_password = get_env_config_variable (env_name , 'ap_password' )
247- dut .write (f'{ ap_ssid } { ap_password } ' )
248- try :
249- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
250- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
251- except pexpect .exceptions .TIMEOUT :
252- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
253- host_ip = get_host_ip4_by_dest_ip (ip_address )
240+ env_name = 'flash_encryption_wifi_high_traffic' if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True else None
241+ host_ip = setting_connection (dut , env_name )
254242
255243 dut .expect ('Starting OTA example task' , timeout = 30 )
256- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
257- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
244+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
245+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
258246 dut .expect ('OTA Succeed, Rebooting...' , timeout = 60 )
259247 # after reboot
260- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 30 )
248+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 30 )
261249 dut .expect ('Flash encryption mode is DEVELOPMENT' , timeout = 10 )
262250 dut .expect ('OTA example app_main start' , timeout = 10 )
263251 finally :
@@ -281,24 +269,20 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
281269 thread1 .start ()
282270 try :
283271 # start test
284- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
272+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
285273 check_sha256 (sha256_bootloader , str (dut .expect (r'SHA-256 for bootloader:\s+([a-f0-9]){64}' )[0 ]))
286274 check_sha256 (sha256_app , str (dut .expect (r'SHA-256 for current firmware:\s+([a-f0-9]){64}' )[0 ]))
287- try :
288- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
289- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
290- except pexpect .exceptions .TIMEOUT :
291- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
292- host_ip = get_host_ip4_by_dest_ip (ip_address )
275+
276+ host_ip = setting_connection (dut )
293277
294278 dut .expect ('Starting OTA example task' , timeout = 30 )
295- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
296- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
297- dut .expect (f'Writing to partition subtype 17 at offset { OTA2_ADDRESS } ' , timeout = 20 )
279+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
280+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
281+ dut .expect (f'Writing to <ota_1> partition at offset { OTA_1_ADDRESS } ' , timeout = 20 )
298282 dut .expect ('Verifying image signature...' , timeout = 60 )
299283 dut .expect ('OTA Succeed, Rebooting...' , timeout = 60 )
300284 # after reboot
301- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 20 )
285+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 20 )
302286 dut .expect ('OTA example app_main start' , timeout = 10 )
303287 finally :
304288 thread1 .terminate ()
@@ -321,27 +305,23 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
321305 thread1 .start ()
322306 try :
323307 # start test
324- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
308+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
325309 check_sha256 (sha256_bootloader , str (dut .expect (r'SHA-256 for bootloader:\s+([a-f0-9]){64}' )[0 ]))
326310 check_sha256 (sha256_app , str (dut .expect (r'SHA-256 for current firmware:\s+([a-f0-9]){64}' )[0 ]))
327- try :
328- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
329- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
330- except pexpect .exceptions .TIMEOUT :
331- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
332- host_ip = get_host_ip4_by_dest_ip (ip_address )
311+
312+ host_ip = setting_connection (dut )
333313
334314 dut .expect ('Starting OTA example task' , timeout = 30 )
335- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
336- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
337- dut .expect (f'Writing to partition subtype 17 at offset { OTA2_ADDRESS } ' , timeout = 20 )
315+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
316+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
317+ dut .expect (f'Writing to <ota_1> partition at offset { OTA_1_ADDRESS } ' , timeout = 20 )
338318 dut .expect ('Verifying image signature...' , timeout = 60 )
339319 dut .expect ('#0 app key digest == #0 trusted key digest' , timeout = 10 )
340320 dut .expect ('Verifying with RSA-PSS...' , timeout = 10 )
341321 dut .expect ('Signature verified successfully!' , timeout = 10 )
342322 dut .expect ('OTA Succeed, Rebooting...' , timeout = 60 )
343323 # after reboot
344- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 20 )
324+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 20 )
345325 dut .expect ('OTA example app_main start' , timeout = 10 )
346326 finally :
347327 thread1 .terminate ()
@@ -362,42 +342,32 @@ def test_examples_protocol_simple_ota_example_tls1_3(dut: Dut) -> None:
362342 tls1_3_server = start_tls1_3_server (dut .app .binary_path , 8000 )
363343 try :
364344 # start test
365- dut .expect (f'Loaded app from partition at offset { OTA1_ADDRESS } ' , timeout = 30 )
345+ dut .expect (f'Loaded app from partition at offset { OTA_0_ADDRESS } ' , timeout = 30 )
366346 check_sha256 (sha256_bootloader , str (dut .expect (r'SHA-256 for bootloader:\s+([a-f0-9]){64}' )[0 ]))
367347 check_sha256 (sha256_app , str (dut .expect (r'SHA-256 for current firmware:\s+([a-f0-9]){64}' )[0 ]))
368348 # Parse IP address of STA
369- if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True :
370- env_name = 'wifi_high_traffic'
371- dut .expect ('Please input ssid password:' )
372- ap_ssid = get_env_config_variable (env_name , 'ap_ssid' )
373- ap_password = get_env_config_variable (env_name , 'ap_password' )
374- dut .write (f'{ ap_ssid } { ap_password } ' )
375- try :
376- ip_address = dut .expect (r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]' , timeout = 30 )[1 ].decode ()
377- print ('Connected to AP/Ethernet with IP: {}' .format (ip_address ))
378- except pexpect .exceptions .TIMEOUT :
379- raise ValueError ('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet' )
380- host_ip = get_host_ip4_by_dest_ip (ip_address )
349+ env_name = 'wifi_high_traffic' if dut .app .sdkconfig .get ('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN' ) is True else None
350+ host_ip = setting_connection (dut , env_name )
381351
382352 dut .expect ('Starting OTA example task' , timeout = 30 )
383- print ('writing to device: {}' . format ( ' https://' + host_ip + ' :8000/simple_ota.bin') )
384- dut .write ('https://' + host_ip + ' :8000/simple_ota.bin' )
353+ print (f 'writing to device: https://{ host_ip } :8000/simple_ota.bin' )
354+ dut .write (f 'https://{ host_ip } :8000/simple_ota.bin' )
385355 dut .expect ('OTA Succeed, Rebooting...' , timeout = 120 )
386356 # after reboot
387- dut .expect (f'Loaded app from partition at offset { OTA2_ADDRESS } ' , timeout = 30 )
357+ dut .expect (f'Loaded app from partition at offset { OTA_1_ADDRESS } ' , timeout = 30 )
388358 dut .expect ('OTA example app_main start' , timeout = 10 )
389359 finally :
390360 tls1_3_server .kill ()
391361
392362
393363if __name__ == '__main__' :
394364 if sys .argv [2 :]: # if two or more arguments provided:
395- # Usage: pytest_simple_ota.py <image_dir> <server_port> [cert_di> ]
365+ # Usage: pytest_simple_ota.py <image_dir> <server_port> [cert_dir ]
396366 this_dir = os .path .dirname (os .path .realpath (__file__ ))
397367 bin_dir = os .path .join (this_dir , sys .argv [1 ])
398368 port = int (sys .argv [2 ])
399369 cert_dir = bin_dir if not sys .argv [3 :] else os .path .join (this_dir , sys .argv [3 ]) # optional argument
400- print ('Starting HTTPS server at "https://:{ }"' . format ( port ) )
370+ print (f 'Starting HTTPS server at "https://0.0.0.0: { port } "' )
401371 start_https_server (bin_dir , '' , port ,
402372 server_file = os .path .join (cert_dir , 'ca_cert.pem' ),
403373 key_file = os .path .join (cert_dir , 'ca_key.pem' ))
0 commit comments