|
| 1 | +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | +import uuid |
| 6 | +from copy import deepcopy |
| 7 | + |
| 8 | +import boto3 |
| 9 | +import pytest |
| 10 | +from dynamodb_encryption_sdk.exceptions import DecryptionError |
| 11 | + |
| 12 | +from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_transforms.errors import ( |
| 13 | + DynamoDbItemEncryptor, |
| 14 | +) |
| 15 | +from aws_dbesdk_dynamodb.structures.dynamodb import LegacyPolicy |
| 16 | + |
| 17 | +from ...constants import ( |
| 18 | + INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME, |
| 19 | +) |
| 20 | +from ...items import ( |
| 21 | + complex_item_ddb, |
| 22 | + complex_item_dict, |
| 23 | + complex_key_ddb, |
| 24 | + complex_key_dict, |
| 25 | + simple_item_ddb, |
| 26 | + simple_item_dict, |
| 27 | + simple_key_ddb, |
| 28 | + simple_key_dict, |
| 29 | +) |
| 30 | +from ...requests import ( |
| 31 | + basic_delete_item_request_ddb, |
| 32 | + basic_delete_item_request_dict, |
| 33 | + basic_get_item_request_ddb, |
| 34 | + basic_get_item_request_dict, |
| 35 | + basic_put_item_request_ddb, |
| 36 | + basic_put_item_request_dict, |
| 37 | +) |
| 38 | +from . import sort_dynamodb_json_lists |
| 39 | +from .utils import ( |
| 40 | + create_legacy_encrypted_client, |
| 41 | + create_legacy_encrypted_resource, |
| 42 | + create_legacy_encrypted_table, |
| 43 | + encrypted_client_with_legacy_override, |
| 44 | + legacy_actions, |
| 45 | +) |
| 46 | + |
| 47 | + |
| 48 | +# TODO: standard_dicts tests were failing with TypeError: __str__ returned non-string (type bytes) |
| 49 | +# Initial assumption is that some thing is wrong with my test setup. |
| 50 | +# Creates a matrix of tests for each value in param, |
| 51 | +# with a user-friendly string for test output: |
| 52 | +# expect_standard_dictionaries = True -> "standard_dicts" |
| 53 | +# expect_standard_dictionaries = False -> "ddb_json" |
| 54 | +# @pytest.fixture(params=[True, False], ids=["standard_dicts", "ddb_json"]) |
| 55 | +@pytest.fixture(params=[False], ids=["ddb_json"]) |
| 56 | +def expect_standard_dictionaries(request): |
| 57 | + return request.param |
| 58 | + |
| 59 | + |
| 60 | +# Creates a matrix of tests for each value in param, |
| 61 | +# with a user-friendly string for test output: |
| 62 | +# use_complex_item = True -> "complex_item" |
| 63 | +# use_complex_item = False -> "simple_item" |
| 64 | +@pytest.fixture(params=[True, False], ids=["complex_item", "simple_item"]) |
| 65 | +def use_complex_item(request): |
| 66 | + return request.param |
| 67 | + |
| 68 | + |
| 69 | +# Append a suffix to the partition key to avoid collisions between test runs. |
| 70 | +@pytest.fixture(scope="module") |
| 71 | +def test_run_suffix(): |
| 72 | + return "-" + str(uuid.uuid4()) |
| 73 | + |
| 74 | + |
| 75 | +@pytest.fixture |
| 76 | +def test_item(expect_standard_dictionaries, use_complex_item, test_run_suffix): |
| 77 | + """Get a single test item in the appropriate format for the client.""" |
| 78 | + if expect_standard_dictionaries: |
| 79 | + if use_complex_item: |
| 80 | + item = deepcopy(complex_item_dict) |
| 81 | + else: |
| 82 | + item = deepcopy(simple_item_dict) |
| 83 | + else: |
| 84 | + if use_complex_item: |
| 85 | + item = deepcopy(complex_item_ddb) |
| 86 | + else: |
| 87 | + item = deepcopy(simple_item_ddb) |
| 88 | + # Add a suffix to the partition key to avoid collisions between test runs. |
| 89 | + if isinstance(item["partition_key"], dict): |
| 90 | + item["partition_key"]["S"] += test_run_suffix |
| 91 | + else: |
| 92 | + item["partition_key"] += test_run_suffix |
| 93 | + return item |
| 94 | + |
| 95 | + |
| 96 | +@pytest.fixture |
| 97 | +def test_key(expect_standard_dictionaries, use_complex_item, test_run_suffix): |
| 98 | + """Get a single test item in the appropriate format for the client.""" |
| 99 | + if expect_standard_dictionaries: |
| 100 | + if use_complex_item: |
| 101 | + key = deepcopy(complex_key_dict) |
| 102 | + else: |
| 103 | + key = deepcopy(simple_key_dict) |
| 104 | + else: |
| 105 | + if use_complex_item: |
| 106 | + key = deepcopy(complex_key_ddb) |
| 107 | + else: |
| 108 | + key = deepcopy(simple_key_ddb) |
| 109 | + # Add a suffix to the partition key to avoid collisions between test runs. |
| 110 | + if isinstance(key["partition_key"], dict): |
| 111 | + key["partition_key"]["S"] += test_run_suffix |
| 112 | + else: |
| 113 | + key["partition_key"] += test_run_suffix |
| 114 | + return key |
| 115 | + |
| 116 | + |
| 117 | +@pytest.fixture |
| 118 | +def put_item_request(expect_standard_dictionaries, test_item): |
| 119 | + if expect_standard_dictionaries: |
| 120 | + # Client requests with `expect_standard_dictionaries=True` use dict-formatted requests |
| 121 | + # with an added "TableName" key. |
| 122 | + return {**basic_put_item_request_dict(test_item), "TableName": INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME} |
| 123 | + return basic_put_item_request_ddb(test_item) |
| 124 | + |
| 125 | + |
| 126 | +@pytest.fixture |
| 127 | +def get_item_request(expect_standard_dictionaries, test_item): |
| 128 | + if expect_standard_dictionaries: |
| 129 | + # Client requests with `expect_standard_dictionaries=True` use dict-formatted requests |
| 130 | + # with an added "TableName" key. |
| 131 | + return {**basic_get_item_request_dict(test_item), "TableName": INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME} |
| 132 | + return basic_get_item_request_ddb(test_item) |
| 133 | + |
| 134 | + |
| 135 | +@pytest.fixture |
| 136 | +def delete_item_request(expect_standard_dictionaries, test_item): |
| 137 | + if expect_standard_dictionaries: |
| 138 | + return {**basic_delete_item_request_dict(test_item), "TableName": INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME} |
| 139 | + return basic_delete_item_request_ddb(test_item) |
| 140 | + |
| 141 | + |
| 142 | +# Fixtures for legacy encryptors and clients |
| 143 | + |
| 144 | + |
| 145 | +@pytest.fixture(params=["client", "table", "resource"], ids=["legacy_client", "legacy_table", "legacy_resource"]) |
| 146 | +def legacy_encryptor(request): |
| 147 | + """Create a legacy encryptor of the specified type.""" |
| 148 | + if request.param == "client": |
| 149 | + return create_legacy_encrypted_client() |
| 150 | + elif request.param == "table": |
| 151 | + return create_legacy_encrypted_table() |
| 152 | + elif request.param == "resource": |
| 153 | + return create_legacy_encrypted_resource() |
| 154 | + |
| 155 | + |
| 156 | +@pytest.fixture( |
| 157 | + params=[ |
| 158 | + LegacyPolicy.FORCE_LEGACY_ENCRYPT_ALLOW_LEGACY_DECRYPT, |
| 159 | + LegacyPolicy.FORBID_LEGACY_ENCRYPT_ALLOW_LEGACY_DECRYPT, |
| 160 | + LegacyPolicy.FORBID_LEGACY_ENCRYPT_FORBID_LEGACY_DECRYPT, |
| 161 | + ] |
| 162 | +) |
| 163 | +def legacy_policy(request): |
| 164 | + return request.param |
| 165 | + |
| 166 | + |
| 167 | +@pytest.fixture |
| 168 | +def encrypted_client(legacy_encryptor, legacy_policy, expect_standard_dictionaries): |
| 169 | + return encrypted_client_with_legacy_override( |
| 170 | + legacy_encryptor=legacy_encryptor, |
| 171 | + legacy_policy=legacy_policy, |
| 172 | + expect_standard_dictionaries=expect_standard_dictionaries, |
| 173 | + ) |
| 174 | + |
| 175 | + |
| 176 | +def test_GIVEN_awsdbe_encrypted_item_WHEN_get_with_legacy_client( |
| 177 | + encrypted_client, |
| 178 | + put_item_request, |
| 179 | + get_item_request, |
| 180 | + delete_item_request, |
| 181 | + expect_standard_dictionaries, |
| 182 | + legacy_policy, |
| 183 | +): |
| 184 | + # Given: Valid put_item request |
| 185 | + # When: put_item |
| 186 | + put_response = encrypted_client.put_item(**put_item_request) |
| 187 | + # Then: put_item succeeds |
| 188 | + assert put_response["ResponseMetadata"]["HTTPStatusCode"] == 200 |
| 189 | + |
| 190 | + # Given: Fresh legacy encryptor of the same type as used in the fixture |
| 191 | + legacy_encrypted_client = create_legacy_encrypted_client( |
| 192 | + attribute_actions=legacy_actions(), |
| 193 | + expect_standard_dictionaries=expect_standard_dictionaries, |
| 194 | + ) |
| 195 | + |
| 196 | + if legacy_policy == LegacyPolicy.FORCE_LEGACY_ENCRYPT_ALLOW_LEGACY_DECRYPT: |
| 197 | + # Given: Valid get_item request for the same item using legacy encryptor with FORCE_LEGACY_ENCRYPT policy |
| 198 | + # When: get_item with legacy encryptor |
| 199 | + get_response = legacy_encrypted_client.get_item(**get_item_request) |
| 200 | + # Then: Response is equal to the original item (legacy encryptor can decrypt item written by AWS DB-ESDK) |
| 201 | + assert get_response["ResponseMetadata"]["HTTPStatusCode"] == 200 |
| 202 | + # DynamoDB JSON uses lists to represent sets, so strict equality can fail. |
| 203 | + # Sort lists to ensure consistent ordering when comparing expected and actual items. |
| 204 | + expected_item = sort_dynamodb_json_lists(put_item_request["Item"]) |
| 205 | + legacy_actual_item = sort_dynamodb_json_lists(get_response["Item"]) |
| 206 | + assert expected_item == legacy_actual_item |
| 207 | + else: |
| 208 | + # Given: Valid get_item request for the same item using legacy encryptor with FORBID_LEGACY_ENCRYPT policy |
| 209 | + # When: get_item with legacy encryptor |
| 210 | + # Then: throws DecryptionError Exception (i.e. legacy encryptor cannot read values in new format) |
| 211 | + with pytest.raises(DecryptionError): # The exact exception may vary in Python implementation |
| 212 | + # Try to read the item with the legacy encryptor |
| 213 | + legacy_encrypted_client.get_item(**get_item_request) |
| 214 | + |
| 215 | + |
| 216 | +def test_GIVEN_legacy_encrypted_item_WHEN_get_with_awsdbe( |
| 217 | + encrypted_client, |
| 218 | + put_item_request, |
| 219 | + get_item_request, |
| 220 | + delete_item_request, |
| 221 | + expect_standard_dictionaries, |
| 222 | + legacy_policy, |
| 223 | +): |
| 224 | + # Given: Fresh legacy encryptor and valid put_item request |
| 225 | + legacy_encrypted_client = create_legacy_encrypted_client( |
| 226 | + attribute_actions=legacy_actions(), |
| 227 | + expect_standard_dictionaries=expect_standard_dictionaries, |
| 228 | + ) |
| 229 | + # When: put_item using legacy encryptor |
| 230 | + put_response = legacy_encrypted_client.put_item(**put_item_request) |
| 231 | + # Then: put_item succeeds (item is written using legacy format) |
| 232 | + assert put_response["ResponseMetadata"]["HTTPStatusCode"] == 200 |
| 233 | + |
| 234 | + if not legacy_policy == LegacyPolicy.FORBID_LEGACY_ENCRYPT_FORBID_LEGACY_DECRYPT: |
| 235 | + # Given: Valid get_item request for the same item with ALLOW_LEGACY_DECRYPT policy |
| 236 | + # When: get_item using AWS DB-ESDK client |
| 237 | + get_response = encrypted_client.get_item(**get_item_request) |
| 238 | + # Then: Response is equal to the original item (AWS DB ESDK can decrypt legacy items) |
| 239 | + assert get_response["ResponseMetadata"]["HTTPStatusCode"] == 200 |
| 240 | + # DynamoDB JSON uses lists to represent sets, so strict equality can fail. |
| 241 | + # Sort lists to ensure consistent ordering when comparing expected and actual items. |
| 242 | + expected_item = sort_dynamodb_json_lists(put_item_request["Item"]) |
| 243 | + actual_item = sort_dynamodb_json_lists(get_response["Item"]) |
| 244 | + assert expected_item == actual_item |
| 245 | + else: |
| 246 | + # Given: Valid get_item request for the same item with FORBID_LEGACY_DECRYPT policy |
| 247 | + # When: get_item using AWS DBE SDK client |
| 248 | + # Then: Throws DynamoDbItemEncryptor exception (AWS DB-ESDK with FORBID policy cannot decrypt legacy items) |
| 249 | + with pytest.raises(DynamoDbItemEncryptor): |
| 250 | + encrypted_client.get_item(**get_item_request) |
| 251 | + |
| 252 | + |
| 253 | +# Delete the items in the table after the module runs |
| 254 | +@pytest.fixture(scope="module", autouse=True) |
| 255 | +def cleanup_after_module(test_run_suffix): |
| 256 | + yield |
| 257 | + table = boto3.client("dynamodb") |
| 258 | + items = [deepcopy(simple_item_ddb), deepcopy(complex_item_ddb)] |
| 259 | + for item in items: |
| 260 | + item["partition_key"]["S"] += test_run_suffix |
| 261 | + table.delete_item(**basic_delete_item_request_ddb(item)) |
0 commit comments