Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
67df31f
Added support of backup and restore for cipher_pass file
nmoray May 27, 2025
1c23a3d
Made default value as false for key_encrypt flag and removed uneven tabs
nmoray May 27, 2025
12ef9e5
Updated jinja template to copy pre/post migration scripts
nmoray May 27, 2025
1469610
Added sudo while executing chmod
nmoray May 27, 2025
9010405
Updated the existing design to support password rotate feature in Sec…
nmoray Jun 19, 2025
eb1709f
Added key_encrypt option under individual TACPLUS server configs
nmoray Jun 19, 2025
6e1b08b
Addressed comments
nmoray Jun 23, 2025
aaac0c6
Updated method description
nmoray Jun 23, 2025
9e8aab5
Updated rotate method to get info about CONFIG_DB entry in the callbacks
nmoray Jun 24, 2025
5b7da20
Fixed build errors
nmoray Jun 25, 2025
77ae0fb
Fixed build issues and added sceret key as an arg to rotate method so…
nmoray Jun 25, 2025
b717c06
Addressed comments and fixed build issues
nmoray Jun 26, 2025
df34c80
fixed build issues
nmoray Jun 26, 2025
5715b84
Updated rotate method and replaced callbacks with tableinfo
nmoray Jun 27, 2025
5628962
Addressed comments and fixed build issues
nmoray Jun 30, 2025
ec48f2d
Fixed build issues
nmoray Jun 30, 2025
6a3c61c
Fixed build issues
nmoray Jul 1, 2025
cbcd7b4
fixed build issues
nmoray Jul 1, 2025
986eec0
Fixed build issues
nmoray Jul 2, 2025
a6f641a
fixed build issues
nmoray Jul 2, 2025
dea228d
Fixed build issues
nmoray Jul 3, 2025
f746e09
Addressed comments
nmoray Jul 3, 2025
06b4b7a
Triggered another build
nmoray Jul 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions files/build_templates/sonic_debian_extension.j2
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,12 @@ j2 files/build_templates/config-setup.service.j2 | sudo tee $FILESYSTEM_ROOT_USR
sudo cp $IMAGE_CONFIGS/config-setup/config-setup $FILESYSTEM_ROOT/usr/bin/config-setup
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup
sudo cp $IMAGE_CONFIGS/config-setup/config-setup.conf $FILESYSTEM_ROOT/etc/config-setup/config-setup.conf
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d
sudo cp $IMAGE_CONFIGS/config-setup/01-pre-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher
sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d
sudo cp $IMAGE_CONFIGS/config-setup/01-post-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher
sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher
echo "config-setup.service" | sudo tee -a $GENERATED_SERVICE_FILE
sudo LANG=C chroot $FILESYSTEM_ROOT systemctl enable config-setup.service

Expand Down
11 changes: 11 additions & 0 deletions files/image_config/config-setup/01-post-security-cipher
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

set -e

# Restore cipher_pass file from persistent storage
if [ -f /host/security_cipher/cipher_pass ]; then
cp /host/security_cipher/cipher_pass.json /etc/cipher_pass.json
chmod 640 /etc/cipher_pass.json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

640

Is it better to use 600 if the only reader is root?

echo "Restored /host/security_cipher/cipher_pass.json to /etc/"
fi

13 changes: 13 additions & 0 deletions files/image_config/config-setup/01-pre-security-cipher
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

set -e

# Ensure old_config directory exists
mkdir -p /host/security_cipher

# Copy cipher_pass file to persistent storage
if [ -f /etc/cipher_pass ]; then
cp /etc/cipher_pass.json /host/security_cipher/cipher_pass.json
echo "Saved /etc/cipher_pass.json to /host/security_cipher/"
fi

216 changes: 120 additions & 96 deletions src/sonic-py-common/sonic_py_common/security_cipher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A common module for handling the encryption and
decryption of the feature passkey. It also takes
care of storing the secure cipher at root
care of storing the secure cipher at root
protected file system

'''
Expand All @@ -12,89 +12,140 @@
import syslog
import os
import base64
import json
from swsscommon.swsscommon import ConfigDBConnector

CIPHER_PASS_FILE = "/etc/cipher_pass.json"

class master_key_mgr:
_instance = None
_lock = threading.Lock()
_initialized = False

def __new__(cls):
def __new__(cls, callback_lookup=None):
with cls._lock:
if cls._instance is None:
cls._instance = super(master_key_mgr, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self):
def __init__(self, callback_lookup=None):
if not self._initialized:
self._file_path = "/etc/cipher_pass"
self._file_path = CIPHER_PASS_FILE
self._config_db = ConfigDBConnector()
self._config_db.connect()
# Note: Kept 1st index NA intentionally to map it with the cipher_pass file
# contents. The file has a comment at the 1st row / line
self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"]
if not os.path.exists(self._file_path):
with open(self._file_path, 'w') as file:
file.writelines("#Auto generated file for storing the encryption passwords\n")
for feature in self._feature_list[1:]: # Skip the first "NA" entry
file.write(f"{feature} : \n")
os.chmod(self._file_path, 0o640)
if callback_lookup is None:
callback_lookup = {}
self.callback_lookup = callback_lookup
self._initialized = True

# Write cipher_pass file
def __write_passwd_file(self, feature_type, passwd):
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type))
def _load_registry(self):
if not os.path.exists(CIPHER_PASS_FILE):
return {}
try:
with open(CIPHER_PASS_FILE, 'r') as f:
return json.load(f)
except Exception as e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception

Could you use more specific exception type?

syslog.syslog(syslog.LOG_ERR, "del_cipher_passwd: Exception occurred: {}".format(e))
return {}

def _save_registry(self, data):
with open(CIPHER_PASS_FILE, 'w') as f:
json.dump(data, f, indent=2)
os.chmod(self._file_path, 0o640)

def register(self, feature_type, callback):
"""
Register a callback for a feature type.
"""
data = self._load_registry()
if feature_type not in data:
data[feature_type] = {"callbacks": [], "password": None}
cb_name = callback.__name__
if cb_name not in data[feature_type]["callbacks"]:
data[feature_type]["callbacks"].append(cb_name)
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "register: Callback {} attached to feature {}".format(cb_name, feature_type))

def deregister(self, feature_type, callback):
"""
Deregister (remove) a callback for a feature type.
If, after removal, there are no more callbacks for that feature,
that means there is no one who is going to use the password thus
remove the respective password too.
"""
data = self._load_registry()
if feature_type in data:
cb_name = callback.__name__
if cb_name in data[feature_type]["callbacks"]:
data[feature_type]["callbacks"].remove(cb_name)
if not data[feature_type]["callbacks"]:
# No more callbacks left; remove password as well
data[feature_type]["password"] = None
syslog.syslog(syslog.LOG_INFO, "deregister: No more callbacks for feature {}. Password also removed.".format(feature_type))
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "deregister: Callback {} removed from feature {}".format(cb_name, feature_type))

else:
syslog.syslog(syslog.LOG_ERR, "deregister: Callback {} not found for feature {}".format(cb_name, feature_type))
else:
syslog.syslog(syslog.LOG_ERR, "deregister: No callbacks registered for {}".format(feature_type))

def set_feature_password(self, feature_type, password):
"""
Set a new password for a feature type.
It will not update if already exist.
"""
data = self._load_registry()
if feature_type not in data:
data[feature_type] = {"callbacks": [], "password": None}
if data[feature_type]["password"] is not None:
syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type))
return
data[feature_type]["password"] = password
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type))

if feature_type in self._feature_list:
try:
with open(self._file_path, 'r') as file:
lines = file.readlines()
# Update the password for given feature
lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n'

os.chmod(self._file_path, 0o640)
with open(self._file_path, 'w') as file:
file.writelines(lines)
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path))


# Read cipher pass file and return the feature specifc
# password
def __read_passwd_file(self, feature_type):
passwd = None
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type))
return passwd

if feature_type in self._feature_list:
try:
os.chmod(self._file_path, 0o644)
with open(self._file_path, "r") as file:
lines = file.readlines()
for line in lines:
if feature_type in line:
passwd = line.split(' : ')[1]
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path))

return passwd


def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
def rotate_feature_passwd(self, feature_type, new_password=None):
"""
On each call, read JSON data fresh from disk. Update password if provided,
and call all registered callbacks with the latest password.
"""
data = self._load_registry()
if feature_type not in data:
syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type))
return

if new_password is not None:
data[feature_type]["password"] = new_password
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} updated during rotation.".format(feature_type))

password = data[feature_type].get("password")
cb_names = data[feature_type].get("callbacks", [])
callbacks = [self.callback_lookup[name] for name in cb_names if name in self.callback_lookup]

if not callbacks:
syslog.syslog(syslog.LOG_ERR, "rotate_feature_passwd: No callbacks registered for {}".format(feature_type))
return

syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Rotating password for feature {} and notifying callbacks...".format(feature_type))
for cb in callbacks:
cb(password)

def encrypt_passkey(self, feature_type, secret: str) -> str:
"""
Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64)
and returns the result as a hex string.
"""
# Retrieve password from cipher_pass registry
data = self._load_registry()
passwd = None
if feature_type in data:
passwd = data[feature_type].get("password")
if not passwd:
raise ValueError(f"No password set for feature {feature_type}")

cmd = [
"openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2",
"-pass", f"pass:{passwd}"
Expand All @@ -109,23 +160,23 @@ def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
)
encrypted_bytes = result.stdout
b64_encoded = base64.b64encode(encrypted_bytes).decode()
self.__write_passwd_file(feature_type, passwd)
return b64_encoded
return b64_encoded
except subprocess.CalledProcessError as e:
syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e)))
return ""


def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
"""
Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64).
Returns the decrypted plaintext.
"""

passwd = self.__read_passwd_file(feature_type).strip()
if passwd is None:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type))
return ""
# Retrieve password from cipher_pass registry
data = self._load_registry()
passwd = None
if feature_type in data:
passwd = data[feature_type].get("password")
if not passwd:
raise ValueError(f"No password set for feature {feature_type}")

try:
encrypted_bytes = base64.b64decode(b64_encoded)
Expand All @@ -146,8 +197,7 @@ def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode()))
return ""


# Check if the encryption is enabled
# Check if the encryption is enabled
def is_key_encrypt_enabled(self, table, entry):
key = 'key_encrypt'
data = self._config_db.get_entry(table, entry)
Expand All @@ -156,29 +206,3 @@ def is_key_encrypt_enabled(self, table, entry):
return data[key]
return False


def del_cipher_pass(self, feature_type):
"""
Removes only the password for the given feature_type while keeping the file structure intact.
"""
try:
os.chmod(self._file_path, 0o640)
with open(self._file_path, "r") as file:
lines = file.readlines()

updated_lines = []
for line in lines:
if line.strip().startswith(f"{feature_type} :"):
updated_lines.append(f"{feature_type} : \n") # Remove password but keep format
else:
updated_lines.append(line)

with open(self._file_path, 'w') as file:
file.writelines(updated_lines)
os.chmod(self._file_path, 0o640)

syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type)))

except Exception as e:
syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e)))

Loading
Loading