@@ -65,13 +65,28 @@ def write_credential(self, sectag, cred_type, cred_text):
65
65
for c in range (chunks ):
66
66
chunk = encoded [c * TLS_CRED_CHUNK_SIZE : (c + 1 ) * TLS_CRED_CHUNK_SIZE ]
67
67
self .write_raw (f"cred buf { chunk } " )
68
- self .serial_wait_for_response ("Stored" )
68
+ result , output = self .serial_wait_for_response ("Stored" , "RX ring buffer full" )
69
+ if not result :
70
+ logging .error ("Failed to store chunk in the device: unknown error" )
71
+ if output and b"RX ring buffer full" in output :
72
+ logging .error (f"Failed to store chunk in the device: { output } " )
73
+ return False
74
+ if not 0 <= cred_type < len (TLS_CRED_TYPES ):
75
+ logger .error (
76
+ f"Invalid credential type: { cred_type } . Range [0, { len (TLS_CRED_TYPES ) - 1 } ]."
77
+ )
78
+ return False
69
79
self .write_raw (f"cred add { sectag } { TLS_CRED_TYPES [cred_type ]} DEFAULT bint" )
70
- result , _ = self .serial_wait_for_response ("Added TLS credential" )
80
+ result , _ = self .serial_wait_for_response ("Added TLS credential" , "already exists" )
71
81
time .sleep (1 )
72
82
return result
73
83
74
84
def delete_credential (self , sectag , cred_type ):
85
+ if not 0 <= cred_type < len (TLS_CRED_TYPES ):
86
+ logger .error (
87
+ f"Invalid credential type: { cred_type } . Range [0, { len (TLS_CRED_TYPES ) - 1 } ]."
88
+ )
89
+ return False
75
90
self .write_raw (f'cred del { sectag } { TLS_CRED_TYPES [cred_type ]} ' )
76
91
result , _ = self .serial_wait_for_response (
77
92
"Deleted TLS credential" , "There is no TLS credential"
@@ -94,25 +109,35 @@ def check_credential_exists(self, sectag, cred_type, get_hash=True):
94
109
return True , None
95
110
96
111
data = output .decode ().split ("," )
97
- hash = data [2 ].strip ()
112
+ logger .debug (f"Cred list output: { data } " )
113
+ if len (data ) < 4 :
114
+ logger .error ("Invalid output format from device, skipping hash check." )
115
+ return False , None
116
+ cred_hash = data [2 ].strip ()
98
117
status_code = data [3 ].strip ()
99
118
100
119
if status_code != "0" :
101
120
logger .warning (f"Error retrieving credential hash: { output .decode ().strip ()} ." )
102
121
logger .warning ("Device might not support credential digests." )
103
122
return True , None
104
123
105
- return True , hash
124
+ return True , cred_hash
106
125
107
126
def calculate_expected_hash (self , cred_text ):
108
- hash = hashlib .sha256 (cred_text .encode ('utf-8' ) + b'\x00 ' )
109
- return base64 .b64encode (hash .digest ()).decode ()
127
+ cred_hash = hashlib .sha256 (cred_text .encode ('utf-8' ) + b'\x00 ' )
128
+ return base64 .b64encode (cred_hash .digest ()).decode ()
110
129
111
130
def check_cred_command (self ):
112
131
logger .info ("Checking for 'cred' command existence..." )
113
132
self .serial_write_line ("cred" )
114
- result , output = self .serial_wait_for_response (timeout = 5 )
115
- if not result or (output and b"command not found" in output ):
133
+ result , output = self .serial_wait_for_response (
134
+ "TLS Credentials Commands" , "command not found" , store = "cred"
135
+ )
136
+ logger .debug (f"Result: { result } , Output: { output } " )
137
+ if not result :
138
+ logger .error ("Device did not respond to 'cred' command." )
139
+ return False
140
+ if output and b"command not found" in output :
116
141
logger .error ("Device does not support 'cred' command." )
117
142
logger .error ("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file." )
118
143
return False
@@ -294,20 +319,25 @@ def main(in_args):
294
319
logger .info (f'Deleting sectag { args .sectag } ...' )
295
320
cred_if .delete_credential (args .sectag , args .cert_type )
296
321
297
- cred_if .write_credential (args .sectag , args .cert_type , dev_bytes )
322
+ result = cred_if .write_credential (args .sectag , args .cert_type , dev_bytes )
323
+ if not result :
324
+ logger .error (f'Failed to write credential for sectag { args .sectag } , it may already exist' )
325
+ sys .exit (5 )
298
326
logger .info (f'Writing sectag { args .sectag } ...' )
299
- result , hash = cred_if .check_credential_exists (args .sectag , args .cert_type , args .check_hash )
327
+ result , cred_hash = cred_if .check_credential_exists (
328
+ args .sectag , args .cert_type , args .check_hash
329
+ )
300
330
if args .check_hash :
301
331
logger .debug (f'Checking hash for sectag { args .sectag } ...' )
302
332
if not result :
303
333
logger .error (f'Failed to check credential existence for sectag { args .sectag } ' )
304
334
sys .exit (4 )
305
- if hash :
306
- logger .debug (f'Credential hash: { hash } ' )
335
+ if cred_hash :
336
+ logger .debug (f'Credential hash: { cred_hash } ' )
307
337
expected_hash = cred_if .calculate_expected_hash (dev_bytes )
308
- if hash != expected_hash :
338
+ if cred_hash != expected_hash :
309
339
logger .error (
310
- f'Hash mismatch for sectag { args .sectag } . Expected : { expected_hash } , got: { hash } '
340
+ f'Hash mismatch for sectag { args .sectag } . Exp : { expected_hash } , got: { cred_hash } '
311
341
)
312
342
sys .exit (6 )
313
343
logger .info (f'Credential for sectag { args .sectag } written successfully' )
0 commit comments