|
41 | 41 | from pymongo.asynchronous.collection import AsyncCollection |
42 | 42 | from pymongo.asynchronous.helpers import anext |
43 | 43 | from pymongo.daemon import _spawn_daemon |
| 44 | +from pymongo.uri_parser import _parse_kms_tls_options |
44 | 45 |
|
45 | 46 | try: |
46 | 47 | from pymongo.pyopenssl_context import IS_PYOPENSSL |
@@ -141,7 +142,7 @@ def test_init(self): |
141 | 142 | self.assertEqual(opts._mongocryptd_bypass_spawn, False) |
142 | 143 | self.assertEqual(opts._mongocryptd_spawn_path, "mongocryptd") |
143 | 144 | self.assertEqual(opts._mongocryptd_spawn_args, ["--idleShutdownTimeoutSecs=60"]) |
144 | | - self.assertEqual(opts._kms_ssl_contexts, {}) |
| 145 | + self.assertEqual(opts._kms_tls_options, {}) |
145 | 146 |
|
146 | 147 | @unittest.skipUnless(_HAVE_PYMONGOCRYPT, "pymongocrypt is not installed") |
147 | 148 | def test_init_spawn_args(self): |
@@ -189,22 +190,22 @@ def test_init_kms_tls_options(self): |
189 | 190 | tls_opts: Any |
190 | 191 | for tls_opts in [None, {}]: |
191 | 192 | opts = AutoEncryptionOpts({}, "k.d", kms_tls_options=tls_opts) |
192 | | - self.assertEqual(opts._kms_ssl_contexts, {}) |
| 193 | + self.assertEqual(opts._kms_tls_options, {}) |
193 | 194 | opts = AutoEncryptionOpts({}, "k.d", kms_tls_options={"kmip": {"tls": True}, "aws": {}}) |
194 | | - opts._parse_kms_tls_options(_IS_SYNC) |
195 | | - ctx = opts._kms_ssl_contexts["kmip"] |
| 195 | + _kms_ssl_contexts = _parse_kms_tls_options(opts._kms_tls_options, _IS_SYNC) |
| 196 | + ctx = _kms_ssl_contexts["kmip"] |
196 | 197 | self.assertEqual(ctx.check_hostname, True) |
197 | 198 | self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) |
198 | | - ctx = opts._kms_ssl_contexts["aws"] |
| 199 | + ctx = _kms_ssl_contexts["aws"] |
199 | 200 | self.assertEqual(ctx.check_hostname, True) |
200 | 201 | self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) |
201 | 202 | opts = AutoEncryptionOpts( |
202 | 203 | {}, |
203 | 204 | "k.d", |
204 | 205 | kms_tls_options={"kmip": {"tlsCAFile": CA_PEM, "tlsCertificateKeyFile": CLIENT_PEM}}, |
205 | 206 | ) |
206 | | - opts._parse_kms_tls_options(_IS_SYNC) |
207 | | - ctx = opts._kms_ssl_contexts["kmip"] |
| 207 | + _kms_ssl_contexts = _parse_kms_tls_options(opts._kms_tls_options, _IS_SYNC) |
| 208 | + ctx = _kms_ssl_contexts["kmip"] |
208 | 209 | self.assertEqual(ctx.check_hostname, True) |
209 | 210 | self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) |
210 | 211 |
|
@@ -2233,7 +2234,7 @@ async def test_05_tlsDisableOCSPEndpointCheck_is_permitted(self): |
2233 | 2234 | encryption = self.create_client_encryption( |
2234 | 2235 | providers, "keyvault.datakeys", self.client, OPTS, kms_tls_options=options |
2235 | 2236 | ) |
2236 | | - ctx = encryption._io_callbacks.opts._kms_ssl_contexts["aws"] |
| 2237 | + ctx = encryption._io_callbacks._kms_ssl_contexts["aws"] |
2237 | 2238 | if not hasattr(ctx, "check_ocsp_endpoint"): |
2238 | 2239 | raise self.skipTest("OCSP not enabled") |
2239 | 2240 | self.assertFalse(ctx.check_ocsp_endpoint) |
|
0 commit comments