diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py new file mode 100644 index 000000000000..d48d29cf7fd6 --- /dev/null +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -0,0 +1,184 @@ +''' + +A common module for handling the encryption and +decryption of the feature passkey. It also takes +care of storing the secure cipher at root +protected file system + +''' + +import subprocess +import threading +import syslog +import os +import base64 +from swsscommon.swsscommon import ConfigDBConnector + +class master_key_mgr: + _instance = None + _lock = threading.Lock() + _initialized = False + + def __new__(cls): + with cls._lock: + if cls._instance is None: + cls._instance = super(master_key_mgr, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not self._initialized: + self._file_path = "/etc/cipher_pass" + self._config_db = ConfigDBConnector() + self._config_db.connect() + # Note: Kept 1st index NA intentionally to map it with the cipher_pass file + # contents. The file has a comment at the 1st row / line + self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"] + if not os.path.exists(self._file_path): + with open(self._file_path, 'w') as file: + file.writelines("#Auto generated file for storing the encryption passwords\n") + for feature in self._feature_list[1:]: # Skip the first "NA" entry + file.write(f"{feature} : \n") + os.chmod(self._file_path, 0o640) + self._initialized = True + + # Write cipher_pass file + def __write_passwd_file(self, feature_type, passwd): + if feature_type == 'NA': + syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type)) + return + + if feature_type in self._feature_list: + try: + with open(self._file_path, 'r') as file: + lines = file.readlines() + # Update the password for given feature + lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n' + + os.chmod(self._file_path, 0o640) + with open(self._file_path, 'w') as file: + file.writelines(lines) + os.chmod(self._file_path, 0o640) + except FileNotFoundError: + syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path)) + except PermissionError: + syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path)) + + + # Read cipher pass file and return the feature specifc + # password + def __read_passwd_file(self, feature_type): + passwd = None + if feature_type == 'NA': + syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type)) + return passwd + + if feature_type in self._feature_list: + try: + os.chmod(self._file_path, 0o644) + with open(self._file_path, "r") as file: + lines = file.readlines() + for line in lines: + if feature_type in line: + passwd = line.split(' : ')[1] + os.chmod(self._file_path, 0o640) + except FileNotFoundError: + syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path)) + except PermissionError: + syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path)) + + return passwd + + + def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str: + """ + Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64) + and returns the result as a hex string. + """ + cmd = [ + "openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2", + "-pass", f"pass:{passwd}" + ] + try: + result = subprocess.run( + cmd, + input=secret.encode(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True + ) + encrypted_bytes = result.stdout + b64_encoded = base64.b64encode(encrypted_bytes).decode() + self.__write_passwd_file(feature_type, passwd) + return b64_encoded + except subprocess.CalledProcessError as e: + syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e))) + return "" + + + def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: + """ + Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64). + Returns the decrypted plaintext. + """ + + passwd = self.__read_passwd_file(feature_type).strip() + if passwd is None: + syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type)) + return "" + + try: + encrypted_bytes = base64.b64decode(b64_encoded) + + cmd = [ + "openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2", + "-pass", f"pass:{passwd}" + ] + result = subprocess.run( + cmd, + input=encrypted_bytes, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True + ) + return result.stdout.decode().strip() + except subprocess.CalledProcessError as e: + syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode())) + return "" + + + # Check if the encryption is enabled + def is_key_encrypt_enabled(self, table, entry): + key = 'key_encrypt' + data = self._config_db.get_entry(table, entry) + if data: + if key in data: + return data[key] + return False + + + def del_cipher_pass(self, feature_type): + """ + Removes only the password for the given feature_type while keeping the file structure intact. + """ + try: + os.chmod(self._file_path, 0o640) + with open(self._file_path, "r") as file: + lines = file.readlines() + + updated_lines = [] + for line in lines: + if line.strip().startswith(f"{feature_type} :"): + updated_lines.append(f"{feature_type} : \n") # Remove password but keep format + else: + updated_lines.append(line) + + with open(self._file_path, 'w') as file: + file.writelines(updated_lines) + os.chmod(self._file_path, 0o640) + + syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type))) + + except Exception as e: + syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e))) + diff --git a/src/sonic-py-common/tests/mock_swsscommon.py b/src/sonic-py-common/tests/mock_swsscommon.py index 8a619fe0e4c8..567a5615503b 100644 --- a/src/sonic-py-common/tests/mock_swsscommon.py +++ b/src/sonic-py-common/tests/mock_swsscommon.py @@ -14,3 +14,15 @@ def connect(self, db): def get(self, db, table, field): return self.data.get(field, "N/A") + + +class ConfigDBConnector: + def __init__(self): + self.CONFIG_DB = 'CONFIG_DB' + self.data = {"key_encrypt": "True"} + + def connect(self): + pass + + def get(self, db, table, field): + return self.data.get(field, "N/A") diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py new file mode 100644 index 000000000000..792667c7eda9 --- /dev/null +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -0,0 +1,72 @@ +import sys + +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + +import pytest +from sonic_py_common.security_cipher import master_key_mgr +from .mock_swsscommon import ConfigDBConnector + +# TODO: Remove this if/else block once we no longer support Python 2 +if sys.version_info.major == 3: + BUILTINS = "builtins" +else: + BUILTINS = "__builtin__" + +DEFAULT_FILE = [ + "#Auto generated file for storing the encryption passwords", + "TACPLUS : ", + "RADIUS : ", + "LDAP :" + ] + +UPDATED_FILE = [ + "#Auto generated file for storing the encryption passwords", + "TACPLUS : ", + "RADIUS : TEST2", + "LDAP :" + ] + + +class TestSecurityCipher(object): + def test_passkey_encryption(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod") as mock_chmod, \ + mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file: + temp = master_key_mgr() + + # Use patch to replace the built-in 'open' function with a mock + with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ + mock.patch("os.chmod") as mock_chmod: + mock_fd = mock.MagicMock() + mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) + mock_file.return_value.__enter__.return_value = mock_fd + encrypt = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1") + assert encrypt != "passkey1" + + def test_passkey_decryption(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod") as mock_chmod, \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file: + temp = master_key_mgr() + + # Use patch to replace the built-in 'open' function with a mock + with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ + mock.patch("os.chmod") as mock_chmod: + mock_fd = mock.MagicMock() + mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) + mock_file.return_value.__enter__.return_value = mock_fd + encrypt = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2") + + # Use patch to replace the built-in 'open' function with a mock + with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ + mock.patch("os.chmod") as mock_chmod: + mock_fd = mock.MagicMock() + mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE) + mock_file.return_value.__enter__.return_value = mock_fd + decrypt = temp.decrypt_passkey("RADIUS", encrypt) + assert decrypt == "passkey2" + + diff --git a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang index 6abbcf3523b0..e8a113954ee2 100644 --- a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang +++ b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang @@ -93,7 +93,7 @@ module sonic-system-tacacs { leaf passkey { type string { - length "1..65"; + length "1..256"; pattern "[^ #,]*" { error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")'; } @@ -131,9 +131,14 @@ module sonic-system-tacacs { default 5; } + leaf key_encrypt { + type boolean; + description "Indicates if the passkey is encrypted."; + } + leaf passkey { type string { - length "1..65"; + length "1..256"; pattern "[^ #,]*" { error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")'; }