Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a7708da
Added a common encrypt/decrypt module w.r.t. HLD:TACACSPLUS_PASSKEY_E…
nmoray Nov 16, 2023
e6bb593
Added test cases to test security cipher module
nmoray Nov 16, 2023
f77490b
Corrected typo
nmoray Nov 16, 2023
485aed8
Addressed comments and fixed AUT build errors
nmoray Nov 17, 2023
519dc8d
Fixed build issues
nmoray Nov 20, 2023
cb51987
Fixed AUT issues
nmoray Nov 22, 2023
f4745b4
Fixed indentation issue
nmoray Nov 22, 2023
2b08431
Fixed build issues
nmoray Nov 22, 2023
8129f86
Fixed build issues
nmoray Nov 22, 2023
8dfacd3
Added support of mock class in existing AUT
nmoray Nov 22, 2023
c6fd0c9
fixed build issues
nmoray Nov 23, 2023
547b31f
Fixed build issues
nmoray Nov 23, 2023
7a8c120
fixed build issues
nmoray Nov 23, 2023
78d66c7
Fixed build issues
nmoray Nov 23, 2023
6ee2c7e
fixed build issues
nmoray Nov 23, 2023
0fd2ba0
fixed build issues
nmoray Nov 23, 2023
aff2df1
Updated passkey leaf length to work with encrypted string too
nmoray Nov 24, 2023
55c7cc7
Added a delete function for removing the footprint of cipher_pass fil…
nmoray Feb 1, 2024
8268021
Change the access permission of cipher_pass file from 644 to 640 (-rw…
nmoray Feb 1, 2024
2fbec21
updated scripts to mock chmod
nmoray Feb 1, 2024
95a4c10
Removed unwated debugs
nmoray Feb 2, 2024
719053b
renamed class name to master_key_mgr
nmoray Apr 11, 2024
1ab7067
Addressed new comments
nmoray Mar 10, 2025
5a94a62
Fixed semgrep findings
nmoray May 19, 2025
33e88cb
updated tests as per the new changes
nmoray May 19, 2025
f388f71
Removed commented code
nmoray May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions src/sonic-py-common/sonic_py_common/security_cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
'''
A common module for handling the encryption and
decryption of the feature passkey. It also takes
care of storing the secure cipher at root
protected file system
'''

import subprocess
import threading
import syslog
import os
import base64
from swsscommon.swsscommon import ConfigDBConnector

class master_key_mgr:
_instance = None
_lock = threading.Lock()
_initialized = False

def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(master_key_mgr, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self):
if not self._initialized:
self._file_path = "/etc/cipher_pass"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/etc/cipher_pass

When you boot into a new version, the /etc/config_db.json will be carried over, but this file will be lost. Then you have only encrypted passkey and lost cipher_pass. How does the new version survive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft As per https://github.com/sonic-net/SONiC/blob/master/doc/ztp/SONiC-config-setup.md#222-config-migration
the backup and restore of cipher_pass file will automatically be taken care, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the link! The file "/etc/cipher_pass" is outside the folder of config migration design, could you check again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes. Okay then, I will create another PR to add pre-hook and post-hook scripts to backup and restore the cipher_pass file during the up-gradation. Is that okay? Or just change the location to "/etc/sonic"?

self._config_db = ConfigDBConnector()
self._config_db.connect()
# Note: Kept 1st index NA intentionally to map it with the cipher_pass file
# contents. The file has a comment at the 1st row / line
self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"]
if not os.path.exists(self._file_path):
with open(self._file_path, 'w') as file:
file.writelines("#Auto generated file for storing the encryption passwords\n")
for feature in self._feature_list[1:]: # Skip the first "NA" entry
file.write(f"{feature} : \n")
os.chmod(self._file_path, 0o640)
self._initialized = True

# Write cipher_pass file
def __write_passwd_file(self, feature_type, passwd):
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type))
return

if feature_type in self._feature_list:
try:
with open(self._file_path, 'r') as file:
lines = file.readlines()
# Update the password for given feature
lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n'

os.chmod(self._file_path, 0o640)
with open(self._file_path, 'w') as file:
file.writelines(lines)
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path))


# Read cipher pass file and return the feature specifc
# password
def __read_passwd_file(self, feature_type):
passwd = None
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type))
return passwd

if feature_type in self._feature_list:
try:
os.chmod(self._file_path, 0o644)
with open(self._file_path, "r") as file:
lines = file.readlines()
for line in lines:
if feature_type in line:
passwd = line.split(' : ')[1]
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path))

return passwd


def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
"""
Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64)
and returns the result as a hex string.
"""
cmd = [
"openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2",
"-pass", f"pass:{passwd}"
]
try:
result = subprocess.run(
cmd,
input=secret.encode(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
encrypted_bytes = result.stdout
b64_encoded = base64.b64encode(encrypted_bytes).decode()
self.__write_passwd_file(feature_type, passwd)
return b64_encoded
except subprocess.CalledProcessError as e:
syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e)))
return ""


def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
"""
Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64).
Returns the decrypted plaintext.
"""

passwd = self.__read_passwd_file(feature_type).strip()
if passwd is None:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type))
return ""

try:
encrypted_bytes = base64.b64decode(b64_encoded)

cmd = [
"openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2",
"-pass", f"pass:{passwd}"
]
result = subprocess.run(
cmd,
input=encrypted_bytes,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
return result.stdout.decode().strip()
except subprocess.CalledProcessError as e:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode()))
return ""


# Check if the encryption is enabled
def is_key_encrypt_enabled(self, table, entry):
key = 'key_encrypt'
data = self._config_db.get_entry(table, entry)
if data:
if key in data:
return data[key]
return False


def del_cipher_pass(self, feature_type):
"""
Removes only the password for the given feature_type while keeping the file structure intact.
"""
try:
os.chmod(self._file_path, 0o640)
with open(self._file_path, "r") as file:
lines = file.readlines()

updated_lines = []
for line in lines:
if line.strip().startswith(f"{feature_type} :"):
updated_lines.append(f"{feature_type} : \n") # Remove password but keep format
else:
updated_lines.append(line)

with open(self._file_path, 'w') as file:
file.writelines(updated_lines)
os.chmod(self._file_path, 0o640)

syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type)))

except Exception as e:
syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e)))

12 changes: 12 additions & 0 deletions src/sonic-py-common/tests/mock_swsscommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ def connect(self, db):

def get(self, db, table, field):
return self.data.get(field, "N/A")


class ConfigDBConnector:
def __init__(self):
self.CONFIG_DB = 'CONFIG_DB'
self.data = {"key_encrypt": "True"}

def connect(self):
pass

def get(self, db, table, field):
return self.data.get(field, "N/A")
73 changes: 73 additions & 0 deletions src/sonic-py-common/tests/test_security_cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import sys

if sys.version_info.major == 3:
from unittest import mock
else:
import mock

import pytest
from sonic_py_common.security_cipher import master_key_mgr
from .mock_swsscommon import ConfigDBConnector

# TODO: Remove this if/else block once we no longer support Python 2
if sys.version_info.major == 3:
BUILTINS = "builtins"
else:
BUILTINS = "__builtin__"

DEFAULT_FILE = [
"#Auto generated file for storing the encryption passwords",
"TACPLUS : ",
"RADIUS : ",
"LDAP :"
]

UPDATED_FILE = [
"#Auto generated file for storing the encryption passwords",
"TACPLUS : ",
"RADIUS : TEST2",
"LDAP :"
]


class TestSecurityCipher(object):
def test_passkey_encryption(self):
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
mock.patch("os.chmod") as mock_chmod, \
mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file:
temp = master_key_mgr()

# Use patch to replace the built-in 'open' function with a mock
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
encrypt = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1")
assert encrypt != "passkey1"

def test_passkey_decryption(self):
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
mock.patch("os.chmod") as mock_chmod, \
mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file:
temp = master_key_mgr()

# Use patch to replace the built-in 'open' function with a mock
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
encrypt = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2")

# Use patch to replace the built-in 'open' function with a mock
#with mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=EXPECTED_PASSWD)) as mock_file:
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
decrypt = temp.decrypt_passkey("RADIUS", encrypt)
assert decrypt == "passkey2"


9 changes: 7 additions & 2 deletions src/sonic-yang-models/yang-models/sonic-system-tacacs.yang
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ module sonic-system-tacacs {

leaf passkey {
type string {
length "1..65";
length "1..256";
pattern "[^ #,]*" {
error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")';
}
Expand Down Expand Up @@ -131,9 +131,14 @@ module sonic-system-tacacs {
default 5;
}

leaf key_encrypt {
Copy link
Collaborator

@qiluo-msft qiluo-msft May 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mix tabs with spaces, please reformat this "leaf key_encrypt" section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean, replace the tabs with spaces for the entire leaf section?

Copy link
Contributor Author

@nmoray nmoray May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft The link that you shared is not accessible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the link. Yes, I mean to replace the tabs with spaces for the entire leaf section

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do as a part of new PR.

type boolean;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boolean

Please add a default value (false). This is needed for backward compatible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I will raise a new PR to accomodate this change.

description "Indicates if the passkey is encrypted.";
}

leaf passkey {
type string {
length "1..65";
length "1..256";
pattern "[^ #,]*" {
error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")';
}
Expand Down
Loading