Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
67df31f
Added support of backup and restore for cipher_pass file
nmoray May 27, 2025
1c23a3d
Made default value as false for key_encrypt flag and removed uneven tabs
nmoray May 27, 2025
12ef9e5
Updated jinja template to copy pre/post migration scripts
nmoray May 27, 2025
1469610
Added sudo while executing chmod
nmoray May 27, 2025
9010405
Updated the existing design to support password rotate feature in Sec…
nmoray Jun 19, 2025
eb1709f
Added key_encrypt option under individual TACPLUS server configs
nmoray Jun 19, 2025
6e1b08b
Addressed comments
nmoray Jun 23, 2025
aaac0c6
Updated method description
nmoray Jun 23, 2025
9e8aab5
Updated rotate method to get info about CONFIG_DB entry in the callbacks
nmoray Jun 24, 2025
5b7da20
Fixed build errors
nmoray Jun 25, 2025
77ae0fb
Fixed build issues and added sceret key as an arg to rotate method so…
nmoray Jun 25, 2025
b717c06
Addressed comments and fixed build issues
nmoray Jun 26, 2025
df34c80
fixed build issues
nmoray Jun 26, 2025
5715b84
Updated rotate method and replaced callbacks with tableinfo
nmoray Jun 27, 2025
5628962
Addressed comments and fixed build issues
nmoray Jun 30, 2025
ec48f2d
Fixed build issues
nmoray Jun 30, 2025
6a3c61c
Fixed build issues
nmoray Jul 1, 2025
cbcd7b4
fixed build issues
nmoray Jul 1, 2025
986eec0
Fixed build issues
nmoray Jul 2, 2025
a6f641a
fixed build issues
nmoray Jul 2, 2025
dea228d
Fixed build issues
nmoray Jul 3, 2025
f746e09
Addressed comments
nmoray Jul 3, 2025
06b4b7a
Triggered another build
nmoray Jul 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions files/build_templates/sonic_debian_extension.j2
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,12 @@ j2 files/build_templates/config-setup.service.j2 | sudo tee $FILESYSTEM_ROOT_USR
sudo cp $IMAGE_CONFIGS/config-setup/config-setup $FILESYSTEM_ROOT/usr/bin/config-setup
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup
sudo cp $IMAGE_CONFIGS/config-setup/config-setup.conf $FILESYSTEM_ROOT/etc/config-setup/config-setup.conf
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d
sudo cp $IMAGE_CONFIGS/config-setup/01-pre-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher
sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-pre-hooks.d/01-pre-security-cipher
sudo mkdir -p $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d
sudo cp $IMAGE_CONFIGS/config-setup/01-post-security-cipher $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher
sudo chmod +x $FILESYSTEM_ROOT/etc/config-setup/config-migration-post-hooks.d/01-post-security-cipher
echo "config-setup.service" | sudo tee -a $GENERATED_SERVICE_FILE
sudo LANG=C chroot $FILESYSTEM_ROOT systemctl enable config-setup.service

Expand Down
11 changes: 11 additions & 0 deletions files/image_config/config-setup/01-post-security-cipher
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

set -e

# Restore cipher_pass file from persistent storage
if [ -f /host/security_cipher/cipher_pass ]; then
cp /host/security_cipher/cipher_pass.json /etc/cipher_pass.json
chmod 640 /etc/cipher_pass.json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

640

Is it better to use 600 if the only reader is root?

echo "Restored /host/security_cipher/cipher_pass.json to /etc/"
fi

13 changes: 13 additions & 0 deletions files/image_config/config-setup/01-pre-security-cipher
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

set -e

# Ensure old_config directory exists
mkdir -p /host/security_cipher

# Copy cipher_pass file to persistent storage
if [ -f /etc/cipher_pass ]; then
cp /etc/cipher_pass.json /host/security_cipher/cipher_pass.json
echo "Saved /etc/cipher_pass.json to /host/security_cipher/"
fi

236 changes: 139 additions & 97 deletions src/sonic-py-common/sonic_py_common/security_cipher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A common module for handling the encryption and
decryption of the feature passkey. It also takes
care of storing the secure cipher at root
care of storing the secure cipher at root
protected file system

'''
Expand All @@ -12,8 +12,11 @@
import syslog
import os
import base64
import json
from swsscommon.swsscommon import ConfigDBConnector

CIPHER_PASS_FILE = "/etc/cipher_pass.json"

class master_key_mgr:
_instance = None
_lock = threading.Lock()
Expand All @@ -28,69 +31,33 @@ def __new__(cls):

def __init__(self):
if not self._initialized:
self._file_path = "/etc/cipher_pass"
self._file_path = CIPHER_PASS_FILE
self._config_db = ConfigDBConnector()
self._config_db.connect()
# Note: Kept 1st index NA intentionally to map it with the cipher_pass file
# contents. The file has a comment at the 1st row / line
self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"]
if not os.path.exists(self._file_path):
with open(self._file_path, 'w') as file:
file.writelines("#Auto generated file for storing the encryption passwords\n")
for feature in self._feature_list[1:]: # Skip the first "NA" entry
file.write(f"{feature} : \n")
os.chmod(self._file_path, 0o640)
self._initialized = True

# Write cipher_pass file
def __write_passwd_file(self, feature_type, passwd):
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type))
return
def _load_registry(self):
"""
Read cipher_pass.json file
"""
if not os.path.exists(CIPHER_PASS_FILE):
return {}
try:
with open(CIPHER_PASS_FILE, 'r') as f:
return json.load(f)
except Exception as e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception

Could you use more specific exception type?

syslog.syslog(syslog.LOG_ERR, "_load_registry: Exception occurred: {}".format(e))
return {}

if feature_type in self._feature_list:
try:
with open(self._file_path, 'r') as file:
lines = file.readlines()
# Update the password for given feature
lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n'

os.chmod(self._file_path, 0o640)
with open(self._file_path, 'w') as file:
file.writelines(lines)
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path))


# Read cipher pass file and return the feature specifc
# password
def __read_passwd_file(self, feature_type):
passwd = None
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type))
return passwd

if feature_type in self._feature_list:
try:
os.chmod(self._file_path, 0o644)
with open(self._file_path, "r") as file:
lines = file.readlines()
for line in lines:
if feature_type in line:
passwd = line.split(' : ')[1]
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path))

return passwd


def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
def _save_registry(self, data):
"""
Write cipher_pass.json file
"""
with open(CIPHER_PASS_FILE, 'w') as f:
json.dump(data, f, indent=2)
os.chmod(self._file_path, 0o640)

def _encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
"""
Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64)
and returns the result as a hex string.
Expand All @@ -109,24 +76,16 @@ def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
)
encrypted_bytes = result.stdout
b64_encoded = base64.b64encode(encrypted_bytes).decode()
self.__write_passwd_file(feature_type, passwd)
return b64_encoded
return b64_encoded
except subprocess.CalledProcessError as e:
syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e)))
syslog.syslog(syslog.LOG_ERR, "_encrypt_passkey: {} Encryption failed with ERR: {}".format(e))
return ""


def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
def _decrypt_passkey(self, feature_type, b64_encoded: str, passwd: str) -> str:
"""
Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64).
Returns the decrypted plaintext.
"""

passwd = self.__read_passwd_file(feature_type).strip()
if passwd is None:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type))
return ""

try:
encrypted_bytes = base64.b64decode(b64_encoded)

Expand All @@ -146,39 +105,122 @@ def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode()))
return ""

def register(self, feature_type, table_info):
"""
Register a table_info for a feature type.
Feature types: TACPLUS, RADIUS, LDAP etc.
"""
data = self._load_registry()
if feature_type not in data:
data[feature_type] = {"table_info": [], "password": None}
if table_info not in data[feature_type]["table_info"]:
data[feature_type]["table_info"].append(table_info)
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "register: table_info {} attached to {} feature".format(table_info, feature_type))

def deregister(self, feature_type, table_info):
"""
Deregister (remove) a table_info string (like "TACPLUS|global") for a feature type.
If, after removal, there are no more table_info entries for that feature,
remove the respective password as well.
"""
data = self._load_registry()
if feature_type in data:
if table_info in data[feature_type]["table_info"]:
data[feature_type]["table_info"].remove(table_info)
if not data[feature_type]["table_info"]:
# No more table_info left; remove password as well
data[feature_type]["password"] = None
syslog.syslog(syslog.LOG_INFO, "deregister: No more table_info for feature {}. Password also removed.".format(feature_type))
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "deregister: table_info {} removed from feature {}".format(table_info, feature_type))
else:
syslog.syslog(syslog.LOG_ERR, "deregister: table_info {} not found for feature {}".format(table_info, feature_type))
else:
syslog.syslog(syslog.LOG_ERR, "deregister: No table_info registered for {}".format(feature_type))

def set_feature_password(self, feature_type, password):
"""
Set a new password for a feature type.
It will not update if already exist.
"""
data = self._load_registry()
if feature_type not in data:
data[feature_type] = {"table_info": [], "password": None}
if data[feature_type]["password"] is not None:
syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password already set for feature {}, not updating the new password.".format(feature_type))
syslog.syslog(syslog.LOG_INFO, "set_feature_password: Note: Make use of rotate_feature_passwd() method for updating the existing pass")
return
data[feature_type]["password"] = password
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "set_feature_password: Password set for feature {}".format(feature_type))

# Check if the encryption is enabled
def is_key_encrypt_enabled(self, table, entry):
key = 'key_encrypt'
data = self._config_db.get_entry(table, entry)
if data:
if key in data:
return data[key]
return False

def rotate_feature_passwd(self, feature_type, new_password):
"""
For each registered table_info, extract encrypted passkey, decrypt, re-encrypt with new password, and update.
"""
data = self._load_registry()
if feature_type not in data:
syslog.syslog(syslog.LOG_ERR, "No table_info registered for {} Feature".format(feature_type))
return

def del_cipher_pass(self, feature_type):
old_password = data[feature_type]["password"]
table_infos = data[feature_type].get("table_info", [])
for table_info in table_infos:
table, entry = table_info.split("|")
db_entry = self._config_db.get_entry(table, entry)
encrypted_passkey = db_entry.get("passkey")
#Rotate only if valid passkey is present and 'key_encyrpt' flag is True
if encrypted_passkey and str(db_entry.get("key_encrypt")).lower() == 'true':
# Decrypt with old password
plain_passkey = self._decrypt_passkey(feature_type, encrypted_passkey, old_password)
# Re-encrypt with new password
new_encrypted_passkey = self._encrypt_passkey(feature_type, plain_passkey, new_password)
# Update DB
db_entry["passkey"] = new_encrypted_passkey
self._config_db.set_entry(table, entry, db_entry)
syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Updated passkey for {}".format(table_info))
else:
syslog.syslog(syslog.LOG_WARNING, "Either no passkey found or key_encrypt flag is not set to True for {}".format(table_info))

# Update stored password
data[feature_type]["password"] = new_password
self._save_registry(data)
syslog.syslog(syslog.LOG_INFO, "rotate_feature_passwd: Password for {} Feature has been updated.".format(feature_type))

def encrypt_passkey(self, feature_type, secret: str) -> str:
"""
Removes only the password for the given feature_type while keeping the file structure intact.
Encrypts the plaintext and returns the result as a hex string.
"""
try:
os.chmod(self._file_path, 0o640)
with open(self._file_path, "r") as file:
lines = file.readlines()
# Retrieve password from cipher_pass registry
data = self._load_registry()
passwd = None
if feature_type in data:
passwd = data[feature_type].get("password")
if not passwd:
raise ValueError(f"encrypt_passkey: No password set for feature {feature_type}")

updated_lines = []
for line in lines:
if line.strip().startswith(f"{feature_type} :"):
updated_lines.append(f"{feature_type} : \n") # Remove password but keep format
else:
updated_lines.append(line)
return self._encrypt_passkey(feature_type, secret, passwd)

with open(self._file_path, 'w') as file:
file.writelines(updated_lines)
os.chmod(self._file_path, 0o640)
def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
"""
Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64).
Returns the decrypted plaintext.
"""
# Retrieve password from cipher_pass registry
data = self._load_registry()
passwd = None
if feature_type in data:
passwd = data[feature_type].get("password")
if not passwd:
raise ValueError(f"decrypt_passkey: No password set for feature {feature_type}")

syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type)))
return self._decrypt_passkey(feature_type, b64_encoded, passwd)

except Exception as e:
syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e)))
# Check if the encryption is enabled
def is_key_encrypt_enabled(self, table, entry):
data = self._config_db.get_entry(table, entry)
if data and 'key_encrypt' in data:
return data['key_encrypt'].lower() == 'true'
return False

Loading
Loading