Skip to content

Commit 47a7573

Browse files
nmoraylotus-nexthop
authored andcommitted
Adding support of common security cipher module for encryption and decryption of a passkey (sonic-net#17201)
Why I did it This module is created to handle the passkey encryption, decryption and the cipher storage. It's a common module which will be used for feature like TACACS, RADIUS, LDAP etc. This implementation is aligned with HLD How I did it This module will expose following APIs. Passkey encryption for a given feature Passkey decryption for a given feature Check whether encryption feature is enabled for a given feature How to verify it Cipher / password used for the encryption: root@sonic:/tmp# cat /etc/cipher_pass #Auto generated file for storing the encryption passwords TACPLUS : TEST1 RADIUS : TEST2 LDAP : TEST3 Is encryption enabled for TACACS: False Encrypted passkey for Feature: TACPLUS - U2FsdGVkX1/frdwl4GGD7bTKyzLi+lr2K76v0IECzkQ= Passkey post decryption:TACPLUS - passkey1 Encrypted passkey for Feature: RADIUS - U2FsdGVkX1/fdiBo3RWWxIIPFJYCy1CF/ZQeLt8N96Q= Passkey post decryption: RADIUS - passkey2 Encrypted passkey for Feature: LDAP - U2FsdGVkX1/o0UkHtWgOjr46UzLQRhXKAHngctey9TE= Passkey post decryption: LDAP - passkey3
1 parent dea235b commit 47a7573

File tree

4 files changed

+275
-2
lines changed

4 files changed

+275
-2
lines changed
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
'''
2+
3+
A common module for handling the encryption and
4+
decryption of the feature passkey. It also takes
5+
care of storing the secure cipher at root
6+
protected file system
7+
8+
'''
9+
10+
import subprocess
11+
import threading
12+
import syslog
13+
import os
14+
import base64
15+
from swsscommon.swsscommon import ConfigDBConnector
16+
17+
class master_key_mgr:
18+
_instance = None
19+
_lock = threading.Lock()
20+
_initialized = False
21+
22+
def __new__(cls):
23+
with cls._lock:
24+
if cls._instance is None:
25+
cls._instance = super(master_key_mgr, cls).__new__(cls)
26+
cls._instance._initialized = False
27+
return cls._instance
28+
29+
def __init__(self):
30+
if not self._initialized:
31+
self._file_path = "/etc/cipher_pass"
32+
self._config_db = ConfigDBConnector()
33+
self._config_db.connect()
34+
# Note: Kept 1st index NA intentionally to map it with the cipher_pass file
35+
# contents. The file has a comment at the 1st row / line
36+
self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"]
37+
if not os.path.exists(self._file_path):
38+
with open(self._file_path, 'w') as file:
39+
file.writelines("#Auto generated file for storing the encryption passwords\n")
40+
for feature in self._feature_list[1:]: # Skip the first "NA" entry
41+
file.write(f"{feature} : \n")
42+
os.chmod(self._file_path, 0o640)
43+
self._initialized = True
44+
45+
# Write cipher_pass file
46+
def __write_passwd_file(self, feature_type, passwd):
47+
if feature_type == 'NA':
48+
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type))
49+
return
50+
51+
if feature_type in self._feature_list:
52+
try:
53+
with open(self._file_path, 'r') as file:
54+
lines = file.readlines()
55+
# Update the password for given feature
56+
lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n'
57+
58+
os.chmod(self._file_path, 0o640)
59+
with open(self._file_path, 'w') as file:
60+
file.writelines(lines)
61+
os.chmod(self._file_path, 0o640)
62+
except FileNotFoundError:
63+
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path))
64+
except PermissionError:
65+
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path))
66+
67+
68+
# Read cipher pass file and return the feature specifc
69+
# password
70+
def __read_passwd_file(self, feature_type):
71+
passwd = None
72+
if feature_type == 'NA':
73+
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type))
74+
return passwd
75+
76+
if feature_type in self._feature_list:
77+
try:
78+
os.chmod(self._file_path, 0o644)
79+
with open(self._file_path, "r") as file:
80+
lines = file.readlines()
81+
for line in lines:
82+
if feature_type in line:
83+
passwd = line.split(' : ')[1]
84+
os.chmod(self._file_path, 0o640)
85+
except FileNotFoundError:
86+
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path))
87+
except PermissionError:
88+
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path))
89+
90+
return passwd
91+
92+
93+
def encrypt_passkey(self, feature_type, secret: str, passwd: str) -> str:
94+
"""
95+
Encrypts the plaintext using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64)
96+
and returns the result as a hex string.
97+
"""
98+
cmd = [
99+
"openssl", "enc", "-aes-128-cbc", "-salt", "-pbkdf2",
100+
"-pass", f"pass:{passwd}"
101+
]
102+
try:
103+
result = subprocess.run(
104+
cmd,
105+
input=secret.encode(),
106+
stdout=subprocess.PIPE,
107+
stderr=subprocess.PIPE,
108+
check=True
109+
)
110+
encrypted_bytes = result.stdout
111+
b64_encoded = base64.b64encode(encrypted_bytes).decode()
112+
self.__write_passwd_file(feature_type, passwd)
113+
return b64_encoded
114+
except subprocess.CalledProcessError as e:
115+
syslog.syslog(syslog.LOG_ERR, "encrypt_passkey: {} Encryption failed with ERR: {}".format((e)))
116+
return ""
117+
118+
119+
def decrypt_passkey(self, feature_type, b64_encoded: str) -> str:
120+
"""
121+
Decrypts a hex-encoded encrypted string using OpenSSL (AES-128-CBC, with salt and pbkdf2, no base64).
122+
Returns the decrypted plaintext.
123+
"""
124+
125+
passwd = self.__read_passwd_file(feature_type).strip()
126+
if passwd is None:
127+
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Enpty password for {} feature type".format(feature_type))
128+
return ""
129+
130+
try:
131+
encrypted_bytes = base64.b64decode(b64_encoded)
132+
133+
cmd = [
134+
"openssl", "enc", "-aes-128-cbc", "-d", "-salt", "-pbkdf2",
135+
"-pass", f"pass:{passwd}"
136+
]
137+
result = subprocess.run(
138+
cmd,
139+
input=encrypted_bytes,
140+
stdout=subprocess.PIPE,
141+
stderr=subprocess.PIPE,
142+
check=True
143+
)
144+
return result.stdout.decode().strip()
145+
except subprocess.CalledProcessError as e:
146+
syslog.syslog(syslog.LOG_ERR, "decrypt_passkey: Decryption failed with an ERR: {}".format(e.stderr.decode()))
147+
return ""
148+
149+
150+
# Check if the encryption is enabled
151+
def is_key_encrypt_enabled(self, table, entry):
152+
key = 'key_encrypt'
153+
data = self._config_db.get_entry(table, entry)
154+
if data:
155+
if key in data:
156+
return data[key]
157+
return False
158+
159+
160+
def del_cipher_pass(self, feature_type):
161+
"""
162+
Removes only the password for the given feature_type while keeping the file structure intact.
163+
"""
164+
try:
165+
os.chmod(self._file_path, 0o640)
166+
with open(self._file_path, "r") as file:
167+
lines = file.readlines()
168+
169+
updated_lines = []
170+
for line in lines:
171+
if line.strip().startswith(f"{feature_type} :"):
172+
updated_lines.append(f"{feature_type} : \n") # Remove password but keep format
173+
else:
174+
updated_lines.append(line)
175+
176+
with open(self._file_path, 'w') as file:
177+
file.writelines(updated_lines)
178+
os.chmod(self._file_path, 0o640)
179+
180+
syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: Password for {} has been removed".format((feature_type)))
181+
182+
except Exception as e:
183+
syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e)))
184+

src/sonic-py-common/tests/mock_swsscommon.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,15 @@ def connect(self, db):
1414

1515
def get(self, db, table, field):
1616
return self.data.get(field, "N/A")
17+
18+
19+
class ConfigDBConnector:
20+
def __init__(self):
21+
self.CONFIG_DB = 'CONFIG_DB'
22+
self.data = {"key_encrypt": "True"}
23+
24+
def connect(self):
25+
pass
26+
27+
def get(self, db, table, field):
28+
return self.data.get(field, "N/A")
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import sys
2+
3+
if sys.version_info.major == 3:
4+
from unittest import mock
5+
else:
6+
import mock
7+
8+
import pytest
9+
from sonic_py_common.security_cipher import master_key_mgr
10+
from .mock_swsscommon import ConfigDBConnector
11+
12+
# TODO: Remove this if/else block once we no longer support Python 2
13+
if sys.version_info.major == 3:
14+
BUILTINS = "builtins"
15+
else:
16+
BUILTINS = "__builtin__"
17+
18+
DEFAULT_FILE = [
19+
"#Auto generated file for storing the encryption passwords",
20+
"TACPLUS : ",
21+
"RADIUS : ",
22+
"LDAP :"
23+
]
24+
25+
UPDATED_FILE = [
26+
"#Auto generated file for storing the encryption passwords",
27+
"TACPLUS : ",
28+
"RADIUS : TEST2",
29+
"LDAP :"
30+
]
31+
32+
33+
class TestSecurityCipher(object):
34+
def test_passkey_encryption(self):
35+
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
36+
mock.patch("os.chmod") as mock_chmod, \
37+
mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file:
38+
temp = master_key_mgr()
39+
40+
# Use patch to replace the built-in 'open' function with a mock
41+
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
42+
mock.patch("os.chmod") as mock_chmod:
43+
mock_fd = mock.MagicMock()
44+
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
45+
mock_file.return_value.__enter__.return_value = mock_fd
46+
encrypt = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1")
47+
assert encrypt != "passkey1"
48+
49+
def test_passkey_decryption(self):
50+
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
51+
mock.patch("os.chmod") as mock_chmod, \
52+
mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file:
53+
temp = master_key_mgr()
54+
55+
# Use patch to replace the built-in 'open' function with a mock
56+
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
57+
mock.patch("os.chmod") as mock_chmod:
58+
mock_fd = mock.MagicMock()
59+
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
60+
mock_file.return_value.__enter__.return_value = mock_fd
61+
encrypt = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2")
62+
63+
# Use patch to replace the built-in 'open' function with a mock
64+
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
65+
mock.patch("os.chmod") as mock_chmod:
66+
mock_fd = mock.MagicMock()
67+
mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE)
68+
mock_file.return_value.__enter__.return_value = mock_fd
69+
decrypt = temp.decrypt_passkey("RADIUS", encrypt)
70+
assert decrypt == "passkey2"
71+
72+

src/sonic-yang-models/yang-models/sonic-system-tacacs.yang

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ module sonic-system-tacacs {
9393

9494
leaf passkey {
9595
type string {
96-
length "1..65";
96+
length "1..256";
9797
pattern "[^ #,]*" {
9898
error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")';
9999
}
@@ -131,9 +131,14 @@ module sonic-system-tacacs {
131131
default 5;
132132
}
133133

134+
leaf key_encrypt {
135+
type boolean;
136+
description "Indicates if the passkey is encrypted.";
137+
}
138+
134139
leaf passkey {
135140
type string {
136-
length "1..65";
141+
length "1..256";
137142
pattern "[^ #,]*" {
138143
error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")';
139144
}

0 commit comments

Comments
 (0)