Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions test/integration/smoke/test_certauthority_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from OpenSSL.crypto import FILETYPE_PEM, verify, X509

PUBKEY_VERIFY=True
try:
from OpenSSL.crypto import load_publickey
except ImportError:
PUBKEY_VERIFY=False
from cryptography.hazmat.primitives.asymmetric import padding


class TestCARootProvider(cloudstackTestCase):
Expand All @@ -52,6 +46,20 @@ def tearDownClass(cls):
raise Exception("Warning: Exception during cleanup : %s" % e)


def verifySignature(self, caCert, cert):
print("Verifying Certificate")
caPublicKey = caCert.public_key()
try:
caPublicKey.verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm,
)
print("Certificate is valid!")
except Exception as e:
print(f"Certificate verification failed: {e}")

def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
Expand Down Expand Up @@ -136,13 +144,8 @@ def test_issue_certificate_without_csr(self):
self.assertTrue(address in [str(x) for x in altNames.value.get_values_for_type(x509.IPAddress)])

# Validate certificate against CA public key
global PUBKEY_VERIFY
if not PUBKEY_VERIFY:
return
caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
x = X509()
x.set_pubkey(load_publickey(FILETYPE_PEM, caCert.public_key().public_bytes(serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo)))
verify(x, cert.signature, cert.tbs_certificate_bytes, cert.signature_hash_algorithm.name)
self.verifySignature(caCert, cert)


@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
Expand All @@ -165,13 +168,8 @@ def test_issue_certificate_with_csr(self):
self.assertEqual(cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value, 'v-1-VM')

# Validate certificate against CA public key
global PUBKEY_VERIFY
if not PUBKEY_VERIFY:
return
caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
x = X509()
x.set_pubkey(load_publickey(FILETYPE_PEM, caCert.public_key().public_bytes(serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo)))
verify(x, cert.signature, cert.tbs_certificate_bytes, cert.signature_hash_algorithm.name)
self.verifySignature(caCert, cert)


@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
Expand Down
Loading