From 67df31ff1e9e082e3b46b4de3c575b0389c1f56f Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 27 May 2025 09:35:55 +0000 Subject: [PATCH 01/23] Added support of backup and restore for cipher_pass file --- .../config-setup/01-post-security-cipher | 11 +++++++++++ .../config-setup/01-pre-security-cipher | 13 +++++++++++++ 2 files changed, 24 insertions(+) create mode 100755 files/image_config/config-setup/01-post-security-cipher create mode 100755 files/image_config/config-setup/01-pre-security-cipher diff --git a/files/image_config/config-setup/01-post-security-cipher b/files/image_config/config-setup/01-post-security-cipher new file mode 100755 index 000000000000..a38495f167ce --- /dev/null +++ b/files/image_config/config-setup/01-post-security-cipher @@ -0,0 +1,11 @@ +#!/bin/bash + +set -e + +# Restore cipher_pass file from persistent storage +if [ -f /host/security_cipher/cipher_pass ]; then + cp /host/security_cipher/cipher_pass /etc/cipher_pass + chmod 640 /etc/cipher_pass + echo "Restored /host/security_cipher/cipher_pass to /etc/" +fi + diff --git a/files/image_config/config-setup/01-pre-security-cipher b/files/image_config/config-setup/01-pre-security-cipher new file mode 100755 index 000000000000..1254bfef1547 --- /dev/null +++ b/files/image_config/config-setup/01-pre-security-cipher @@ -0,0 +1,13 @@ +#!/bin/bash + +set -e + +# Ensure old_config directory exists +mkdir -p /host/security_cipher + +# Copy cipher_pass file to persistent storage +if [ -f /etc/cipher_pass ]; then + cp /etc/cipher_pass /host/security_cipher/cipher_pass + echo "Saved /etc/cipher_pass to /host/security_cipher/" +fi + From 1c23a3d2bbc9ace709acd88344b7f4ecd15c02ac Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 27 May 2025 09:42:46 +0000 Subject: [PATCH 02/23] Made default value as false for key_encrypt flag and removed uneven tabs --- .../yang-models/sonic-system-tacacs.yang | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang index e8a113954ee2..635fdf0f4e25 100644 --- a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang +++ b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang @@ -131,10 +131,11 @@ module sonic-system-tacacs { default 5; } - leaf key_encrypt { - type boolean; - description "Indicates if the passkey is encrypted."; - } + leaf key_encrypt { + type boolean; + default false; + description "Indicates if the passkey is encrypted."; + } leaf passkey { type string { From 12ef9e557d77c0b7bee2a1b9fc30ae40b0b6169a Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 27 May 2025 09:45:36 +0000 Subject: [PATCH 03/23] Updated jinja template to copy pre/post migration scripts --- files/build_templates/sonic_debian_extension.j2 | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/files/build_templates/sonic_debian_extension.j2 b/files/build_templates/sonic_debian_extension.j2 index 39ba42b1a1dd..edadfbde2662 100644 --- a/files/build_templates/sonic_debian_extension.j2 +++ b/files/build_templates/sonic_debian_extension.j2 @@ -642,6 +642,12 @@ j2 files/build_templates/config-setup.service.j2 | sudo tee $FILESYSTEM_ROOT_USR sudo cp $IMAGE_CONFIGS/config-setup/config-setup $FILESYSTEM_ROOT/usr/bin/config-setup sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup sudo cp $IMAGE_CONFIGS/config-setup/config-setup.conf $FILESYSTEM_ROOT/etc/config-setup/config-setup.conf +sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d +sudo cp $IMAGE_CONFIGS/config-setup/01-pre-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher +chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher +sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d +sudo cp $IMAGE_CONFIGS/config-setup/01-post-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher +chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher echo "config-setup.service" | sudo tee -a $GENERATED_SERVICE_FILE sudo LANG=C chroot $FILESYSTEM_ROOT systemctl enable config-setup.service From 14696101339526da99798b2c70a8cda1524bd7b0 Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 27 May 2025 14:35:29 +0000 Subject: [PATCH 04/23] Added sudo while executing chmod --- files/build_templates/sonic_debian_extension.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/files/build_templates/sonic_debian_extension.j2 b/files/build_templates/sonic_debian_extension.j2 index edadfbde2662..0c5782665aee 100644 --- a/files/build_templates/sonic_debian_extension.j2 +++ b/files/build_templates/sonic_debian_extension.j2 @@ -644,10 +644,10 @@ sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup sudo cp $IMAGE_CONFIGS/config-setup/config-setup.conf $FILESYSTEM_ROOT/etc/config-setup/config-setup.conf sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d sudo cp $IMAGE_CONFIGS/config-setup/01-pre-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher -chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher +sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d sudo cp $IMAGE_CONFIGS/config-setup/01-post-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher -chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher +sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher echo "config-setup.service" | sudo tee -a $GENERATED_SERVICE_FILE sudo LANG=C chroot $FILESYSTEM_ROOT systemctl enable config-setup.service From 90104052c5e2e8babccc34cc7c82e799be0a2339 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 19 Jun 2025 13:22:00 +0000 Subject: [PATCH 05/23] Updated the existing design to support password rotate feature in Security Cipher module. Additionally, now multiple modules of same feature type can make use of same password for generating their unique encrypted passkey. For instance, now multiple TACPLUS server can have their own passkey encrypted from the same password. --- .../config-setup/01-post-security-cipher | 6 +- .../config-setup/01-pre-security-cipher | 4 +- .../sonic_py_common/security_cipher.py | 216 ++++++++++-------- .../tests/test_security_cipher.py | 164 +++++++++---- 4 files changed, 246 insertions(+), 144 deletions(-) diff --git a/files/image_config/config-setup/01-post-security-cipher b/files/image_config/config-setup/01-post-security-cipher index a38495f167ce..dbfbb27a1614 100755 --- a/files/image_config/config-setup/01-post-security-cipher +++ b/files/image_config/config-setup/01-post-security-cipher @@ -4,8 +4,8 @@ set -e # Restore cipher_pass file from persistent storage if [ -f /host/security_cipher/cipher_pass ]; then - cp /host/security_cipher/cipher_pass /etc/cipher_pass - chmod 640 /etc/cipher_pass - echo "Restored /host/security_cipher/cipher_pass to /etc/" + cp /host/security_cipher/cipher_pass.json /etc/cipher_pass.json + chmod 640 /etc/cipher_pass.json + echo "Restored /host/security_cipher/cipher_pass.json to /etc/" fi diff --git a/files/image_config/config-setup/01-pre-security-cipher b/files/image_config/config-setup/01-pre-security-cipher index 1254bfef1547..6ff8ad53e832 100755 --- a/files/image_config/config-setup/01-pre-security-cipher +++ b/files/image_config/config-setup/01-pre-security-cipher @@ -7,7 +7,7 @@ mkdir -p /host/security_cipher # Copy cipher_pass file to persistent storage if [ -f /etc/cipher_pass ]; then - cp /etc/cipher_pass /host/security_cipher/cipher_pass - echo "Saved /etc/cipher_pass to /host/security_cipher/" + cp /etc/cipher_pass.json /host/security_cipher/cipher_pass.json + echo "Saved /etc/cipher_pass.json to /host/security_cipher/" fi diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index d48d29cf7fd6..44729b4b1a18 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -2,7 +2,7 @@ A common module for handling the encryption and decryption of the feature passkey. It also takes -care of storing the secure cipher at root +care of storing the secure cipher at root protected file system ''' @@ -12,89 +12,140 @@ import syslog import os import base64 +import json from swsscommon.swsscommon import ConfigDBConnector +CIPHER_PASS_FILE = "/etc/cipher_pass.json" + class master_key_mgr: _instance = None _lock = threading.Lock() _initialized = False - def __new__(cls): + def __new__(cls, callback_lookup=None): with cls._lock: if cls._instance is None: cls._instance = super(master_key_mgr, cls).__new__(cls) cls._instance._initialized = False return cls._instance - def __init__(self): + def __init__(self, callback_lookup=None): if not self._initialized: - self._file_path = "/etc/cipher_pass" + self._file_path = CIPHER_PASS_FILE self._config_db = ConfigDBConnector() self._config_db.connect() - # Note: Kept 1st index NA intentionally to map it with the cipher_pass file - # contents. The file has a comment at the 1st row / line - self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"] - if not os.path.exists(self._file_path): - with open(self._file_path, 'w') as file: - file.writelines("#Auto generated file for storing the encryption passwords\n") - for feature in self._feature_list[1:]: # Skip the first "NA" entry - file.write(f"{feature} : \n") - os.chmod(self._file_path, 0o640) + if callback_lookup is None: + callback_lookup = {} + self.callback_lookup = callback_lookup self._initialized = True - # Write cipher_pass file - def __write_passwd_file(self, feature_type, passwd): - if feature_type == 'NA': - syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type)) + def _load_registry(self): + if not os.path.exists(CIPHER_PASS_FILE): + return {} + try: + with open(CIPHER_PASS_FILE, 'r') as f: + return json.load(f) + except Exception as e: + syslog.syslog(syslog.LOG_ERR, "del_cipher_passwd: Exception occurred: {}".format(e)) + return {} + + def _save_registry(self, data): + with open(CIPHER_PASS_FILE, 'w') as f: + json.dump(data, f, indent=2) + os.chmod(self._file_path, 0o640) + + def register(self, feature_type, callback): + """ + Register a callback for a feature type. + """ + data = self._load_registry() + if feature_type not in data: + data[feature_type] = {"callbacks": [], "password": None} + cb_name = callback.__name__ + if cb_name not in data[feature_type]["callbacks"]: + data[feature_type]["callbacks"].append(cb_name) + self._save_registry(data) + syslog.syslog(syslog.LOG_INFO, "register: Callback {} attached to feature {}".format(cb_name, feature_type)) + + def deregister(self, feature_type, callback): + """ + Deregister (remove) a callback for a feature type. + If, after removal, there are no more callbacks for that feature, + that means there is no one who is going to use the password thus + remove the respective password too. + """ + data = self._load_registry() + if feature_type in data: + cb_name = callback.__name__ + if cb_name in data[feature_type]["callbacks"]: + data[feature_type]["callbacks"].remove(cb_name) + if not data[feature_type]["callbacks"]: + # No more callbacks left; remove password as well + data[feature_type]["password"] = None + syslog.syslog(syslog.LOG_INFO, "deregister: No more callbacks for feature {}. Password also removed.".format(feature_type)) + self._save_registry(data) + syslog.syslog(syslog.LOG_INFO, "deregister: Callback {} removed from feature {}".format(cb_name, feature_type)) + + else: + syslog.syslog(syslog.LOG_ERR, "deregister: Callback {} not found for feature {}".format(cb_name, feature_type)) + else: + syslog.syslog(syslog.LOG_ERR, "deregister: No callbacks registered for {}".format(feature_type)) + + def set_feature_password(self, feature_type, password): + """ + Set a new password for a feature type. + It will not update if already exist. + """ + data = self._load_registry() + if feature_type not in data: + data[feature_type] = {"callbacks": [], "password": None} + if data[feature_type]["password"] is not None: + syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type)) return + data[feature_type]["password"] = password + self._save_registry(data) + syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type)) - if feature_type in self._feature_list: - try: - with open(self._file_path, 'r') as file: - lines = file.readlines() - # Update the password for given feature - lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n' - - os.chmod(self._file_path, 0o640) - with open(self._file_path, 'w') as file: - file.writelines(lines) - os.chmod(self._file_path, 0o640) - except FileNotFoundError: - syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path)) - except PermissionError: - syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path)) - - - # Read cipher pass file and return the feature specifc - # password - def __read_passwd_file(self, feature_type): - passwd = None - if feature_type == 'NA': - syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type)) - return passwd - - if feature_type in self._feature_list: - try: - os.chmod(self._file_path, 0o644) - with open(self._file_path, "r") as file: - lines = file.readlines() - for line in lines: - if feature_type in line: - passwd = line.split(' : ')[1] - os.chmod(self._file_path, 0o640) - except FileNotFoundError: - syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path)) - except PermissionError: - syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path)) - - return passwd - - - def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str: + def rotate_feature_passwd(self, feature_type, new_password=None): + """ + On each call, read JSON data fresh from disk. Update password if provided, + and call all registered callbacks with the latest password. + """ + data = self._load_registry() + if feature_type not in data: + syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type)) + return + + if new_password is not None: + data[feature_type]["password"] = new_password + self._save_registry(data) + syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} updated during rotation.".format(feature_type)) + + password = data[feature_type].get("password") + cb_names = data[feature_type].get("callbacks", []) + callbacks = [self.callback_lookup[name] for name in cb_names if name in self.callback_lookup] + + if not callbacks: + syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type)) + return + + syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type)) + for cb in callbacks: + cb(password) + + def encrypt_passkey(self, feature_type, secret: str) -> str: """ Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64) and returns the result as a hex string. """ + # Retrieve password from cipher_pass registry + data = self._load_registry() + passwd = None + if feature_type in data: + passwd = data[feature_type].get("password") + if not passwd: + raise ValueError(f"No password set for feature {feature_type}") + cmd = [ "openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2", "-pass", f"pass:{passwd}" @@ -109,23 +160,23 @@ def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str: ) encrypted_bytes = result.stdout b64_encoded = base64.b64encode(encrypted_bytes).decode() - self.__write_passwd_file(feature_type, passwd) - return b64_encoded + return b64_encoded except subprocess.CalledProcessError as e: syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e))) return "" - def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: """ Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64). Returns the decrypted plaintext. """ - - passwd = self.__read_passwd_file(feature_type).strip() - if passwd is None: - syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type)) - return "" + # Retrieve password from cipher_pass registry + data = self._load_registry() + passwd = None + if feature_type in data: + passwd = data[feature_type].get("password") + if not passwd: + raise ValueError(f"No password set for feature {feature_type}") try: encrypted_bytes = base64.b64decode(b64_encoded) @@ -146,8 +197,7 @@ def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode())) return "" - - # Check if the encryption is enabled + # Check if the encryption is enabled def is_key_encrypt_enabled(self, table, entry): key = 'key_encrypt' data = self._config_db.get_entry(table, entry) @@ -156,29 +206,3 @@ def is_key_encrypt_enabled(self, table, entry): return data[key] return False - - def del_cipher_pass(self, feature_type): - """ - Removes only the password for the given feature_type while keeping the file structure intact. - """ - try: - os.chmod(self._file_path, 0o640) - with open(self._file_path, "r") as file: - lines = file.readlines() - - updated_lines = [] - for line in lines: - if line.strip().startswith(f"{feature_type} :"): - updated_lines.append(f"{feature_type} : \n") # Remove password but keep format - else: - updated_lines.append(line) - - with open(self._file_path, 'w') as file: - file.writelines(updated_lines) - os.chmod(self._file_path, 0o640) - - syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type))) - - except Exception as e: - syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e))) - diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 792667c7eda9..4fe49e967759 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -1,12 +1,15 @@ import sys +import json +import base64 + +import pytest if sys.version_info.major == 3: from unittest import mock else: import mock -import pytest -from sonic_py_common.security_cipher import master_key_mgr +from sonic_py_common.security_cipher import master_key_mgr from .mock_swsscommon import ConfigDBConnector # TODO: Remove this if/else block once we no longer support Python 2 @@ -15,58 +18,133 @@ else: BUILTINS = "__builtin__" -DEFAULT_FILE = [ - "#Auto generated file for storing the encryption passwords", - "TACPLUS : ", - "RADIUS : ", - "LDAP :" - ] +DEFAULT_JSON = { + "TACPLUS": {"callbacks": [], "password": None}, + "RADIUS": {"callbacks": [], "password": None}, + "LDAP": {"callbacks": [], "password": None} +} -UPDATED_FILE = [ - "#Auto generated file for storing the encryption passwords", - "TACPLUS : ", - "RADIUS : TEST2", - "LDAP :" - ] +UPDATED_JSON = { + "TACPLUS": {"callbacks": [], "password": None}, + "RADIUS": {"callbacks": [], "password": "TEST2"}, + "LDAP": {"callbacks": [], "password": None} +} +def dummy_cb(password): pass +def cb1(password): pass +def cb2(password): pass class TestSecurityCipher(object): - def test_passkey_encryption(self): + def setup_method(self): + # Reset singleton for isolation + master_key_mgr._instance = None + + def test_set_feature_password_sets_password(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))) as mock_file, \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + # Patch _save_registry to check written value + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.set_feature_password("RADIUS", "testpw") + args = mock_save.call_args[0][0] + assert args["RADIUS"]["password"] == "testpw" + + def test_set_feature_password_does_not_overwrite_existing(self): + json_data = UPDATED_JSON.copy() + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.set_feature_password("RADIUS", "should_not_overwrite") + mock_save.assert_not_called() + + def test_rotate_feature_passwd(self): + cb_mock = mock.Mock() + lookup = {"cb1": cb_mock} + json_data = { + "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr(callback_lookup=lookup) + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.rotate_feature_passwd("RADIUS") + cb_mock.assert_called_once_with("radius_secret") + + def test_encrypt_and_decrypt_passkey(self): + # Use a known password and mock openssl subprocess + json_data = { + "RADIUS": {"callbacks": [], "password": "secretpw"} + } with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod") as mock_chmod, \ - mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file: + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + # Mock subprocess for encryption + fake_cipher = b"\x01\x02\x03" + with mock.patch("subprocess.run") as mock_subproc: + mock_subproc.return_value = mock.Mock(stdout=fake_cipher) + encrypted = temp.encrypt_passkey("RADIUS", "plaintext") + assert base64.b64decode(encrypted) == fake_cipher - # Use patch to replace the built-in 'open' function with a mock - with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ - mock.patch("os.chmod") as mock_chmod: - mock_fd = mock.MagicMock() - mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) - mock_file.return_value.__enter__.return_value = mock_fd - encrypt = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1") - assert encrypt != "passkey1" + # Mock subprocess for decryption + with mock.patch("subprocess.run") as mock_subproc: + mock_subproc.return_value = mock.Mock(stdout=b"plaintext") + decrypted = temp.decrypt_passkey("RADIUS", base64.b64encode(fake_cipher).decode()) + assert decrypted == "plaintext" - def test_passkey_decryption(self): + def test_encrypt_raises_if_no_password(self): with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod") as mock_chmod, \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file: + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + with pytest.raises(ValueError): + temp.encrypt_passkey("RADIUS", "plaintext") - # Use patch to replace the built-in 'open' function with a mock - with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ - mock.patch("os.chmod") as mock_chmod: - mock_fd = mock.MagicMock() - mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) - mock_file.return_value.__enter__.return_value = mock_fd - encrypt = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2") + def test_is_key_encrypt_enabled(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ + mock.patch("os.path.exists", return_value=True): - # Use patch to replace the built-in 'open' function with a mock - with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ - mock.patch("os.chmod") as mock_chmod: - mock_fd = mock.MagicMock() - mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE) - mock_file.return_value.__enter__.return_value = mock_fd - decrypt = temp.decrypt_passkey("RADIUS", encrypt) - assert decrypt == "passkey2" + temp = master_key_mgr() + table = "table" + entry = "entry" + # Patch config_db for return value + temp._config_db.get_entry = mock.Mock(return_value={"key_encrypt": True}) + assert temp.is_key_encrypt_enabled(table, entry) is True + temp._config_db.get_entry = mock.Mock(return_value={"something_else": False}) + assert temp.is_key_encrypt_enabled(table, entry) is False + temp._config_db.get_entry = mock.Mock(return_value=None) + assert temp.is_key_encrypt_enabled(table, entry) is False + def test_rotate_feature_passwd_with_new_password(self): + cb_mock = mock.Mock() + lookup = {"cb1": cb_mock} + json_data = { + "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr(callback_lookup=lookup) + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.rotate_feature_passwd("RADIUS", new_password="radius_secret2") + cb_mock.assert_called_once_with("radius_secret2") + args = mock_save.call_args[0][0] + assert args["RADIUS"]["password"] == "radius_secret2" From eb1709fe7e2a026b2b736110e5a9407cb17d2c13 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 19 Jun 2025 15:37:11 +0000 Subject: [PATCH 06/23] Added key_encrypt option under individual TACPLUS server configs --- src/sonic-yang-models/yang-models/sonic-system-tacacs.yang | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang index 635fdf0f4e25..723f29868dd8 100644 --- a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang +++ b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang @@ -91,6 +91,12 @@ module sonic-system-tacacs { description "Authentication type"; } + leaf key_encrypt { + type boolean; + default false; + description "Indicates if the passkey is encrypted."; + } + leaf passkey { type string { length "1..256"; From 6e1b08bee22c38b547041f84c722f8f8fd394867 Mon Sep 17 00:00:00 2001 From: nmoray Date: Mon, 23 Jun 2025 04:49:54 +0000 Subject: [PATCH 07/23] Addressed comments --- src/sonic-py-common/sonic_py_common/security_cipher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index 44729b4b1a18..de7db7acd819 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -46,7 +46,7 @@ def _load_registry(self): with open(CIPHER_PASS_FILE, 'r') as f: return json.load(f) except Exception as e: - syslog.syslog(syslog.LOG_ERR, "del_cipher_passwd: Exception occurred: {}".format(e)) + syslog.syslog(syslog.LOG_ERR, "_load_registry: Exception occurred: {}".format(e)) return {} def _save_registry(self, data): @@ -101,6 +101,7 @@ def set_feature_password(self, feature_type, password): data[feature_type] = {"callbacks": [], "password": None} if data[feature_type]["password"] is not None: syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type)) + syslog.syslog(syslog.LOG_INFO, "set_feature_password: Note: Make use of rotate_feature_passwd() method for udpating the existing pass") return data[feature_type]["password"] = password self._save_registry(data) From aaac0c60b168ab12efec1927be11c8b7546c458b Mon Sep 17 00:00:00 2001 From: nmoray Date: Mon, 23 Jun 2025 06:33:21 +0000 Subject: [PATCH 08/23] Updated method description --- src/sonic-py-common/sonic_py_common/security_cipher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index de7db7acd819..f20bb284696e 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -57,6 +57,7 @@ def _save_registry(self, data): def register(self, feature_type, callback): """ Register a callback for a feature type. + Feature types: TACPLUS, RADIUS, LDAP etc. """ data = self._load_registry() if feature_type not in data: From 9e8aab55cd0d0178e8b7deb65a1a25c0bad1a672 Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 24 Jun 2025 11:46:05 +0000 Subject: [PATCH 09/23] Updated rotate method to get info about CONFIG_DB entry in the callbacks --- .../sonic_py_common/security_cipher.py | 5 +- .../tests/test_security_cipher.py | 54 ++++++------------- 2 files changed, 18 insertions(+), 41 deletions(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index f20bb284696e..2771af7e7c04 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -108,7 +108,7 @@ def set_feature_password(self, feature_type, password): self._save_registry(data) syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type)) - def rotate_feature_passwd(self, feature_type, new_password=None): + def rotate_feature_passwd(self, feature_type, table_info, new_password=None): """ On each call, read JSON data fresh from disk. Update password if provided, and call all registered callbacks with the latest password. @@ -123,7 +123,6 @@ def rotate_feature_passwd(self, feature_type, new_password=None): self._save_registry(data) syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} updated during rotation.".format(feature_type)) - password = data[feature_type].get("password") cb_names = data[feature_type].get("callbacks", []) callbacks = [self.callback_lookup[name] for name in cb_names if name in self.callback_lookup] @@ -133,7 +132,7 @@ def rotate_feature_passwd(self, feature_type, new_password=None): syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type)) for cb in callbacks: - cb(password) + cb(table_info) def encrypt_passkey(self, feature_type, secret: str) -> str: """ diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 4fe49e967759..deb6cf32382c 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -9,8 +9,7 @@ else: import mock -from sonic_py_common.security_cipher import master_key_mgr -from .mock_swsscommon import ConfigDBConnector +from security_cipher import master_key_mgr # TODO: Remove this if/else block once we no longer support Python 2 if sys.version_info.major == 3: @@ -30,9 +29,9 @@ "LDAP": {"callbacks": [], "password": None} } -def dummy_cb(password): pass -def cb1(password): pass -def cb2(password): pass +def dummy_cb(table_info): pass +def cb1(table_info): pass +def cb2(table_info): pass class TestSecurityCipher(object): def setup_method(self): @@ -40,8 +39,7 @@ def setup_method(self): master_key_mgr._instance = None def test_set_feature_password_sets_password(self): - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))) as mock_file, \ mock.patch("os.path.exists", return_value=True): @@ -54,8 +52,7 @@ def test_set_feature_password_sets_password(self): def test_set_feature_password_does_not_overwrite_existing(self): json_data = UPDATED_JSON.copy() - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): @@ -70,23 +67,22 @@ def test_rotate_feature_passwd(self): json_data = { "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): temp = master_key_mgr(callback_lookup=lookup) with mock.patch.object(temp, "_save_registry") as mock_save: - temp.rotate_feature_passwd("RADIUS") - cb_mock.assert_called_once_with("radius_secret") + table_info = {"foo": "bar"} + temp.rotate_feature_passwd("RADIUS", table_info) + cb_mock.assert_called_once_with(table_info) def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess json_data = { "RADIUS": {"callbacks": [], "password": "secretpw"} } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): @@ -105,8 +101,7 @@ def test_encrypt_and_decrypt_passkey(self): assert decrypted == "plaintext" def test_encrypt_raises_if_no_password(self): - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ mock.patch("os.path.exists", return_value=True): @@ -114,37 +109,20 @@ def test_encrypt_raises_if_no_password(self): with pytest.raises(ValueError): temp.encrypt_passkey("RADIUS", "plaintext") - def test_is_key_encrypt_enabled(self): - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ - mock.patch("os.path.exists", return_value=True): - - temp = master_key_mgr() - table = "table" - entry = "entry" - # Patch config_db for return value - temp._config_db.get_entry = mock.Mock(return_value={"key_encrypt": True}) - assert temp.is_key_encrypt_enabled(table, entry) is True - temp._config_db.get_entry = mock.Mock(return_value={"something_else": False}) - assert temp.is_key_encrypt_enabled(table, entry) is False - temp._config_db.get_entry = mock.Mock(return_value=None) - assert temp.is_key_encrypt_enabled(table, entry) is False - def test_rotate_feature_passwd_with_new_password(self): cb_mock = mock.Mock() lookup = {"cb1": cb_mock} json_data = { "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ + with mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): temp = master_key_mgr(callback_lookup=lookup) with mock.patch.object(temp, "_save_registry") as mock_save: - temp.rotate_feature_passwd("RADIUS", new_password="radius_secret2") - cb_mock.assert_called_once_with("radius_secret2") + table_info = {"foo": "bar"} + temp.rotate_feature_passwd("RADIUS", table_info, new_password="radius_secret2") + cb_mock.assert_called_once_with(table_info) args = mock_save.call_args[0][0] assert args["RADIUS"]["password"] == "radius_secret2" From 5b7da20febc4c4ae68e37e450d92ba6c1509ae27 Mon Sep 17 00:00:00 2001 From: nmoray Date: Wed, 25 Jun 2025 05:59:22 +0000 Subject: [PATCH 10/23] Fixed build errors --- src/sonic-py-common/tests/test_security_cipher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index deb6cf32382c..b7228098691a 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -9,7 +9,7 @@ else: import mock -from security_cipher import master_key_mgr +from sonic_py_common.security_cipher import master_key_mgr # TODO: Remove this if/else block once we no longer support Python 2 if sys.version_info.major == 3: From 77ae0fb34a64027776587a50213d2f83250b5cdd Mon Sep 17 00:00:00 2001 From: nmoray Date: Wed, 25 Jun 2025 11:37:56 +0000 Subject: [PATCH 11/23] Fixed build issues and added sceret key as an arg to rotate method so that re-encryption is possible in the callbacks --- .../sonic_py_common/security_cipher.py | 10 ++++------ .../tests/test_security_cipher.py | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index 2771af7e7c04..7f5da6ef6975 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -108,7 +108,7 @@ def set_feature_password(self, feature_type, password): self._save_registry(data) syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type)) - def rotate_feature_passwd(self, feature_type, table_info, new_password=None): + def rotate_feature_passwd(self, feature_type, table_info, secret, new_password=None): """ On each call, read JSON data fresh from disk. Update password if provided, and call all registered callbacks with the latest password. @@ -132,7 +132,7 @@ def rotate_feature_passwd(self, feature_type, table_info, new_password=None): syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type)) for cb in callbacks: - cb(table_info) + cb(secret, table_info) def encrypt_passkey(self, feature_type, secret: str) -> str: """ @@ -200,10 +200,8 @@ def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: # Check if the encryption is enabled def is_key_encrypt_enabled(self, table, entry): - key = 'key_encrypt' data = self._config_db.get_entry(table, entry) - if data: - if key in data: - return data[key] + if data and 'key_encrypt' in data: + return data['key_encrypt'].lower() == 'true' return False diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index b7228098691a..0c297824c548 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -10,6 +10,7 @@ import mock from sonic_py_common.security_cipher import master_key_mgr +from .mock_swsscommon import ConfigDBConnector # TODO: Remove this if/else block once we no longer support Python 2 if sys.version_info.major == 3: @@ -39,7 +40,8 @@ def setup_method(self): master_key_mgr._instance = None def test_set_feature_password_sets_password(self): - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))) as mock_file, \ mock.patch("os.path.exists", return_value=True): @@ -52,7 +54,8 @@ def test_set_feature_password_sets_password(self): def test_set_feature_password_does_not_overwrite_existing(self): json_data = UPDATED_JSON.copy() - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): @@ -67,7 +70,8 @@ def test_rotate_feature_passwd(self): json_data = { "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} } - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): @@ -82,7 +86,8 @@ def test_encrypt_and_decrypt_passkey(self): json_data = { "RADIUS": {"callbacks": [], "password": "secretpw"} } - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): @@ -101,7 +106,8 @@ def test_encrypt_and_decrypt_passkey(self): assert decrypted == "plaintext" def test_encrypt_raises_if_no_password(self): - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ mock.patch("os.path.exists", return_value=True): @@ -115,7 +121,8 @@ def test_rotate_feature_passwd_with_new_password(self): json_data = { "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} } - with mock.patch("os.chmod"), \ + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): From b717c0663e57a885966aa590c9fdeb5c1160f4b3 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 26 Jun 2025 04:38:04 +0000 Subject: [PATCH 12/23] Addressed comments and fixed build issues --- .../config-setup/01-post-security-cipher | 2 +- .../sonic_py_common/security_cipher.py | 4 ++-- .../tests/test_security_cipher.py | 16 +++++++++------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/files/image_config/config-setup/01-post-security-cipher b/files/image_config/config-setup/01-post-security-cipher index dbfbb27a1614..9f710f042c11 100755 --- a/files/image_config/config-setup/01-post-security-cipher +++ b/files/image_config/config-setup/01-post-security-cipher @@ -6,6 +6,6 @@ set -e if [ -f /host/security_cipher/cipher_pass ]; then cp /host/security_cipher/cipher_pass.json /etc/cipher_pass.json chmod 640 /etc/cipher_pass.json - echo "Restored /host/security_cipher/cipher_pass.json to /etc/" + echo "Restored /host/security_cipher/cipher_pass.json to /etc/" fi diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index 7f5da6ef6975..a2f7d982af3c 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -102,7 +102,7 @@ def set_feature_password(self, feature_type, password): data[feature_type] = {"callbacks": [], "password": None} if data[feature_type]["password"] is not None: syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type)) - syslog.syslog(syslog.LOG_INFO, "set_feature_password: Note: Make use of rotate_feature_passwd() method for udpating the existing pass") + syslog.syslog(syslog.LOG_INFO, "set_feature_password: Note: Make use of rotate_feature_passwd() method for updating the existing pass") return data[feature_type]["password"] = password self._save_registry(data) @@ -132,7 +132,7 @@ def rotate_feature_passwd(self, feature_type, table_info, secret, new_password=N syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type)) for cb in callbacks: - cb(secret, table_info) + cb(table_info, secret) def encrypt_passkey(self, feature_type, secret: str) -> str: """ diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 0c297824c548..43efd9849b9a 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -30,9 +30,9 @@ "LDAP": {"callbacks": [], "password": None} } -def dummy_cb(table_info): pass -def cb1(table_info): pass -def cb2(table_info): pass +def dummy_cb(table_info, secret): pass +def cb1(table_info, secret): pass +def cb2(table_info, secret): pass class TestSecurityCipher(object): def setup_method(self): @@ -78,8 +78,9 @@ def test_rotate_feature_passwd(self): temp = master_key_mgr(callback_lookup=lookup) with mock.patch.object(temp, "_save_registry") as mock_save: table_info = {"foo": "bar"} - temp.rotate_feature_passwd("RADIUS", table_info) - cb_mock.assert_called_once_with(table_info) + secret = "dummysecret" + temp.rotate_feature_passwd("RADIUS", table_info, secret) + cb_mock.assert_called_once_with(able_info, secret) def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess @@ -129,7 +130,8 @@ def test_rotate_feature_passwd_with_new_password(self): temp = master_key_mgr(callback_lookup=lookup) with mock.patch.object(temp, "_save_registry") as mock_save: table_info = {"foo": "bar"} - temp.rotate_feature_passwd("RADIUS", table_info, new_password="radius_secret2") - cb_mock.assert_called_once_with(table_info) + secret = "dummysecret" + temp.rotate_feature_passwd("RADIUS", table_info, secret, new_password="radius_secret2") + cb_mock.assert_called_once_with(table_info, secret) args = mock_save.call_args[0][0] assert args["RADIUS"]["password"] == "radius_secret2" From df34c80273a2712b94e1dfd6a6d4c86a3ab63d76 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 26 Jun 2025 10:47:10 +0000 Subject: [PATCH 13/23] fixed build issues --- src/sonic-py-common/tests/test_security_cipher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 43efd9849b9a..f50af41a628d 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -80,7 +80,7 @@ def test_rotate_feature_passwd(self): table_info = {"foo": "bar"} secret = "dummysecret" temp.rotate_feature_passwd("RADIUS", table_info, secret) - cb_mock.assert_called_once_with(able_info, secret) + cb_mock.assert_called_once_with(table_info, secret) def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess From 5715b8448c16491abdec8134c0530b0fdebe1a92 Mon Sep 17 00:00:00 2001 From: nmoray Date: Fri, 27 Jun 2025 12:55:20 +0000 Subject: [PATCH 14/23] Updated rotate method and replaced callbacks with tableinfo --- .../sonic_py_common/security_cipher.py | 186 ++++++++++-------- .../tests/test_security_cipher.py | 130 +++++++----- 2 files changed, 184 insertions(+), 132 deletions(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index a2f7d982af3c..ce46db351f92 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -22,21 +22,18 @@ class master_key_mgr: _lock = threading.Lock() _initialized = False - def __new__(cls, callback_lookup=None): + def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super(master_key_mgr, cls).__new__(cls) cls._instance._initialized = False return cls._instance - def __init__(self, callback_lookup=None): + def __init__(self): if not self._initialized: self._file_path = CIPHER_PASS_FILE self._config_db = ConfigDBConnector() self._config_db.connect() - if callback_lookup is None: - callback_lookup = {} - self.callback_lookup = callback_lookup self._initialized = True def _load_registry(self): @@ -54,43 +51,87 @@ def _save_registry(self, data): json.dump(data, f, indent=2) os.chmod(self._file_path, 0o640) - def register(self, feature_type, callback): + def _encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str: """ - Register a callback for a feature type. + Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64) + and returns the result as a hex string. + """ + cmd = [ + "openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2", + "-pass", f"pass:{passwd}" + ] + try: + result = subprocess.run( + cmd, + input=secret.encode(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True + ) + encrypted_bytes = result.stdout + b64_encoded = base64.b64encode(encrypted_bytes).decode() + return b64_encoded + except subprocess.CalledProcessError as e: + syslog.syslog(syslog.LOG_ERR, "_encrypt_passkey: {} Encryption failed with ERR: {}".format(e)) + return "" + + def _decrypt_passkey(self, feature_type, b64_encoded: str, passwd: str) -> str: + """ + Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64). + Returns the decrypted plaintext. + """ + try: + encrypted_bytes = base64.b64decode(b64_encoded) + + cmd = [ + "openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2", + "-pass", f"pass:{passwd}" + ] + result = subprocess.run( + cmd, + input=encrypted_bytes, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True + ) + return result.stdout.decode().strip() + except subprocess.CalledProcessError as e: + syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode())) + return "" + + def register(self, feature_type, table_info): + """ + Register a table_info for a feature type. Feature types: TACPLUS, RADIUS, LDAP etc. """ data = self._load_registry() if feature_type not in data: - data[feature_type] = {"callbacks": [], "password": None} - cb_name = callback.__name__ - if cb_name not in data[feature_type]["callbacks"]: - data[feature_type]["callbacks"].append(cb_name) + data[feature_type] = {"table_info": [], "password": None} + if table_info not in data[feature_type]["table_info"]: + data[feature_type]["table_info"].append(table_info) self._save_registry(data) - syslog.syslog(syslog.LOG_INFO, "register: Callback {} attached to feature {}".format(cb_name, feature_type)) + syslog.syslog(syslog.LOG_INFO, "register: table_info {} attached to {} feature".format(table_info, feature_type)) - def deregister(self, feature_type, callback): + def deregister(self, feature_type, table_info): """ - Deregister (remove) a callback for a feature type. - If, after removal, there are no more callbacks for that feature, - that means there is no one who is going to use the password thus - remove the respective password too. + Deregister (remove) a table_info string (like "TACPLUS|global") for a feature type. + If, after removal, there are no more table_info entries for that feature, + remove the respective password as well. """ data = self._load_registry() if feature_type in data: - cb_name = callback.__name__ - if cb_name in data[feature_type]["callbacks"]: - data[feature_type]["callbacks"].remove(cb_name) - if not data[feature_type]["callbacks"]: - # No more callbacks left; remove password as well + if table_info in data[feature_type]["table_info"]: + data[feature_type]["table_info"].remove(table_info) + if not data[feature_type]["table_info"]: + # No more table_info left; remove password as well data[feature_type]["password"] = None - syslog.syslog(syslog.LOG_INFO, "deregister: No more callbacks for feature {}. Password also removed.".format(feature_type)) + syslog.syslog(syslog.LOG_INFO, "deregister: No more table_info for feature {}. Password also removed.".format(feature_type)) self._save_registry(data) - syslog.syslog(syslog.LOG_INFO, "deregister: Callback {} removed from feature {}".format(cb_name, feature_type)) - + syslog.syslog(syslog.LOG_INFO, "deregister: table_info {} removed from feature {}".format(table_info, feature_type)) else: - syslog.syslog(syslog.LOG_ERR, "deregister: Callback {} not found for feature {}".format(cb_name, feature_type)) + syslog.syslog(syslog.LOG_ERR, "deregister: table_info {} not found for feature {}".format(table_info, feature_type)) else: - syslog.syslog(syslog.LOG_ERR, "deregister: No callbacks registered for {}".format(feature_type)) + syslog.syslog(syslog.LOG_ERR, "deregister: No table_info registered for {}".format(feature_type)) def set_feature_password(self, feature_type, password): """ @@ -99,7 +140,7 @@ def set_feature_password(self, feature_type, password): """ data = self._load_registry() if feature_type not in data: - data[feature_type] = {"callbacks": [], "password": None} + data[feature_type] = {"table_info": [], "password": None} if data[feature_type]["password"] is not None: syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type)) syslog.syslog(syslog.LOG_INFO, "set_feature_password: Note: Make use of rotate_feature_passwd() method for updating the existing pass") @@ -108,36 +149,43 @@ def set_feature_password(self, feature_type, password): self._save_registry(data) syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type)) - def rotate_feature_passwd(self, feature_type, table_info, secret, new_password=None): + def rotate_feature_passwd(self, feature_type, new_password): """ - On each call, read JSON data fresh from disk. Update password if provided, - and call all registered callbacks with the latest password. + For each registered table_info, extract encrypted passkey, decrypt, re-encrypt with new password, and update. """ data = self._load_registry() if feature_type not in data: - syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type)) + syslog.syslog(syslog.LOG_ERR, "No table_info registered for {} Feature".format(feature_type)) return - if new_password is not None: - data[feature_type]["password"] = new_password - self._save_registry(data) - syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} updated during rotation.".format(feature_type)) - - cb_names = data[feature_type].get("callbacks", []) - callbacks = [self.callback_lookup[name] for name in cb_names if name in self.callback_lookup] - - if not callbacks: - syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type)) - return + old_password = data[feature_type]["password"] + table_infos = data[feature_type].get("table_info", []) + for table_info in table_infos: + table, entry = table_info.split("|") + db_entry = self._config_db.get_entry(table, entry) + encrypted_passkey = db_entry.get("passkey") + if encrypted_passkey: + # Decrypt with old password + plain_passkey = self._decrypt_passkey(feature_type, encrypted_passkey, old_password) + # Re-encrypt with new password + new_encrypted_passkey = self._encrypt_passkey(feature_type, plain_passkey, new_password) + # Update DB + db_entry["passkey"] = new_encrypted_passkey + # Make sure key_encrypt should be set true + db_entry["key_encrypt"] = 'True' + self._config_db.set_entry(table, entry, db_entry) + syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Updated passkey for {}".format(table_info)) + else: + syslog.syslog(syslog.LOG_WARNING, "No passkey found in DB for {}".format(table_info)) - syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type)) - for cb in callbacks: - cb(table_info, secret) + # Update stored password + data[feature_type]["password"] = new_password + self._save_registry(data) + syslog.syslog(syslog.LOG_INFO, f"rotate_feature_passwd: Password for {feature_type} updated.") def encrypt_passkey(self, feature_type, secret: str) -> str: """ - Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64) - and returns the result as a hex string. + Encrypts the plaintext and returns the result as a hex string. """ # Retrieve password from cipher_pass registry data = self._load_registry() @@ -145,26 +193,9 @@ def encrypt_passkey(self, feature_type, secret: str) -> str: if feature_type in data: passwd = data[feature_type].get("password") if not passwd: - raise ValueError(f"No password set for feature {feature_type}") + raise ValueError(f"encrypt_passkey: No password set for feature {feature_type}") - cmd = [ - "openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2", - "-pass", f"pass:{passwd}" - ] - try: - result = subprocess.run( - cmd, - input=secret.encode(), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True - ) - encrypted_bytes = result.stdout - b64_encoded = base64.b64encode(encrypted_bytes).decode() - return b64_encoded - except subprocess.CalledProcessError as e: - syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e))) - return "" + return self._encrypt_passkey(feature_type, secret, passwd) def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: """ @@ -177,26 +208,9 @@ def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: if feature_type in data: passwd = data[feature_type].get("password") if not passwd: - raise ValueError(f"No password set for feature {feature_type}") - - try: - encrypted_bytes = base64.b64decode(b64_encoded) + raise ValueError(f"decrypt_passkey: No password set for feature {feature_type}") - cmd = [ - "openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2", - "-pass", f"pass:{passwd}" - ] - result = subprocess.run( - cmd, - input=encrypted_bytes, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True - ) - return result.stdout.decode().strip() - except subprocess.CalledProcessError as e: - syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode())) - return "" + return self._decrypt_passkey(feature_type, b64_encoded, passwd) # Check if the encryption is enabled def is_key_encrypt_enabled(self, table, entry): diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index f50af41a628d..a4a209bfe29d 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -1,7 +1,6 @@ import sys import json import base64 - import pytest if sys.version_info.major == 3: @@ -19,21 +18,17 @@ BUILTINS = "__builtin__" DEFAULT_JSON = { - "TACPLUS": {"callbacks": [], "password": None}, - "RADIUS": {"callbacks": [], "password": None}, - "LDAP": {"callbacks": [], "password": None} + "TACPLUS": {"table_info": [], "password": None}, + "RADIUS": {"table_info": [], "password": None}, + "LDAP": {"table_info": [], "password": None} } UPDATED_JSON = { - "TACPLUS": {"callbacks": [], "password": None}, - "RADIUS": {"callbacks": [], "password": "TEST2"}, - "LDAP": {"callbacks": [], "password": None} + "TACPLUS": {"table_info": [], "password": None}, + "RADIUS": {"table_info": [], "password": "TEST2"}, + "LDAP": {"table_info": [], "password": None} } -def dummy_cb(table_info, secret): pass -def cb1(table_info, secret): pass -def cb2(table_info, secret): pass - class TestSecurityCipher(object): def setup_method(self): # Reset singleton for isolation @@ -42,9 +37,8 @@ def setup_method(self): def test_set_feature_password_sets_password(self): with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))) as mock_file, \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() # Patch _save_registry to check written value with mock.patch.object(temp, "_save_registry") as mock_save: @@ -58,40 +52,76 @@ def test_set_feature_password_does_not_overwrite_existing(self): mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() with mock.patch.object(temp, "_save_registry") as mock_save: temp.set_feature_password("RADIUS", "should_not_overwrite") mock_save.assert_not_called() - def test_rotate_feature_passwd(self): - cb_mock = mock.Mock() - lookup = {"cb1": cb_mock} - json_data = { - "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} + def test_register_table_info(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.register("RADIUS", "RADIUS|global") + args = mock_save.call_args[0][0] + assert "RADIUS|global" in args["RADIUS"]["table_info"] + + def test_deregister_table_info(self): + test_json = { + "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} } with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.deregister("RADIUS", "RADIUS|global") + args = mock_save.call_args[0][0] + assert "RADIUS|global" not in args["RADIUS"]["table_info"] + assert args["RADIUS"]["password"] == "radius_secret" + # Remove last and check password removed + temp.deregister("RADIUS", "RADIUS|backup") + args = mock_save.call_args[0][0] + assert args["RADIUS"]["table_info"] == [] + assert args["RADIUS"]["password"] is None - temp = master_key_mgr(callback_lookup=lookup) + def test_rotate_feature_passwd(self): + # Simulate DB entries and encryption/decryption + test_json = { + "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} + } + db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) + temp._config_db.set_entry = mock.Mock() + temp._decrypt_passkey = mock.Mock(return_value="plaintext") + temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") with mock.patch.object(temp, "_save_registry") as mock_save: - table_info = {"foo": "bar"} - secret = "dummysecret" - temp.rotate_feature_passwd("RADIUS", table_info, secret) - cb_mock.assert_called_once_with(table_info, secret) + temp.rotate_feature_passwd("RADIUS", "newpw") + temp._config_db.set_entry.assert_called_once_with( + "RADIUS", "global", + {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} + ) + # Ensure registry gets updated + args = mock_save.call_args[0][0] + assert args["RADIUS"]["password"] == "newpw" def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess json_data = { - "RADIUS": {"callbacks": [], "password": "secretpw"} + "RADIUS": {"table_info": [], "password": "secretpw"} } with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() # Mock subprocess for encryption fake_cipher = b"\x01\x02\x03" @@ -111,27 +141,35 @@ def test_encrypt_raises_if_no_password(self): mock.patch("os.chmod"), \ mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(DEFAULT_JSON))), \ mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() with pytest.raises(ValueError): temp.encrypt_passkey("RADIUS", "plaintext") - def test_rotate_feature_passwd_with_new_password(self): - cb_mock = mock.Mock() - lookup = {"cb1": cb_mock} - json_data = { - "RADIUS": {"callbacks": ["cb1"], "password": "radius_secret"} - } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(json_data))), \ - mock.patch("os.path.exists", return_value=True): + def test_is_key_encrypt_enabled(self): + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector): + temp = master_key_mgr() + temp._config_db.get_entry = mock.Mock() - temp = master_key_mgr(callback_lookup=lookup) - with mock.patch.object(temp, "_save_registry") as mock_save: - table_info = {"foo": "bar"} - secret = "dummysecret" - temp.rotate_feature_passwd("RADIUS", table_info, secret, new_password="radius_secret2") - cb_mock.assert_called_once_with(table_info, secret) - args = mock_save.call_args[0][0] - assert args["RADIUS"]["password"] == "radius_secret2" + # Test when key_encrypt is 'True' + temp._config_db.get_entry.return_value = {"key_encrypt": "True"} + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is True + + # Test when key_encrypt is 'true' + temp._config_db.get_entry.return_value = {"key_encrypt": "true"} + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is True + + # Test when key_encrypt is 'False' + temp._config_db.get_entry.return_value = {"key_encrypt": "False"} + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is False + + # Test when key_encrypt is missing + temp._config_db.get_entry.return_value = {"foo": "bar"} + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is False + + # Test when entry is empty + temp._config_db.get_entry.return_value = {} + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is False + + # Test when entry is None + temp._config_db.get_entry.return_value = None + assert temp.is_key_encrypt_enabled("TACPLUS", "global") is False From 562896249c7e73c3f86facfe72fc190d2141e0ba Mon Sep 17 00:00:00 2001 From: nmoray Date: Mon, 30 Jun 2025 10:53:04 +0000 Subject: [PATCH 15/23] Addressed comments and fixed build issues --- .../sonic_py_common/security_cipher.py | 9 ++--- .../tests/test_security_cipher.py | 40 ++++++++++--------- .../yang-models/sonic-system-tacacs.yang | 14 ++++--- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index ce46db351f92..d6e29ac5e43e 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -164,24 +164,23 @@ def rotate_feature_passwd(self, feature_type, new_password): table, entry = table_info.split("|") db_entry = self._config_db.get_entry(table, entry) encrypted_passkey = db_entry.get("passkey") - if encrypted_passkey: + #Rotate only if valid passkey is present and 'key_encyrpt' flag is True + if encrypted_passkey and db_entry.get("key_encrypt") == 'True': # Decrypt with old password plain_passkey = self._decrypt_passkey(feature_type, encrypted_passkey, old_password) # Re-encrypt with new password new_encrypted_passkey = self._encrypt_passkey(feature_type, plain_passkey, new_password) # Update DB db_entry["passkey"] = new_encrypted_passkey - # Make sure key_encrypt should be set true - db_entry["key_encrypt"] = 'True' self._config_db.set_entry(table, entry, db_entry) syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Updated passkey for {}".format(table_info)) else: - syslog.syslog(syslog.LOG_WARNING, "No passkey found in DB for {}".format(table_info)) + syslog.syslog(syslog.LOG_WARNING, "Either no passkey found or key_encrypt flag is not set to True for {}".format(table_info)) # Update stored password data[feature_type]["password"] = new_password self._save_registry(data) - syslog.syslog(syslog.LOG_INFO, f"rotate_feature_passwd: Password for {feature_type} updated.") + syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} Feature has been updated.".format(feature_type)) def encrypt_passkey(self, feature_type, secret: str) -> str: """ diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index a4a209bfe29d..7957e485c79c 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -68,25 +68,27 @@ def test_register_table_info(self): args = mock_save.call_args[0][0] assert "RADIUS|global" in args["RADIUS"]["table_info"] - def test_deregister_table_info(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} - } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() - with mock.patch.object(temp, "_save_registry") as mock_save: - temp.deregister("RADIUS", "RADIUS|global") - args = mock_save.call_args[0][0] - assert "RADIUS|global" not in args["RADIUS"]["table_info"] - assert args["RADIUS"]["password"] == "radius_secret" - # Remove last and check password removed - temp.deregister("RADIUS", "RADIUS|backup") - args = mock_save.call_args[0][0] - assert args["RADIUS"]["table_info"] == [] - assert args["RADIUS"]["password"] is None +def test_deregister_table_info(self): + test_json = { + "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.deregister("RADIUS", "RADIUS|global") + temp.deregister("RADIUS", "RADIUS|backup") + # First call: after removing "RADIUS|global" + args_first = mock_save.call_args_list[0][0][0] + assert "RADIUS|global" not in args_first["RADIUS"]["table_info"] + assert "RADIUS|backup" in args_first["RADIUS"]["table_info"] + assert args_first["RADIUS"]["password"] == "radius_secret" + # Second call: after removing "RADIUS|backup" + args_second = mock_save.call_args_list[1][0][0] + assert args_second["RADIUS"]["table_info"] == [] + assert args_second["RADIUS"]["password"] is None def test_rotate_feature_passwd(self): # Simulate DB entries and encryption/decryption diff --git a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang index 723f29868dd8..43b79129d272 100644 --- a/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang +++ b/src/sonic-yang-models/yang-models/sonic-system-tacacs.yang @@ -46,6 +46,12 @@ module sonic-system-tacacs { } } + typedef key_encrypt_type { + type boolean; + default false; + description "Indicates if the passkey is encrypted."; + } + container sonic-system-tacacs { container TACPLUS_SERVER { @@ -92,9 +98,7 @@ module sonic-system-tacacs { } leaf key_encrypt { - type boolean; - default false; - description "Indicates if the passkey is encrypted."; + type key_encrypt_type; } leaf passkey { @@ -138,9 +142,7 @@ module sonic-system-tacacs { } leaf key_encrypt { - type boolean; - default false; - description "Indicates if the passkey is encrypted."; + type key_encrypt_type; } leaf passkey { From ec48f2d98f256f9d2cb7cb5168025ebb17d5502c Mon Sep 17 00:00:00 2001 From: nmoray Date: Mon, 30 Jun 2025 13:08:31 +0000 Subject: [PATCH 16/23] Fixed build issues --- .../tests/test_security_cipher.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 7957e485c79c..0c521798538a 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -68,27 +68,27 @@ def test_register_table_info(self): args = mock_save.call_args[0][0] assert "RADIUS|global" in args["RADIUS"]["table_info"] -def test_deregister_table_info(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} - } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() - with mock.patch.object(temp, "_save_registry") as mock_save: - temp.deregister("RADIUS", "RADIUS|global") - temp.deregister("RADIUS", "RADIUS|backup") - # First call: after removing "RADIUS|global" - args_first = mock_save.call_args_list[0][0][0] - assert "RADIUS|global" not in args_first["RADIUS"]["table_info"] - assert "RADIUS|backup" in args_first["RADIUS"]["table_info"] - assert args_first["RADIUS"]["password"] == "radius_secret" - # Second call: after removing "RADIUS|backup" - args_second = mock_save.call_args_list[1][0][0] - assert args_second["RADIUS"]["table_info"] == [] - assert args_second["RADIUS"]["password"] is None + def test_deregister_table_info(self): + test_json = { + "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ + mock.patch("os.path.exists", return_value=True): + temp = master_key_mgr() + with mock.patch.object(temp, "_save_registry") as mock_save: + temp.deregister("RADIUS", "RADIUS|global") + temp.deregister("RADIUS", "RADIUS|backup") + # First call: after removing "RADIUS|global" + args_first = mock_save.call_args_list[0][0][0] + assert "RADIUS|global" not in args_first["RADIUS"]["table_info"] + assert "RADIUS|backup" in args_first["RADIUS"]["table_info"] + assert args_first["RADIUS"]["password"] == "radius_secret" + # Second call: after removing "RADIUS|backup" + args_second = mock_save.call_args_list[1][0][0] + assert args_second["RADIUS"]["table_info"] == [] + assert args_second["RADIUS"]["password"] is None def test_rotate_feature_passwd(self): # Simulate DB entries and encryption/decryption From 6a3c61c5fb9113f33319d9b826100beb33304de9 Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 1 Jul 2025 05:23:58 +0000 Subject: [PATCH 17/23] Fixed build issues --- .../tests/test_security_cipher.py | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 0c521798538a..3bf269cbfd5d 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -69,29 +69,33 @@ def test_register_table_info(self): assert "RADIUS|global" in args["RADIUS"]["table_info"] def test_deregister_table_info(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} - } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() - with mock.patch.object(temp, "_save_registry") as mock_save: - temp.deregister("RADIUS", "RADIUS|global") - temp.deregister("RADIUS", "RADIUS|backup") - # First call: after removing "RADIUS|global" - args_first = mock_save.call_args_list[0][0][0] - assert "RADIUS|global" not in args_first["RADIUS"]["table_info"] - assert "RADIUS|backup" in args_first["RADIUS"]["table_info"] - assert args_first["RADIUS"]["password"] == "radius_secret" - # Second call: after removing "RADIUS|backup" - args_second = mock_save.call_args_list[1][0][0] - assert args_second["RADIUS"]["table_info"] == [] - assert args_second["RADIUS"]["password"] is None + # Use an in-memory registry that can be mutated + registry = { + "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open()), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + # Patch _load_registry to always return current registry + temp._load_registry = mock.Mock(side_effect=lambda: registry.copy()) + # Patch _save_registry to update our in-memory registry + def save_registry(data): + registry.clear() + registry.update(json.loads(json.dumps(data))) # Deep copy + temp._save_registry = mock.Mock(side_effect=save_registry) + + temp.deregister("RADIUS", "RADIUS|global") + assert registry["RADIUS"]["table_info"] == ["RADIUS|backup"] + assert registry["RADIUS"]["password"] == "radius_secret" + + temp.deregister("RADIUS", "RADIUS|backup") + assert registry["RADIUS"]["table_info"] == [] + assert registry["RADIUS"]["password"] is None def test_rotate_feature_passwd(self): - # Simulate DB entries and encryption/decryption test_json = { "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} } @@ -101,19 +105,18 @@ def test_rotate_feature_passwd(self): mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ mock.patch("os.path.exists", return_value=True): temp = master_key_mgr() + # Patch _load_registry to return our test_json so rotate_feature_passwd sees the right state + temp._load_registry = mock.Mock(return_value=test_json) temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) temp._config_db.set_entry = mock.Mock() temp._decrypt_passkey = mock.Mock(return_value="plaintext") temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") - with mock.patch.object(temp, "_save_registry") as mock_save: + with mock.patch.object(temp, "_save_registry"): temp.rotate_feature_passwd("RADIUS", "newpw") temp._config_db.set_entry.assert_called_once_with( "RADIUS", "global", {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} ) - # Ensure registry gets updated - args = mock_save.call_args[0][0] - assert args["RADIUS"]["password"] == "newpw" def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess From cbcd7b4d391032f43eebfee97249c73218625a85 Mon Sep 17 00:00:00 2001 From: nmoray Date: Tue, 1 Jul 2025 13:10:18 +0000 Subject: [PATCH 18/23] fixed build issues --- .../tests/test_security_cipher.py | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 3bf269cbfd5d..3edd230d2a0a 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -69,31 +69,31 @@ def test_register_table_info(self): assert "RADIUS|global" in args["RADIUS"]["table_info"] def test_deregister_table_info(self): - # Use an in-memory registry that can be mutated - registry = { - "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} - } - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open()), \ - mock.patch("os.path.exists", return_value=True): - - temp = master_key_mgr() - # Patch _load_registry to always return current registry - temp._load_registry = mock.Mock(side_effect=lambda: registry.copy()) - # Patch _save_registry to update our in-memory registry - def save_registry(data): - registry.clear() - registry.update(json.loads(json.dumps(data))) # Deep copy - temp._save_registry = mock.Mock(side_effect=save_registry) - - temp.deregister("RADIUS", "RADIUS|global") - assert registry["RADIUS"]["table_info"] == ["RADIUS|backup"] - assert registry["RADIUS"]["password"] == "radius_secret" - - temp.deregister("RADIUS", "RADIUS|backup") - assert registry["RADIUS"]["table_info"] == [] - assert registry["RADIUS"]["password"] is None + # Use an in-memory registry that can be mutated + registry = { + "RADIUS": {"table_info": ["RADIUS|global", "RADIUS|backup"], "password": "radius_secret"} + } + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open()), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + # Patch _load_registry to always return current registry + temp._load_registry = mock.Mock(side_effect=lambda: registry.copy()) + # Patch _save_registry to update our in-memory registry + def save_registry(data): + registry.clear() + registry.update(json.loads(json.dumps(data))) # Deep copy + temp._save_registry = mock.Mock(side_effect=save_registry) + + temp.deregister("RADIUS", "RADIUS|global") + assert registry["RADIUS"]["table_info"] == ["RADIUS|backup"] + assert registry["RADIUS"]["password"] == "radius_secret" + + temp.deregister("RADIUS", "RADIUS|backup") + assert registry["RADIUS"]["table_info"] == [] + assert registry["RADIUS"]["password"] is None def test_rotate_feature_passwd(self): test_json = { From 986eec06429d10d3dd81b784725b73cded7849d1 Mon Sep 17 00:00:00 2001 From: nmoray Date: Wed, 2 Jul 2025 11:48:30 +0000 Subject: [PATCH 19/23] Fixed build issues --- .../tests/test_security_cipher.py | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 3edd230d2a0a..02d57b15f4ff 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -95,28 +95,31 @@ def save_registry(data): assert registry["RADIUS"]["table_info"] == [] assert registry["RADIUS"]["password"] is None + def test_rotate_feature_passwd(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} - } - db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - temp = master_key_mgr() - # Patch _load_registry to return our test_json so rotate_feature_passwd sees the right state - temp._load_registry = mock.Mock(return_value=test_json) - temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) - temp._config_db.set_entry = mock.Mock() - temp._decrypt_passkey = mock.Mock(return_value="plaintext") - temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") - with mock.patch.object(temp, "_save_registry"): - temp.rotate_feature_passwd("RADIUS", "newpw") - temp._config_db.set_entry.assert_called_once_with( - "RADIUS", "global", - {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} - ) + test_json = { + "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} + } + db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + # Patch _load_registry to return a fresh copy of test_json every time + temp._load_registry = mock.Mock(side_effect=lambda: json.loads(json.dumps(test_json))) + temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) + temp._config_db.set_entry = mock.Mock() + temp._decrypt_passkey = mock.Mock(return_value="plaintext") + temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") + with mock.patch.object(temp, "_save_registry"): + temp.rotate_feature_passwd("RADIUS", "newpw") + temp._config_db.set_entry.assert_called_once_with( + "RADIUS", "global", + {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} + ) + def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess From a6f641a1f234203f1b92aa0e50821f30063a607a Mon Sep 17 00:00:00 2001 From: nmoray Date: Wed, 2 Jul 2025 13:17:37 +0000 Subject: [PATCH 20/23] fixed build issues --- .../tests/test_security_cipher.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index 02d57b15f4ff..b2d28167ec0b 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -97,28 +97,28 @@ def save_registry(data): def test_rotate_feature_passwd(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} - } - db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - - temp = master_key_mgr() - # Patch _load_registry to return a fresh copy of test_json every time - temp._load_registry = mock.Mock(side_effect=lambda: json.loads(json.dumps(test_json))) - temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) - temp._config_db.set_entry = mock.Mock() - temp._decrypt_passkey = mock.Mock(return_value="plaintext") - temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") - with mock.patch.object(temp, "_save_registry"): - temp.rotate_feature_passwd("RADIUS", "newpw") - temp._config_db.set_entry.assert_called_once_with( - "RADIUS", "global", - {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} - ) + test_json = { + "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} + } + db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} + with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ + mock.patch("os.chmod"), \ + mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ + mock.patch("os.path.exists", return_value=True): + + temp = master_key_mgr() + # Patch _load_registry to return a fresh copy of test_json every time + temp._load_registry = mock.Mock(side_effect=lambda: json.loads(json.dumps(test_json))) + temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) + temp._config_db.set_entry = mock.Mock() + temp._decrypt_passkey = mock.Mock(return_value="plaintext") + temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") + with mock.patch.object(temp, "_save_registry"): + temp.rotate_feature_passwd("RADIUS", "newpw") + temp._config_db.set_entry.assert_called_once_with( + "RADIUS", "global", + {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} + ) def test_encrypt_and_decrypt_passkey(self): From dea228da54724bea69a16a7a9aa4f04a5e53b1d2 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 3 Jul 2025 05:06:21 +0000 Subject: [PATCH 21/23] Fixed build issues --- .../tests/test_security_cipher.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/src/sonic-py-common/tests/test_security_cipher.py b/src/sonic-py-common/tests/test_security_cipher.py index b2d28167ec0b..f43e22150ea5 100644 --- a/src/sonic-py-common/tests/test_security_cipher.py +++ b/src/sonic-py-common/tests/test_security_cipher.py @@ -95,32 +95,6 @@ def save_registry(data): assert registry["RADIUS"]["table_info"] == [] assert registry["RADIUS"]["password"] is None - - def test_rotate_feature_passwd(self): - test_json = { - "RADIUS": {"table_info": ["RADIUS|global"], "password": "oldpw"} - } - db_entry = {"passkey": "ENCRYPTED", "some_other_field": "keepme"} - with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ - mock.patch("os.chmod"), \ - mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=json.dumps(test_json))), \ - mock.patch("os.path.exists", return_value=True): - - temp = master_key_mgr() - # Patch _load_registry to return a fresh copy of test_json every time - temp._load_registry = mock.Mock(side_effect=lambda: json.loads(json.dumps(test_json))) - temp._config_db.get_entry = mock.Mock(return_value=db_entry.copy()) - temp._config_db.set_entry = mock.Mock() - temp._decrypt_passkey = mock.Mock(return_value="plaintext") - temp._encrypt_passkey = mock.Mock(return_value="NEW_ENCRYPTED") - with mock.patch.object(temp, "_save_registry"): - temp.rotate_feature_passwd("RADIUS", "newpw") - temp._config_db.set_entry.assert_called_once_with( - "RADIUS", "global", - {'passkey': "NEW_ENCRYPTED", "some_other_field": "keepme", "key_encrypt": 'True'} - ) - - def test_encrypt_and_decrypt_passkey(self): # Use a known password and mock openssl subprocess json_data = { From f746e092837cb57cba514aab7c353ffcf644e4b9 Mon Sep 17 00:00:00 2001 From: nmoray Date: Thu, 3 Jul 2025 12:09:34 +0000 Subject: [PATCH 22/23] Addressed comments --- src/sonic-py-common/sonic_py_common/security_cipher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index d6e29ac5e43e..526e17b66dcd 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -165,7 +165,7 @@ def rotate_feature_passwd(self, feature_type, new_password): db_entry = self._config_db.get_entry(table, entry) encrypted_passkey = db_entry.get("passkey") #Rotate only if valid passkey is present and 'key_encyrpt' flag is True - if encrypted_passkey and db_entry.get("key_encrypt") == 'True': + if encrypted_passkey and str(db_entry.get("key_encrypt")).lower() == 'true': # Decrypt with old password plain_passkey = self._decrypt_passkey(feature_type, encrypted_passkey, old_password) # Re-encrypt with new password From 06b4b7a61868c96d92b007812e41ee8604412880 Mon Sep 17 00:00:00 2001 From: nmoray Date: Mon, 7 Jul 2025 09:26:58 +0000 Subject: [PATCH 23/23] Triggered another build --- src/sonic-py-common/sonic_py_common/security_cipher.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/sonic-py-common/sonic_py_common/security_cipher.py b/src/sonic-py-common/sonic_py_common/security_cipher.py index 526e17b66dcd..5c67843ec0d3 100644 --- a/src/sonic-py-common/sonic_py_common/security_cipher.py +++ b/src/sonic-py-common/sonic_py_common/security_cipher.py @@ -37,6 +37,9 @@ def __init__(self): self._initialized = True def _load_registry(self): + """ + Read cipher_pass.json file + """ if not os.path.exists(CIPHER_PASS_FILE): return {} try: @@ -47,6 +50,9 @@ def _load_registry(self): return {} def _save_registry(self, data): + """ + Write cipher_pass.json file + """ with open(CIPHER_PASS_FILE, 'w') as f: json.dump(data, f, indent=2) os.chmod(self._file_path, 0o640)