Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions middleware/admin/x509_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,31 @@ def get_intel_pcs_x509_crl(url):
raise RuntimeError(f"Error fetching CRL from {url}")

try:
# Parse CRL
ctype = ra_res.headers["Content-Type"]
if ctype in ["application/x-pem-file"]:
crl = x509.load_pem_x509_crl(ra_res.content)
elif ctype in ["application/pkix-crl", "application/x-x509-ca-cert"]:
crl = x509.load_der_x509_crl(ra_res.content)
# Parse CRL. PCS endpoints may return generic MIME types (e.g.
# application/octet-stream) even when the payload is DER.
raw_ctype = ra_res.headers.get("Content-Type", "")
ctype = raw_ctype.split(";", 1)[0].strip().lower()
pem_types = {
"application/x-pem-file"
}
der_types = {
"application/pkix-crl",
"application/x-x509-ca-cert",
}
content = ra_res.content
content_stripped = content.lstrip()
pem_hint = \
content_stripped.startswith(b"-----BEGIN") or \
url.lower().endswith(".pem")
der_hint = \
url.lower().endswith(".der")

if ctype in pem_types or pem_hint:
crl = x509.load_pem_x509_crl(content)
elif ctype in der_types or der_hint:
crl = x509.load_der_x509_crl(content)
else:
raise RuntimeError(f"Unknown CRL encoding: {ctype}")
raise RuntimeError(f"Unable to parse CRL with content-type <{raw_ctype}>")

# Parse certification chain (if any)
issuer_chain = ra_res.headers.get("SGX-PCK-CRL-Issuer-Chain")
Expand Down
11 changes: 8 additions & 3 deletions middleware/admin/x509_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,22 @@ def get_crl_info(self, subject):
if len(crldps) == 0:
raise RuntimeError("No CRL distribution points found in certificate")

errors = []
for crldp in crldps:
url = crldp.full_name[0].value
try:
crl_info = crl_getter(url)
break
except RuntimeError:
pass
except RuntimeError as e:
errors.append(str(e))

if crl_info is None:
error_str = ""
if len(errors) > 0:
error_str = " The following errors were recorded: " + \
"; ".join(errors)
raise RuntimeError("None of the distribution points "
"provided a valid CRL")
f"provided a valid CRL.{error_str}")

return crl_info
except Exception as e:
Expand Down
58 changes: 36 additions & 22 deletions middleware/tests/admin/test_x509_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,38 +72,46 @@ def test_certs_pref_suf(self):
@patch("admin.x509_utils.x509.load_pem_x509_crl")
@patch("admin.x509_utils.requests")
class TestGetIntelPcsX509CRL(TestCase):
def test_ok_pem(self, requests, load_pem, load_der, split, unquote):
@parameterized.expand([
("ctype", "a-url", "application/x-pem-file", b"the-content"),
("content", "a-url", "the-ctype", b"-----BEGIN CERTIFICATE etc etc"),
("url", "a-url.pem", "the-ctype", b"the-content"),
])
def test_ok_pem(self, requests, load_pem, load_der, split, unquote,
_, url, ctype, ctnt):
res = Mock()
requests.get.return_value = res

res.status_code = 200
res.content = "the-crl-content"
res.content = ctnt
res.headers = {
"Content-Type": "application/x-pem-file"
"Content-Type": ctype
}
load_pem.return_value = "the-parsed-certificate"

self.assertEqual({
"crl": "the-parsed-certificate",
"issuer_chain": None,
"warning": None,
}, get_intel_pcs_x509_crl("the-crl-url"))
}, get_intel_pcs_x509_crl(url))

load_pem.assert_called_with("the-crl-content")
load_pem.assert_called_with(ctnt)
load_der.assert_not_called()
split.assert_not_called()
unquote.assert_not_called()

@parameterized.expand([
("header 1", "application/pkix-crl"),
("header 2", "application/x-x509-ca-cert"),
("ctype 1", "a-url", "application/pkix-crl"),
("ctype 2", "a-url", "application/x-x509-ca-cert"),
("url", "a-url.der", "the-ctype"),
])
def test_ok_der(self, requests, load_pem, load_der, split, unquote, _, ctype):
def test_ok_der(self, requests, load_pem, load_der, split, unquote,
_, url, ctype):
res = Mock()
requests.get.return_value = res

res.status_code = 200
res.content = "the-crl-content"
res.content = b"the-crl-content"
res.headers = {
"Content-Type": ctype,
}
Expand All @@ -113,9 +121,9 @@ def test_ok_der(self, requests, load_pem, load_der, split, unquote, _, ctype):
"crl": "the-parsed-certificate",
"issuer_chain": None,
"warning": None,
}, get_intel_pcs_x509_crl("the-crl-url"))
}, get_intel_pcs_x509_crl(url))

load_der.assert_called_with("the-crl-content")
load_der.assert_called_with(b"the-crl-content")
load_pem.assert_not_called()
split.assert_not_called()
unquote.assert_not_called()
Expand All @@ -125,7 +133,7 @@ def test_ok_warning(self, requests, load_pem, load_der, split, unquote):
requests.get.return_value = res

res.status_code = 200
res.content = "the-crl-content"
res.content = b"the-crl-content"
res.headers = {
"Content-Type": "application/x-pem-file",
"warning": "this-is-a-warning",
Expand All @@ -138,7 +146,7 @@ def test_ok_warning(self, requests, load_pem, load_der, split, unquote):
"warning": "Getting the-crl-url: this-is-a-warning",
}, get_intel_pcs_x509_crl("the-crl-url"))

load_pem.assert_called_with("the-crl-content")
load_pem.assert_called_with(b"the-crl-content")
load_der.assert_not_called()
split.assert_not_called()
unquote.assert_not_called()
Expand All @@ -149,7 +157,7 @@ def test_ok_issuer_chain(self, loadcer, requests, load_pem, load_der, split, unq
requests.get.return_value = res

res.status_code = 200
res.content = "the-crl-content"
res.content = b"the-crl-content"
res.headers = {
"Content-Type": "application/x-x509-ca-cert",
"SGX-PCK-CRL-Issuer-Chain": "chain0-chain1-chain2",
Expand All @@ -169,7 +177,7 @@ def test_ok_issuer_chain(self, loadcer, requests, load_pem, load_der, split, unq
"warning": None,
}, get_intel_pcs_x509_crl("the-crl-url"))

load_der.assert_called_with("the-crl-content")
load_der.assert_called_with(b"the-crl-content")
load_pem.assert_not_called()
unquote.assert_called_with("chain0-chain1-chain2")
split.assert_called_with("chain0,chain1,chain2")
Expand Down Expand Up @@ -198,29 +206,35 @@ def test_error_unknown_ctype(self, requests, load_pem, load_der, split, unquote)
res.headers = {
"Content-Type": "not-known"
}
res.content = b"this is not a certificate"

with self.assertRaises(RuntimeError) as e:
get_intel_pcs_x509_crl("the-crl-url")
self.assertIn("While", str(e.exception))
self.assertIn("Unknown", str(e.exception))
self.assertIn("the-crl-url", str(e.exception))
self.assertIn("Unable", str(e.exception))
self.assertIn("not-known", str(e.exception))

load_pem.assert_not_called()
load_der.assert_not_called()
split.assert_not_called()
unquote.assert_not_called()

@parameterized.expand([
("header 1", "application/x-pem-file", "pem"),
("header 2", "application/pkix-crl", "der"),
("header 3", "application/x-x509-ca-cert", "der"),
("header 1", "a-url", "application/x-pem-file", b"the-content", "pem"),
("header 2", "a-url", "application/pkix-crl", b"the-content", "der"),
("header 3", "a-url", "application/x-x509-ca-cert", b"the-content", "der"),
("url 1", "a-url.der", "the-ctype", b"the-content", "der"),
("url 2", "a-url.pem", "the-ctype", b"the-content", "pem"),
("content", "a-url", "the-ctype", b"-----BEGIN CERT etc etc", "pem"),
])
def test_error_parsing(self, requests, load_pem, load_der,
split, unquote, _, ctype, errct):
split, unquote, _, url, ctype, ctnt, errct):
res = Mock()
requests.get.return_value = res

res.status_code = 200
res.content = "some-content"
res.content = ctnt
res.headers = {
"Content-Type": ctype,
}
Expand All @@ -229,7 +243,7 @@ def test_error_parsing(self, requests, load_pem, load_der,
load_der.side_effect = ValueError("der parsing issue")

with self.assertRaises(RuntimeError) as e:
get_intel_pcs_x509_crl("the-crl-url")
get_intel_pcs_x509_crl(url)
self.assertIn("While", str(e.exception))
self.assertIn(f"{errct} parsing", str(e.exception))

Expand Down