-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Adding support of common security cipher module for encryption and decryption of a passkey #17201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a7708da
e6bb593
f77490b
485aed8
519dc8d
cb51987
f4745b4
2b08431
8129f86
8dfacd3
c6fd0c9
547b31f
7a8c120
78d66c7
6ee2c7e
0fd2ba0
aff2df1
55c7cc7
8268021
2fbec21
95a4c10
719053b
1ab7067
5a94a62
33e88cb
f388f71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| ''' | ||
| A common module for handling the encryption and | ||
| decryption of the feature passkey. It also takes | ||
| care of storing the secure cipher at root | ||
| protected file system | ||
| ''' | ||
|
|
||
| import subprocess | ||
| import threading | ||
| import syslog | ||
| import os | ||
| import base64 | ||
| from swsscommon.swsscommon import ConfigDBConnector | ||
|
|
||
| class master_key_mgr: | ||
| _instance = None | ||
| _lock = threading.Lock() | ||
| _initialized = False | ||
|
|
||
| def __new__(cls): | ||
| with cls._lock: | ||
| if cls._instance is None: | ||
| cls._instance = super(master_key_mgr, cls).__new__(cls) | ||
| cls._instance._initialized = False | ||
| return cls._instance | ||
|
|
||
| def __init__(self): | ||
| if not self._initialized: | ||
| self._file_path = "/etc/cipher_pass" | ||
| self._config_db = ConfigDBConnector() | ||
| self._config_db.connect() | ||
| # Note: Kept 1st index NA intentionally to map it with the cipher_pass file | ||
| # contents. The file has a comment at the 1st row / line | ||
| self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"] | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if not os.path.exists(self._file_path): | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| with open(self._file_path, 'w') as file: | ||
| file.writelines("#Auto generated file for storing the encryption passwords\n") | ||
| for feature in self._feature_list[1:]: # Skip the first "NA" entry | ||
| file.write(f"{feature} : \n") | ||
| os.chmod(self._file_path, 0o640) | ||
| self._initialized = True | ||
|
|
||
| # Write cipher_pass file | ||
| def __write_passwd_file(self, feature_type, passwd): | ||
| if feature_type == 'NA': | ||
| syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type)) | ||
| return | ||
|
|
||
| if feature_type in self._feature_list: | ||
| try: | ||
| with open(self._file_path, 'r') as file: | ||
| lines = file.readlines() | ||
| # Update the password for given feature | ||
| lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n' | ||
|
|
||
| os.chmod(self._file_path, 0o640) | ||
| with open(self._file_path, 'w') as file: | ||
| file.writelines(lines) | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| os.chmod(self._file_path, 0o640) | ||
| except FileNotFoundError: | ||
| syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path)) | ||
| except PermissionError: | ||
| syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path)) | ||
|
|
||
|
|
||
| # Read cipher pass file and return the feature specifc | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # password | ||
| def __read_passwd_file(self, feature_type): | ||
| passwd = None | ||
| if feature_type == 'NA': | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type)) | ||
| return passwd | ||
|
|
||
| if feature_type in self._feature_list: | ||
| try: | ||
| os.chmod(self._file_path, 0o644) | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| with open(self._file_path, "r") as file: | ||
| lines = file.readlines() | ||
| for line in lines: | ||
| if feature_type in line: | ||
| passwd = line.split(' : ')[1] | ||
| os.chmod(self._file_path, 0o640) | ||
| except FileNotFoundError: | ||
| syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path)) | ||
| except PermissionError: | ||
| syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path)) | ||
|
|
||
| return passwd | ||
|
|
||
|
|
||
| def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str: | ||
| """ | ||
| Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64) | ||
| and returns the result as a hex string. | ||
| """ | ||
| cmd = [ | ||
| "openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2", | ||
| "-pass", f"pass:{passwd}" | ||
| ] | ||
| try: | ||
| result = subprocess.run( | ||
| cmd, | ||
| input=secret.encode(), | ||
| stdout=subprocess.PIPE, | ||
| stderr=subprocess.PIPE, | ||
| check=True | ||
| ) | ||
| encrypted_bytes = result.stdout | ||
| b64_encoded = base64.b64encode(encrypted_bytes).decode() | ||
| self.__write_passwd_file(feature_type, passwd) | ||
| return b64_encoded | ||
| except subprocess.CalledProcessError as e: | ||
| syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e))) | ||
| return "" | ||
|
|
||
|
|
||
| def decrypt_passkey(self, feature_type, b64_encoded: str) -> str: | ||
| """ | ||
| Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64). | ||
| Returns the decrypted plaintext. | ||
| """ | ||
|
|
||
| passwd = self.__read_passwd_file(feature_type).strip() | ||
| if passwd is None: | ||
| syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type)) | ||
| return "" | ||
|
|
||
| try: | ||
| encrypted_bytes = base64.b64decode(b64_encoded) | ||
|
|
||
| cmd = [ | ||
| "openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2", | ||
| "-pass", f"pass:{passwd}" | ||
| ] | ||
| result = subprocess.run( | ||
| cmd, | ||
| input=encrypted_bytes, | ||
| stdout=subprocess.PIPE, | ||
| stderr=subprocess.PIPE, | ||
| check=True | ||
| ) | ||
| return result.stdout.decode().strip() | ||
| except subprocess.CalledProcessError as e: | ||
| syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode())) | ||
| return "" | ||
|
|
||
|
|
||
| # Check if the encryption is enabled | ||
| def is_key_encrypt_enabled(self, table, entry): | ||
| key = 'key_encrypt' | ||
| data = self._config_db.get_entry(table, entry) | ||
| if data: | ||
| if key in data: | ||
| return data[key] | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return False | ||
|
|
||
|
|
||
| def del_cipher_pass(self, feature_type): | ||
| """ | ||
| Removes only the password for the given feature_type while keeping the file structure intact. | ||
| """ | ||
| try: | ||
| os.chmod(self._file_path, 0o640) | ||
| with open(self._file_path, "r") as file: | ||
| lines = file.readlines() | ||
|
|
||
| updated_lines = [] | ||
| for line in lines: | ||
| if line.strip().startswith(f"{feature_type} :"): | ||
| updated_lines.append(f"{feature_type} : \n") # Remove password but keep format | ||
| else: | ||
| updated_lines.append(line) | ||
|
|
||
| with open(self._file_path, 'w') as file: | ||
| file.writelines(updated_lines) | ||
| os.chmod(self._file_path, 0o640) | ||
|
|
||
| syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type))) | ||
|
|
||
| except Exception as e: | ||
| syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e))) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import sys | ||
|
|
||
| if sys.version_info.major == 3: | ||
| from unittest import mock | ||
| else: | ||
| import mock | ||
|
|
||
| import pytest | ||
| from sonic_py_common.security_cipher import master_key_mgr | ||
| from .mock_swsscommon import ConfigDBConnector | ||
|
|
||
| # TODO: Remove this if/else block once we no longer support Python 2 | ||
| if sys.version_info.major == 3: | ||
| BUILTINS = "builtins" | ||
| else: | ||
| BUILTINS = "__builtin__" | ||
|
|
||
| DEFAULT_FILE = [ | ||
| "#Auto generated file for storing the encryption passwords", | ||
| "TACPLUS : ", | ||
| "RADIUS : ", | ||
| "LDAP :" | ||
| ] | ||
|
|
||
| UPDATED_FILE = [ | ||
| "#Auto generated file for storing the encryption passwords", | ||
| "TACPLUS : ", | ||
| "RADIUS : TEST2", | ||
| "LDAP :" | ||
| ] | ||
|
|
||
|
|
||
| class TestSecurityCipher(object): | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def test_passkey_encryption(self): | ||
| with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ | ||
| mock.patch("os.chmod") as mock_chmod, \ | ||
| mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file: | ||
| temp = master_key_mgr() | ||
|
|
||
| # Use patch to replace the built-in 'open' function with a mock | ||
| with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ | ||
| mock.patch("os.chmod") as mock_chmod: | ||
| mock_fd = mock.MagicMock() | ||
| mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) | ||
| mock_file.return_value.__enter__.return_value = mock_fd | ||
| encrypt = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1") | ||
| assert encrypt != "passkey1" | ||
|
|
||
| def test_passkey_decryption(self): | ||
| with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \ | ||
| mock.patch("os.chmod") as mock_chmod, \ | ||
| mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file: | ||
| temp = master_key_mgr() | ||
|
|
||
| # Use patch to replace the built-in 'open' function with a mock | ||
| with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ | ||
| mock.patch("os.chmod") as mock_chmod: | ||
| mock_fd = mock.MagicMock() | ||
| mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE) | ||
| mock_file.return_value.__enter__.return_value = mock_fd | ||
| encrypt = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2") | ||
|
|
||
| # Use patch to replace the built-in 'open' function with a mock | ||
| with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \ | ||
| mock.patch("os.chmod") as mock_chmod: | ||
| mock_fd = mock.MagicMock() | ||
| mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE) | ||
| mock_file.return_value.__enter__.return_value = mock_fd | ||
| decrypt = temp.decrypt_passkey("RADIUS", encrypt) | ||
| assert decrypt == "passkey2" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ module sonic-system-tacacs { | |
|
|
||
| leaf passkey { | ||
nmoray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| type string { | ||
| length "1..65"; | ||
| length "1..256"; | ||
| pattern "[^ #,]*" { | ||
| error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")'; | ||
| } | ||
|
|
@@ -131,9 +131,14 @@ module sonic-system-tacacs { | |
| default 5; | ||
| } | ||
|
|
||
| leaf key_encrypt { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mix tabs with spaces, please reformat this "leaf key_encrypt" section.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean, replace the tabs with spaces for the entire leaf section?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @qiluo-msft The link that you shared is not accessible.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the link. Yes, I mean to replace the tabs with spaces for the entire leaf section
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will do as a part of new PR. |
||
| type boolean; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I will raise a new PR to accomodate this change. |
||
| description "Indicates if the passkey is encrypted."; | ||
| } | ||
|
|
||
| leaf passkey { | ||
| type string { | ||
| length "1..65"; | ||
| length "1..256"; | ||
| pattern "[^ #,]*" { | ||
| error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")'; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you boot into a new version, the /etc/config_db.json will be carried over, but this file will be lost. Then you have only encrypted passkey and lost cipher_pass. How does the new version survive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qiluo-msft As per https://github.com/sonic-net/SONiC/blob/master/doc/ztp/SONiC-config-setup.md#222-config-migration
the backup and restore of cipher_pass file will automatically be taken care, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the link! The file "/etc/cipher_pass" is outside the folder of config migration design, could you check again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yes. Okay then, I will create another PR to add pre-hook and post-hook scripts to backup and restore the cipher_pass file during the up-gradation. Is that okay? Or just change the location to "/etc/sonic"?