Skip to content

Commit 3ac1937

Browse files
authored
Improve SSL mangement: (#127)
- Support passing private key and certificate files as strings - Change edge/ssl module to use crypto services - Update edge/ssl UT
1 parent d0e7f05 commit 3ac1937

File tree

5 files changed

+123
-61
lines changed

5 files changed

+123
-61
lines changed

cterasdk/core/ssl.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from zipfile import ZipFile
33

44
from .base_command import BaseCommand
5-
from ..lib import FileSystem, X509Certificate, TempfileServices, create_certificate_chain
5+
from ..lib import FileSystem, X509Certificate, PrivateKey, TempfileServices, create_certificate_chain
66

77

88
class SSL(BaseCommand):
@@ -50,23 +50,25 @@ def create_zip_archive(self, private_key, *certificates):
5050
"""
5151
Create a ZIP archive that can be imported to CTERA Portal
5252
53-
:param str private_key: A path to the PEM-encoded private key file
54-
:param list[str] certificates: A list of paths of the PEM-encoded certificate files
53+
:param str private_key: The PEM-encoded private key, or a path to the PEM-encoded private key file
54+
:param list[str] certificates: The PEM-encoded certificates, or a list of paths of the PEM-encoded certificate files
5555
"""
5656
tempdir = TempfileServices.mkdir()
5757

5858
key_basename = 'private.key'
59-
private_keyfile = self._filesystem.copyfile(private_key, FileSystem.join(tempdir, key_basename))
59+
key_object = PrivateKey.load_private_key(private_key)
60+
key_filepath = FileSystem.join(tempdir, key_basename)
61+
self._filesystem.write(key_filepath, key_object.pem_data)
6062

6163
cert_basename = 'certificate'
62-
certificates = [X509Certificate.from_file(certificate) for certificate in certificates]
64+
certificates = [X509Certificate.load_certificate(certificate) for certificate in certificates]
6365
certificate_chain = create_certificate_chain(*certificates)
6466

6567
certificate_chain_zip_archive = None
6668
if certificate_chain:
6769
certificate_chain_zip_archive = FileSystem.join(tempdir, '{}.zip'.format(cert_basename))
6870
with ZipFile(certificate_chain_zip_archive, 'w') as zip_archive:
69-
zip_archive.write(private_keyfile, key_basename)
71+
zip_archive.write(key_filepath, key_basename)
7072
for idx, certificate in enumerate(certificate_chain):
7173
filename = '{}{}.crt'.format(cert_basename, idx if idx > 0 else '')
7274
filepath = FileSystem.join(tempdir, filename)
@@ -87,8 +89,8 @@ def import_from_chain(self, private_key, *certificates):
8789
"""
8890
Import an SSL Certificate to CTERA Portal from a chain
8991
90-
:param str private_key: A path to the PEM-encoded private key file
91-
:param list[str] certificates: A list of paths to the PEM-encoded certificates
92+
:param str private_key: The PEM-encoded private key, or a path to the PEM-encoded private key file
93+
:param list[str] certificates: The PEM-encoded certificates, or a list of paths of the PEM-encoded certificate files
9294
"""
9395
zipflie = self.create_zip_archive(private_key, *certificates)
9496
return self.import_from_zip(zipflie)

cterasdk/edge/ssl.py

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import logging
22

33
from .base_command import BaseCommand
4-
from ..lib import FileSystem
4+
from ..lib import X509Certificate, PrivateKey, create_certificate_chain
55
from ..common import Object
66

77

88
class SSL(BaseCommand):
9-
""" Gateway SSL APIs """
10-
11-
BEGIN_PEM = '-----BEGIN'
9+
""" Edge Filer SSL APIs """
1210

1311
def enable_http(self):
1412
"""
@@ -52,48 +50,34 @@ def get_storage_ca(self):
5250
"""
5351
return self._gateway.get('/status/extStorageTrustedCA')
5452

55-
def set_storage_ca(self, certificate):
53+
def import_storage_ca(self, certificate):
5654
"""
57-
Set object storage trusted CA certificate
55+
Import the object storage trusted CA certificate
5856
5957
:param str certificate: The PEM-encoded certificate or a path to the PEM-encoded server certificate file
6058
"""
6159
logging.getLogger().info('Setting trusted object storage CA certificate')
6260
param = Object()
6361
param._classname = 'ExtTrustedCA' # pylint: disable=protected-access
64-
param.certificate = SSL._obtain_secret(certificate)
65-
return self._gateway.put('/config/extStorageTrustedCA', param)
62+
param.certificate = X509Certificate.load_certificate(certificate).pem_data.decode('utf-8')
63+
logging.getLogger().info("Uploading object storage certificate.")
64+
response = self._gateway.put('/config/extStorageTrustedCA', param)
65+
logging.getLogger().info("Uploaded object storage certificate.")
66+
return response
6667

67-
def set_certificate(self, private_key, *certificates):
68+
def import_certificate(self, private_key, *certificates):
6869
"""
69-
Set the Edge Filer's web server's certificate.
70+
Import the Edge Filer's web server's SSL certificate
7071
71-
:param str private_key: The PEM-encoded private key or a path to the PEM-encoded private key file
72-
:param list[str] certificates: The PEM-encoded certificates or a path to the PEM-encoded certificates
72+
:param str private_key: The PEM-encoded private key, or a path to the PEM-encoded private key file
73+
:param list[str] certificates: The PEM-encoded certificates, or a list of paths to the PEM-encoded certificates
7374
"""
74-
logging.getLogger().debug('Loading private key')
75-
certificate_chain = [SSL._obtain_secret(private_key)]
76-
logging.getLogger().debug('Loading certificates')
77-
certificate_chain = certificate_chain + [SSL._obtain_secret(certificate) for certificate in certificates]
7875

79-
logging.getLogger().info("Uploading certificate chain")
80-
server_certificate = '\n' + '\n'.join(certificate_chain).replace('\n\n', '\n')
81-
response = self._gateway.put('/config/certificate', server_certificate)
82-
logging.getLogger().info("Uploaded certificate chain")
76+
key_object = PrivateKey.load_private_key(private_key)
77+
certificates = [X509Certificate.load_certificate(certificate) for certificate in certificates]
78+
certificate_chain = [certificate.pem_data.decode('utf-8') for certificate in create_certificate_chain(*certificates)]
79+
server_certificate = ''.join([key_object.pem_data.decode('utf-8')] + certificate_chain)
80+
logging.getLogger().info("Uploading SSL certificate.")
81+
response = self._gateway.put('/config/certificate', "\n{}".format(server_certificate))
82+
logging.getLogger().info("Uploaded SSL certificate.")
8383
return response
84-
85-
@staticmethod
86-
def _obtain_secret(secret):
87-
if not secret.startswith(SSL.BEGIN_PEM):
88-
file_info, secret = SSL._file_contents(secret)
89-
logging.getLogger().debug(
90-
"Reading file. %s", {'name': file_info['name'], 'size': file_info['size'], 'type': file_info['mimetype']}
91-
)
92-
return secret
93-
94-
@staticmethod
95-
def _file_contents(filepath):
96-
file_info = FileSystem.instance().get_local_file_info(filepath)
97-
with open(filepath, 'r') as f:
98-
file_content = f.read()
99-
return (file_info, file_content)

cterasdk/lib/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
from .file_access_base import FileAccessBase # noqa: E402, F401
77
from .filesystem import FileSystem # noqa: E402, F401
88
from .tracker import track, ErrorStatus # noqa: E402, F401
9-
from .crypto import CryptoServices, X509Certificate, create_certificate_chain # noqa: E402, F401
9+
from .crypto import CryptoServices, X509Certificate, PrivateKey, create_certificate_chain # noqa: E402, F401

cterasdk/lib/crypto.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
from cryptography import x509
77
from cryptography.hazmat.primitives import hashes
88
from cryptography.hazmat.primitives.asymmetric import rsa
9-
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, PublicFormat, NoEncryption
9+
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, PublicFormat, NoEncryption, load_pem_private_key
1010

1111
from .. import config
12+
from ..exception import LocalFileNotFound
1213
from .filesystem import FileSystem
1314

1415

@@ -68,6 +69,41 @@ def compare_certificates(a, b):
6869
return 0
6970

7071

72+
class PrivateKey:
73+
74+
def __init__(self, private_key):
75+
self.private_key = private_key
76+
77+
@property
78+
def pem_data(self):
79+
return self.private_key.private_bytes(Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption())
80+
81+
@staticmethod
82+
def from_file(path, password=None):
83+
with open(path, 'r') as f:
84+
data = f.read()
85+
return PrivateKey.from_string(data, password)
86+
87+
@staticmethod
88+
def from_string(data, password=None):
89+
return PrivateKey.from_bytes(data.encode('utf-8'), password)
90+
91+
@staticmethod
92+
def from_bytes(data, password=None):
93+
return PrivateKey(load_pem_private_key(data, password))
94+
95+
@staticmethod
96+
def load_private_key(key, password=None):
97+
if isinstance(key, bytes):
98+
return PrivateKey.from_bytes(key, password)
99+
100+
try:
101+
FileSystem.instance().get_local_file_info(key)
102+
return PrivateKey.from_file(key, password)
103+
except LocalFileNotFound:
104+
return PrivateKey.from_string(key, password)
105+
106+
71107
class X509Certificate:
72108

73109
def __init__(self, certificate):
@@ -112,6 +148,17 @@ def from_string(data):
112148
def from_bytes(data):
113149
return X509Certificate(x509.load_pem_x509_certificate(data))
114150

151+
@staticmethod
152+
def load_certificate(cert):
153+
if isinstance(cert, bytes):
154+
return X509Certificate.from_bytes(cert)
155+
156+
try:
157+
FileSystem.instance().get_local_file_info(cert)
158+
return X509Certificate.from_file(cert)
159+
except LocalFileNotFound:
160+
return X509Certificate.from_string(cert)
161+
115162
def __str__(self):
116163
return str(
117164
dict(

tests/ut/test_edge_ssl.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ class TestEdgeSSL(base_edge.BaseEdgeTest):
99

1010
def setUp(self):
1111
super().setUp()
12-
self._private_key = './certs/private.key'
12+
self._key_filepath = './certs/private.key'
1313
self._domain_cert = './certs/certificate.crt'
14-
self._intermediate = './certs/intermediate.crt'
15-
self._ca = './certs/ca.crt'
16-
self._certificate = ssl.SSL.BEGIN_PEM
14+
self._intermediate_cert = './certs/intermediate.crt'
15+
self._root_cert = './certs/ca.crt'
16+
17+
self._private_key_contents = 'private_key'
18+
self._certificate_contents = 'certificate'
1719

1820
def test_is_http_disabled_true(self):
1921
get_response = True
@@ -53,14 +55,30 @@ def test_disable_http(self):
5355
ssl.SSL(self._filer).disable_http()
5456
self._filer.put.assert_called_once_with('/config/fileservices/webdav/forceHttps', True)
5557

56-
def test_set_certificate(self):
57-
data = 'data\n'
58-
self._init_filer()
59-
self.patch_call("cterasdk.edge.ssl.FileSystem.get_local_file_info")
60-
mock_open = mock.mock_open(read_data=data)
61-
with mock.patch("builtins.open", mock_open):
62-
ssl.SSL(self._filer).set_certificate(self._private_key, self._domain_cert, self._intermediate, self._ca)
63-
self._filer.put.assert_called_once_with('/config/certificate', '\n' + data * 4)
58+
def test_import_certificate(self):
59+
put_response = 'Success'
60+
self._init_filer(put_response=put_response)
61+
mock_load_private_key = self.patch_call("cterasdk.lib.crypto.PrivateKey.load_private_key")
62+
mock_load_private_key.return_value = TestEdgeSSL._get_secret(self._private_key_contents)
63+
mock_load_certificate = self.patch_call("cterasdk.lib.crypto.X509Certificate.load_certificate")
64+
mock_load_certificate.return_value = TestEdgeSSL._get_secret(self._certificate_contents)
65+
ret = ssl.SSL(self._filer).import_certificate(self._key_filepath, self._domain_cert, self._intermediate_cert, self._root_cert)
66+
mock_load_private_key.assert_called_once_with(self._key_filepath)
67+
mock_load_certificate.assert_has_calls(
68+
[
69+
mock.call(self._domain_cert),
70+
mock.call(self._intermediate_cert),
71+
mock.call(self._root_cert),
72+
]
73+
)
74+
expected_param = ''.join([
75+
self._private_key_contents,
76+
self._certificate_contents,
77+
self._certificate_contents,
78+
self._certificate_contents
79+
])
80+
self._filer.put.assert_called_once_with('/config/certificate', '\n{}'.format(expected_param))
81+
self.assertEqual(ret, put_response)
6482

6583
def test_get_storage_ca(self):
6684
get_response = 'Success'
@@ -69,13 +87,16 @@ def test_get_storage_ca(self):
6987
self._filer.get.assert_called_once_with('/status/extStorageTrustedCA')
7088
self.assertEqual(ret, get_response)
7189

72-
def test_set_storage_ca(self):
90+
def test_import_storage_ca(self):
7391
put_response = 'Success'
7492
self._init_filer(put_response=put_response)
75-
ret = ssl.SSL(self._filer).set_storage_ca(self._certificate)
93+
mock_load_certificate = self.patch_call("cterasdk.lib.crypto.X509Certificate.load_certificate")
94+
mock_load_certificate.return_value = TestEdgeSSL._get_secret(self._certificate_contents)
95+
ret = ssl.SSL(self._filer).import_storage_ca(self._domain_cert)
7696
expected_param = Object()
7797
expected_param._classname = 'ExtTrustedCA' # pylint: disable=protected-access
78-
expected_param.certificate = self._certificate
98+
expected_param.certificate = self._certificate_contents
99+
mock_load_certificate.assert_called_once_with(self._domain_cert)
79100
self._filer.put.assert_called_once_with('/config/extStorageTrustedCA', mock.ANY)
80101
actual_param = self._filer.put.call_args[0][1]
81102
self._assert_equal_objects(actual_param, expected_param)
@@ -86,3 +107,11 @@ def test_remove_storage_ca(self):
86107
self._init_filer(put_response=put_response)
87108
ssl.SSL(self._filer).remove_storage_ca()
88109
self._filer.put.assert_called_once_with('/config/extStorageTrustedCA', None)
110+
111+
@staticmethod
112+
def _get_secret(secret):
113+
param = Object()
114+
param.pem_data = secret.encode('utf-8')
115+
param.subject = 'subject'
116+
param.issuer = 'issuer'
117+
return param

0 commit comments

Comments
 (0)