Skip to content

Commit 134349f

Browse files
krish2718carlescufi
authored andcommitted
[nrf fromlist] scripts: utils: Add a script to install TLS credentials
This helps install certificates to the TLS credentials store using TLS credentials shell. Upstream PR #: 88553 Signed-off-by: Chaitanya Tata <[email protected]> (cherry picked from commit 87459b4)
1 parent 288fcb5 commit 134349f

File tree

1 file changed

+332
-0
lines changed

1 file changed

+332
-0
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2025 Nordic Semiconductor ASA
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
"""
8+
This script is used to install TLS credentials on a device via a serial connection.
9+
It supports both deleting and writing credentials, as well as checking for their existence.
10+
It also verifies the hash of the installed credentials against the expected hash.
11+
12+
This script is based on https://github.com/nRFCloud/utils/, specifically
13+
"command_interface.py" and "device_credentials_installer.py".
14+
"""
15+
16+
import argparse
17+
import base64
18+
import hashlib
19+
import logging
20+
import math
21+
import os
22+
import sys
23+
import time
24+
25+
import serial
26+
27+
# Configure logging
28+
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
29+
logger = logging.getLogger(__name__)
30+
31+
CMD_TERM_DICT = {'NULL': '\0', 'CR': '\r', 'LF': '\n', 'CRLF': '\r\n'}
32+
# 'CR' is the default termination value for the at_host library in the nRF Connect SDK
33+
cmd_term_key = 'CR'
34+
35+
TLS_CRED_TYPES = ["CA", "SERV", "PK"]
36+
TLS_CRED_CHUNK_SIZE = 48
37+
serial_timeout = 1
38+
ser = None
39+
40+
41+
class TLSCredShellInterface:
42+
def __init__(self, serial_write_line, serial_wait_for_response, verbose):
43+
self.serial_write_line = serial_write_line
44+
self.serial_wait_for_response = serial_wait_for_response
45+
self.verbose = verbose
46+
47+
def write_raw(self, command):
48+
if self.verbose:
49+
logger.debug(f'-> {command}')
50+
self.serial_write_line(command)
51+
52+
def write_credential(self, sectag, cred_type, cred_text):
53+
# Because the Zephyr shell does not support multi-line commands,
54+
# we must base-64 encode our PEM strings and install them as if they were binary.
55+
# Yes, this does mean we are base-64 encoding a string which is already mostly base-64.
56+
# We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass
57+
# everything else directly as a binary payload (using BIN mode instead of BINT, since
58+
# MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a
59+
# PEM string). But this will fail for multi-CA installs, such as CoAP.
60+
61+
# text -> bytes -> base64 bytes -> base64 text
62+
encoded = base64.b64encode(cred_text.encode()).decode()
63+
self.write_raw("cred buf clear")
64+
chunks = math.ceil(len(encoded) / TLS_CRED_CHUNK_SIZE)
65+
for c in range(chunks):
66+
chunk = encoded[c * TLS_CRED_CHUNK_SIZE : (c + 1) * TLS_CRED_CHUNK_SIZE]
67+
self.write_raw(f"cred buf {chunk}")
68+
self.serial_wait_for_response("Stored")
69+
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint")
70+
result, _ = self.serial_wait_for_response("Added TLS credential")
71+
time.sleep(1)
72+
return result
73+
74+
def delete_credential(self, sectag, cred_type):
75+
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}')
76+
result, _ = self.serial_wait_for_response(
77+
"Deleted TLS credential", "There is no TLS credential"
78+
)
79+
time.sleep(2)
80+
return result
81+
82+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
83+
self.write_raw(f'cred list {sectag} {TLS_CRED_TYPES[cred_type]}')
84+
_, output = self.serial_wait_for_response(
85+
"1 credentials found.",
86+
"0 credentials found.",
87+
store=f"{sectag},{TLS_CRED_TYPES[cred_type]}",
88+
)
89+
90+
if not output:
91+
return False, None
92+
93+
if not get_hash:
94+
return True, None
95+
96+
data = output.decode().split(",")
97+
hash = data[2].strip()
98+
status_code = data[3].strip()
99+
100+
if status_code != "0":
101+
logger.warning(f"Error retrieving credential hash: {output.decode().strip()}.")
102+
logger.warning("Device might not support credential digests.")
103+
return True, None
104+
105+
return True, hash
106+
107+
def calculate_expected_hash(self, cred_text):
108+
hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00')
109+
return base64.b64encode(hash.digest()).decode()
110+
111+
def check_cred_command(self):
112+
logger.info("Checking for 'cred' command existence...")
113+
self.serial_write_line("cred")
114+
result, output = self.serial_wait_for_response(timeout=5, store="cred")
115+
logger.debug(f"Output: {output}")
116+
logger.debug(f"Result: {result}")
117+
# no prompt or promt timedout
118+
if not result:
119+
logger.error("Device did not respond to 'cred' command.")
120+
return False
121+
if output and b"command not found" in output:
122+
logger.error("Device does not support 'cred' command.")
123+
logger.error("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file.")
124+
return False
125+
logger.info("'cred' command found.")
126+
return True
127+
128+
129+
def write_line(line, hidden=False):
130+
if not hidden:
131+
logger.debug(f'-> {line}')
132+
ser.write(bytes((line + CMD_TERM_DICT[cmd_term_key]).encode('utf-8')))
133+
134+
135+
def wait_for_prompt(val1='uart:~$ ', val2=None, timeout=15, store=None):
136+
found = False
137+
retval = False
138+
output = None
139+
140+
if not ser:
141+
logger.error('Serial interface not initialized')
142+
return False, None
143+
144+
if isinstance(val1, str):
145+
val1 = val1.encode()
146+
147+
if isinstance(val2, str):
148+
val2 = val2.encode()
149+
150+
if isinstance(store, str):
151+
store = store.encode()
152+
153+
ser.flush()
154+
155+
while not found and timeout != 0:
156+
try:
157+
line = ser.readline()
158+
except serial.SerialException as e:
159+
logger.error(f"Error reading from serial interface: {e}")
160+
return False, None
161+
except Exception as e:
162+
logger.error(f"Unexpected error: {e}")
163+
return False, None
164+
165+
if line == b'\r\n':
166+
continue
167+
168+
if line is None or len(line) == 0:
169+
if timeout > 0:
170+
timeout -= serial_timeout
171+
continue
172+
173+
logger.debug(f'<- {line.decode("utf-8", errors="replace")}')
174+
175+
if val1 in line:
176+
found = True
177+
retval = True
178+
elif val2 is not None and val2 in line:
179+
found = True
180+
retval = False
181+
elif store is not None and (store in line or str(store) in str(line)):
182+
output = line
183+
184+
if b'\n' not in line:
185+
logger.debug('')
186+
187+
ser.flush()
188+
if store is not None and output is None:
189+
logger.error(f'String {store} not detected in line {line}')
190+
191+
if timeout == 0:
192+
logger.error('Serial timeout waiting for prompt')
193+
194+
return retval, output
195+
196+
197+
def parse_args(in_args):
198+
parser = argparse.ArgumentParser(
199+
description="Device Credentials Installer",
200+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
201+
allow_abbrev=False,
202+
)
203+
parser.add_argument(
204+
"-p", "--port", type=str, help="Specify which serial port to open", default="/dev/ttyACM1"
205+
)
206+
parser.add_argument(
207+
"-x",
208+
"--xonxoff",
209+
help="Enable software flow control for serial connection",
210+
action='store_true',
211+
default=False,
212+
)
213+
parser.add_argument(
214+
"-r",
215+
"--rtscts-off",
216+
help="Disable hardware (RTS/CTS) flow control for serial connection",
217+
action='store_true',
218+
default=False,
219+
)
220+
parser.add_argument(
221+
"-f",
222+
"--dsrdtr",
223+
help="Enable hardware (DSR/DTR) flow control for serial connection",
224+
action='store_true',
225+
default=False,
226+
)
227+
parser.add_argument(
228+
"-d", "--delete", help="Delete sectag from device first", action='store_true', default=False
229+
)
230+
parser.add_argument(
231+
"-l",
232+
"--local-cert-file",
233+
type=str,
234+
help="Filepath to a local certificate (PEM) to use for the device",
235+
required=True,
236+
)
237+
parser.add_argument(
238+
"-t", "--cert-type", type=int, help="Certificate type to use for the device", default=1
239+
)
240+
parser.add_argument(
241+
"-S", "--sectag", type=int, help="integer: Security tag to use", default=16842753
242+
)
243+
parser.add_argument(
244+
"-H",
245+
"--check-hash",
246+
help="Check hash of the credential after writing",
247+
action='store_true',
248+
default=False,
249+
)
250+
251+
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
252+
args = parser.parse_args(in_args)
253+
return args
254+
255+
256+
def main(in_args):
257+
global ser
258+
259+
args = parse_args(in_args)
260+
261+
if args.verbose:
262+
logger.setLevel(logging.DEBUG)
263+
264+
if not os.path.isfile(args.local_cert_file):
265+
logger.error(f'Local certificate file {args.local_cert_file} does not exist')
266+
sys.exit(3)
267+
268+
logger.info(f'Opening port {args.port}')
269+
try:
270+
try:
271+
ser = serial.Serial(
272+
args.port,
273+
115200,
274+
xonxoff=args.xonxoff,
275+
rtscts=(not args.rtscts_off),
276+
dsrdtr=args.dsrdtr,
277+
timeout=serial_timeout,
278+
)
279+
ser.reset_input_buffer()
280+
ser.reset_output_buffer()
281+
except FileNotFoundError:
282+
logger.error(f'Specified port {args.port} does not exist or cannot be accessed')
283+
sys.exit(2)
284+
except serial.SerialException as e:
285+
logger.error(f'Failed to open serial port {args.port}: {e}')
286+
sys.exit(2)
287+
except serial.serialutil.SerialException:
288+
logger.error('Port could not be opened; not a device, or open already')
289+
sys.exit(2)
290+
291+
cred_if = TLSCredShellInterface(write_line, wait_for_prompt, args.verbose)
292+
cmd_exits = cred_if.check_cred_command()
293+
if not cmd_exits:
294+
sys.exit(1)
295+
296+
with open(args.local_cert_file) as f:
297+
dev_bytes = f.read()
298+
299+
if args.delete:
300+
logger.info(f'Deleting sectag {args.sectag}...')
301+
cred_if.delete_credential(args.sectag, args.cert_type)
302+
303+
cred_if.write_credential(args.sectag, args.cert_type, dev_bytes)
304+
logger.info(f'Writing sectag {args.sectag}...')
305+
result, hash = cred_if.check_credential_exists(args.sectag, args.cert_type, args.check_hash)
306+
if args.check_hash:
307+
logger.debug(f'Checking hash for sectag {args.sectag}...')
308+
if not result:
309+
logger.error(f'Failed to check credential existence for sectag {args.sectag}')
310+
sys.exit(4)
311+
if hash:
312+
logger.debug(f'Credential hash: {hash}')
313+
expected_hash = cred_if.calculate_expected_hash(dev_bytes)
314+
if hash != expected_hash:
315+
logger.error(
316+
f'Hash mismatch for sectag {args.sectag}. Expected: {expected_hash}, got: {hash}'
317+
)
318+
sys.exit(6)
319+
logger.info(f'Credential for sectag {args.sectag} written successfully')
320+
sys.exit(0)
321+
322+
323+
def run():
324+
try:
325+
main(sys.argv[1:])
326+
except KeyboardInterrupt:
327+
logger.info("Execution interrupted by user (Ctrl-C). Exiting...")
328+
sys.exit(1)
329+
330+
331+
if __name__ == '__main__':
332+
run()

0 commit comments

Comments
 (0)