|
24 | 24 | import textwrap
|
25 | 25 | import traceback
|
26 | 26 | import uuid
|
27 |
| -from typing import Any, Dict |
| 27 | +from typing import Any, Dict, Mapping |
28 | 28 |
|
29 | 29 | from pymongo.collection import Collection
|
30 | 30 |
|
@@ -2202,6 +2202,86 @@ def test_05_roundtrip_encrypted_unindexed(self):
|
2202 | 2202 | self.assertEqual(decrypted, val)
|
2203 | 2203 |
|
2204 | 2204 |
|
| 2205 | +# https://github.com/mongodb/specifications/blob/072601/source/client-side-encryption/tests/README.rst#rewrap |
| 2206 | +class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest): |
| 2207 | + |
| 2208 | + MASTER_KEYS: Mapping[str, Mapping[str, Any]] = { |
| 2209 | + "aws": { |
| 2210 | + "region": "us-east-1", |
| 2211 | + "key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0", |
| 2212 | + }, |
| 2213 | + "azure": { |
| 2214 | + "keyVaultEndpoint": "key-vault-csfle.vault.azure.net", |
| 2215 | + "keyName": "key-name-csfle", |
| 2216 | + }, |
| 2217 | + "gcp": { |
| 2218 | + "projectId": "devprod-drivers", |
| 2219 | + "location": "global", |
| 2220 | + "keyRing": "key-ring-csfle", |
| 2221 | + "keyName": "key-name-csfle", |
| 2222 | + }, |
| 2223 | + "kmip": {}, |
| 2224 | + "local": {}, |
| 2225 | + } |
| 2226 | + |
| 2227 | + def test_rewrap(self): |
| 2228 | + for src_provider in self.MASTER_KEYS: |
| 2229 | + for dst_provider in self.MASTER_KEYS: |
| 2230 | + with self.subTest(src_provider=src_provider, dst_provider=dst_provider): |
| 2231 | + self.run_test(src_provider, dst_provider) |
| 2232 | + |
| 2233 | + def run_test(self, src_provider, dst_provider): |
| 2234 | + # Step 1. Drop the collection ``keyvault.datakeys``. |
| 2235 | + self.client.keyvault.drop_collection("datakeys") |
| 2236 | + |
| 2237 | + # Step 2. Create a ``ClientEncryption`` object named ``client_encryption1`` |
| 2238 | + client_encryption1 = ClientEncryption( |
| 2239 | + key_vault_client=self.client, |
| 2240 | + key_vault_namespace="keyvault.datakeys", |
| 2241 | + kms_providers=ALL_KMS_PROVIDERS, |
| 2242 | + kms_tls_options=KMS_TLS_OPTS, |
| 2243 | + codec_options=OPTS, |
| 2244 | + ) |
| 2245 | + self.addCleanup(client_encryption1.close) |
| 2246 | + |
| 2247 | + # Step 3. Call ``client_encryption1.create_data_key`` with ``src_provider``. |
| 2248 | + key_id = client_encryption1.create_data_key( |
| 2249 | + master_key=self.MASTER_KEYS[src_provider], kms_provider=src_provider |
| 2250 | + ) |
| 2251 | + |
| 2252 | + # Step 4. Call ``client_encryption1.encrypt`` with the value "test" |
| 2253 | + cipher_text = client_encryption1.encrypt( |
| 2254 | + "test", key_id=key_id, algorithm=Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic |
| 2255 | + ) |
| 2256 | + |
| 2257 | + # Step 5. Create a ``ClientEncryption`` object named ``client_encryption2`` |
| 2258 | + client2 = MongoClient() |
| 2259 | + self.addCleanup(client2.close) |
| 2260 | + client_encryption2 = ClientEncryption( |
| 2261 | + key_vault_client=client2, |
| 2262 | + key_vault_namespace="keyvault.datakeys", |
| 2263 | + kms_providers=ALL_KMS_PROVIDERS, |
| 2264 | + kms_tls_options=KMS_TLS_OPTS, |
| 2265 | + codec_options=OPTS, |
| 2266 | + ) |
| 2267 | + self.addCleanup(client_encryption1.close) |
| 2268 | + |
| 2269 | + # Step 6. Call ``client_encryption2.rewrap_many_data_key`` with an empty ``filter``. |
| 2270 | + rewrap_many_data_key_result = client_encryption2.rewrap_many_data_key( |
| 2271 | + {}, provider=dst_provider, master_key=self.MASTER_KEYS[dst_provider] |
| 2272 | + ) |
| 2273 | + |
| 2274 | + self.assertEqual(rewrap_many_data_key_result.bulk_write_result.modified_count, 1) |
| 2275 | + |
| 2276 | + # 7. Call ``client_encryption1.decrypt`` with the ``cipher_text``. Assert the return value is "test". |
| 2277 | + decrypt_result1 = client_encryption1.decrypt(cipher_text) |
| 2278 | + self.assertEqual(decrypt_result1, "test") |
| 2279 | + |
| 2280 | + # 8. Call ``client_encryption2.decrypt`` with the ``cipher_text``. Assert the return value is "test". |
| 2281 | + decrypt_result2 = client_encryption2.decrypt(cipher_text) |
| 2282 | + self.assertEqual(decrypt_result2, "test") |
| 2283 | + |
| 2284 | + |
2205 | 2285 | class TestQueryableEncryptionDocsExample(EncryptionIntegrationTest):
|
2206 | 2286 | # Queryable Encryption is not supported on Standalone topology.
|
2207 | 2287 | @client_context.require_no_standalone
|
|
0 commit comments