Skip to content

Commit 43c9776

Browse files
orosamSandor Oroszi
andauthored
Allow using an OpenSSL hashed directory for verification in X509Store (#943)
Add X509Store.load_locations() to set a CA bundle file and/or an OpenSSL- style hashed CA/CRL lookup directory, similar to the already existing SSL.Context.load_verify_locations(). Co-authored-by: Sandor Oroszi <[email protected]>
1 parent 0488214 commit 43c9776

File tree

3 files changed

+186
-1
lines changed

3 files changed

+186
-1
lines changed

CHANGELOG.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ Deprecations:
2424
Changes:
2525
^^^^^^^^
2626

27+
- Added ``OpenSSL.crypto.X509Store.load_locations`` to set trusted
28+
certificate file bundles and/or directories for verification.
29+
`#943 <https://github.com/pyca/pyopenssl/pull/943>`_
2730
- Added ``Context.set_keylog_callback`` to log key material.
2831
`#910 <https://github.com/pyca/pyopenssl/pull/910>`_
2932
- Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the

src/OpenSSL/crypto.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
exception_from_error_queue as _exception_from_error_queue,
2020
byte_string as _byte_string,
2121
native as _native,
22+
path_string as _path_string,
2223
UNSPECIFIED as _UNSPECIFIED,
2324
text_to_bytes_and_warn as _text_to_bytes_and_warn,
2425
make_assert as _make_assert,
@@ -1674,6 +1675,53 @@ def set_time(self, vfy_time):
16741675
_lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s")))
16751676
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
16761677

1678+
def load_locations(self, cafile, capath=None):
1679+
"""
1680+
Let X509Store know where we can find trusted certificates for the
1681+
certificate chain. Note that the certificates have to be in PEM
1682+
format.
1683+
1684+
If *capath* is passed, it must be a directory prepared using the
1685+
``c_rehash`` tool included with OpenSSL. Either, but not both, of
1686+
*cafile* or *capath* may be ``None``.
1687+
1688+
.. note::
1689+
1690+
Both *cafile* and *capath* may be set simultaneously.
1691+
1692+
Call this method multiple times to add more than one location.
1693+
For example, CA certificates, and certificate revocation list bundles
1694+
may be passed in *cafile* in subsequent calls to this method.
1695+
1696+
.. versionadded:: 20.0
1697+
1698+
:param cafile: In which file we can find the certificates (``bytes`` or
1699+
``unicode``).
1700+
:param capath: In which directory we can find the certificates
1701+
(``bytes`` or ``unicode``).
1702+
1703+
:return: ``None`` if the locations were set successfully.
1704+
1705+
:raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
1706+
or the locations could not be set for any reason.
1707+
1708+
"""
1709+
if cafile is None:
1710+
cafile = _ffi.NULL
1711+
else:
1712+
cafile = _path_string(cafile)
1713+
1714+
if capath is None:
1715+
capath = _ffi.NULL
1716+
else:
1717+
capath = _path_string(capath)
1718+
1719+
load_result = _lib.X509_STORE_load_locations(
1720+
self._store, cafile, capath
1721+
)
1722+
if not load_result:
1723+
_raise_current_error()
1724+
16771725

16781726
class X509StoreContextError(Exception):
16791727
"""

tests/test_crypto.py

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import base64
1111
from subprocess import PIPE, Popen
1212
from datetime import datetime, timedelta
13+
import sys
1314

1415
import pytest
1516

@@ -46,7 +47,14 @@
4647
get_elliptic_curves,
4748
)
4849

49-
from .util import EqualityTestsMixin, is_consistent_type, WARNING_TYPE_EXPECTED
50+
from OpenSSL._util import ffi as _ffi, lib as _lib
51+
52+
from .util import (
53+
EqualityTestsMixin,
54+
is_consistent_type,
55+
WARNING_TYPE_EXPECTED,
56+
NON_ASCII,
57+
)
5058

5159

5260
def normalize_privatekey_pem(pem):
@@ -2228,6 +2236,68 @@ def test_add_cert_accepts_duplicate(self):
22282236
store.add_cert(cert)
22292237
store.add_cert(cert)
22302238

2239+
@pytest.mark.parametrize(
2240+
"cafile, capath, call_cafile, call_capath",
2241+
[
2242+
(
2243+
"/cafile" + NON_ASCII,
2244+
None,
2245+
b"/cafile" + NON_ASCII.encode(sys.getfilesystemencoding()),
2246+
_ffi.NULL,
2247+
),
2248+
(
2249+
b"/cafile" + NON_ASCII.encode("utf-8"),
2250+
None,
2251+
b"/cafile" + NON_ASCII.encode("utf-8"),
2252+
_ffi.NULL,
2253+
),
2254+
(
2255+
None,
2256+
"/capath" + NON_ASCII,
2257+
_ffi.NULL,
2258+
b"/capath" + NON_ASCII.encode(sys.getfilesystemencoding()),
2259+
),
2260+
(
2261+
None,
2262+
b"/capath" + NON_ASCII.encode("utf-8"),
2263+
_ffi.NULL,
2264+
b"/capath" + NON_ASCII.encode("utf-8"),
2265+
),
2266+
],
2267+
)
2268+
def test_load_locations_parameters(
2269+
self, cafile, capath, call_cafile, call_capath, monkeypatch
2270+
):
2271+
class LibMock(object):
2272+
def load_locations(self, store, cafile, capath):
2273+
self.cafile = cafile
2274+
self.capath = capath
2275+
return 1
2276+
2277+
lib_mock = LibMock()
2278+
monkeypatch.setattr(
2279+
_lib, "X509_STORE_load_locations", lib_mock.load_locations
2280+
)
2281+
2282+
store = X509Store()
2283+
store.load_locations(cafile=cafile, capath=capath)
2284+
2285+
assert call_cafile == lib_mock.cafile
2286+
assert call_capath == lib_mock.capath
2287+
2288+
def test_load_locations_fails_when_all_args_are_none(self):
2289+
store = X509Store()
2290+
with pytest.raises(Error):
2291+
store.load_locations(None, None)
2292+
2293+
def test_load_locations_raises_error_on_failure(self, tmpdir):
2294+
invalid_ca_file = tmpdir.join("invalid.pem")
2295+
invalid_ca_file.write("This is not a certificate")
2296+
2297+
store = X509Store()
2298+
with pytest.raises(Error):
2299+
store.load_locations(cafile=str(invalid_ca_file))
2300+
22312301

22322302
class TestPKCS12(object):
22332303
"""
@@ -3884,6 +3954,70 @@ def test_get_verified_chain_invalid_chain_no_root(self):
38843954
assert exc.value.args[0][2] == "unable to get issuer certificate"
38853955
assert exc.value.certificate.get_subject().CN == "intermediate"
38863956

3957+
@pytest.fixture
3958+
def root_ca_file(self, tmpdir):
3959+
return self._create_ca_file(tmpdir, "root_ca_hash_dir", self.root_cert)
3960+
3961+
@pytest.fixture
3962+
def intermediate_ca_file(self, tmpdir):
3963+
return self._create_ca_file(
3964+
tmpdir, "intermediate_ca_hash_dir", self.intermediate_cert
3965+
)
3966+
3967+
@staticmethod
3968+
def _create_ca_file(base_path, hash_directory, cacert):
3969+
ca_hash = "{:08x}.0".format(cacert.subject_name_hash())
3970+
cafile = base_path.join(hash_directory, ca_hash)
3971+
cafile.write_binary(
3972+
dump_certificate(FILETYPE_PEM, cacert), ensure=True
3973+
)
3974+
return cafile
3975+
3976+
def test_verify_with_ca_file_location(self, root_ca_file):
3977+
store = X509Store()
3978+
store.load_locations(str(root_ca_file))
3979+
3980+
store_ctx = X509StoreContext(store, self.intermediate_cert)
3981+
store_ctx.verify_certificate()
3982+
3983+
def test_verify_with_ca_path_location(self, root_ca_file):
3984+
store = X509Store()
3985+
store.load_locations(None, str(root_ca_file.dirname))
3986+
3987+
store_ctx = X509StoreContext(store, self.intermediate_cert)
3988+
store_ctx.verify_certificate()
3989+
3990+
def test_verify_with_cafile_and_capath(
3991+
self, root_ca_file, intermediate_ca_file
3992+
):
3993+
store = X509Store()
3994+
store.load_locations(
3995+
cafile=str(root_ca_file), capath=str(intermediate_ca_file.dirname)
3996+
)
3997+
3998+
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
3999+
store_ctx.verify_certificate()
4000+
4001+
def test_verify_with_multiple_ca_files(
4002+
self, root_ca_file, intermediate_ca_file
4003+
):
4004+
store = X509Store()
4005+
store.load_locations(str(root_ca_file))
4006+
store.load_locations(str(intermediate_ca_file))
4007+
4008+
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
4009+
store_ctx.verify_certificate()
4010+
4011+
def test_verify_failure_with_empty_ca_directory(self, tmpdir):
4012+
store = X509Store()
4013+
store.load_locations(None, str(tmpdir))
4014+
4015+
store_ctx = X509StoreContext(store, self.intermediate_cert)
4016+
with pytest.raises(X509StoreContextError) as exc:
4017+
store_ctx.verify_certificate()
4018+
4019+
assert exc.value.args[0][2] == "unable to get local issuer certificate"
4020+
38874021

38884022
class TestSignVerify(object):
38894023
"""

0 commit comments

Comments
 (0)