|
| 1 | +# # Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. |
| 2 | +# # SPDX-License-Identifier: Apache-2.0 |
| 3 | +# |
| 4 | +# from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkDynamoDbItemEncryptorTypes import ( |
| 5 | +# DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig, |
| 6 | +# ) |
| 7 | +# import smithy_dafny_standard_library.internaldafny.generated.Wrappers as Wrappers |
| 8 | +# import _dafny |
| 9 | +# |
| 10 | +# import aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride |
| 11 | +# |
| 12 | +# try: |
| 13 | +# from dynamodb_encryption_sdk.encrypted.client import EncryptedClient |
| 14 | +# from dynamodb_encryption_sdk.structures import EncryptionContext, AttributeActions |
| 15 | +# from dynamodb_encryption_sdk.identifiers import CryptoAction |
| 16 | +# _HAS_LEGACY_DDBEC = True |
| 17 | +# except ImportError: |
| 18 | +# _HAS_LEGACY_DDBEC = False |
| 19 | +# |
| 20 | +# |
| 21 | +# class InternalLegacyOverride(aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride): |
| 22 | +# def __init__(self): |
| 23 | +# # Instance variables to match the Java implementation |
| 24 | +# self.encryptor = None |
| 25 | +# self.attribute_actions = None |
| 26 | +# self.encryption_context = None |
| 27 | +# self._policy = None |
| 28 | +# self.material_description_field_name = None |
| 29 | +# self.signature_field_name = None |
| 30 | +# |
| 31 | +# def policy(self): |
| 32 | +# """Return the policy configured for this legacy override.""" |
| 33 | +# return self._policy |
| 34 | +# |
| 35 | +# @staticmethod |
| 36 | +# def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig): |
| 37 | +# # Check for early return: If there is no legacyOverride there is nothing to do. |
| 38 | +# if config.legacyOverride.is_None: |
| 39 | +# return InternalLegacyOverride.CreateBuildSuccess(InternalLegacyOverride.CreateInternalLegacyOverrideNone()) |
| 40 | +# |
| 41 | +# legacy_override = config.legacyOverride.value |
| 42 | +# |
| 43 | +# maybe_encryptor = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_LegacyDynamoDbEncryptorReference( |
| 44 | +# legacy_override.encryptor |
| 45 | +# ) |
| 46 | +# |
| 47 | +# # Precondition: The encryptor MUST be a DynamoDBEncryptor |
| 48 | +# if not _HAS_LEGACY_DDBEC: |
| 49 | +# return InternalLegacyOverride.CreateBuildFailure( |
| 50 | +# InternalLegacyOverride.CreateError("Could not find aws-dynamodb-encryption-python installation") |
| 51 | +# ) |
| 52 | +# if not isinstance(maybe_encryptor, EncryptedClient): |
| 53 | +# return InternalLegacyOverride.CreateBuildFailure( |
| 54 | +# InternalLegacyOverride.CreateError("Legacy encryptor is not supported") |
| 55 | +# ) |
| 56 | +# |
| 57 | +# # Preconditions: MUST be able to create valid encryption context |
| 58 | +# maybe_encryption_context = InternalLegacyOverride.legacyEncryptionContext(config) |
| 59 | +# if maybe_encryption_context.is_Failure: |
| 60 | +# return InternalLegacyOverride.CreateBuildFailure(maybe_encryption_context.error()) |
| 61 | +# |
| 62 | +# # Precondition: All actions MUST be supported types |
| 63 | +# maybe_actions = InternalLegacyOverride.legacyActions(legacy_override.attributeActionsOnEncrypt) |
| 64 | +# if maybe_actions.is_Failure: |
| 65 | +# return InternalLegacyOverride.CreateBuildFailure(maybe_actions.error()) |
| 66 | +# |
| 67 | +# # Create and return the legacy override instance |
| 68 | +# legacy_instance = InternalLegacyOverride() |
| 69 | +# legacy_instance.encryptor = maybe_encryptor |
| 70 | +# legacy_instance.encryption_context = maybe_encryption_context.value() |
| 71 | +# legacy_instance.attribute_actions = maybe_actions.value() |
| 72 | +# legacy_instance._policy = legacy_override.policy |
| 73 | +# |
| 74 | +# # It is possible that these values have been customized by the customer. |
| 75 | +# legacy_instance.material_description_field_name = _dafny.DafnySequence.of(list( |
| 76 | +# maybe_encryptor.material_description_field_name |
| 77 | +# )) |
| 78 | +# legacy_instance.signature_field_name = _dafny.DafnySequence.of(list( |
| 79 | +# maybe_encryptor.signature_field_name |
| 80 | +# )) |
| 81 | +# |
| 82 | +# return InternalLegacyOverride.CreateBuildSuccess( |
| 83 | +# InternalLegacyOverride.CreateInternalLegacyOverrideSome(legacy_instance) |
| 84 | +# ) |
| 85 | +# |
| 86 | +# @staticmethod |
| 87 | +# def legacyEncryptionContext(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig): |
| 88 | +# try: |
| 89 | +# encryption_context_kwargs = { |
| 90 | +# "table_name": _dafny.string_of(config.logicalTableName), |
| 91 | +# "partition_key_name": _dafny.string_of(config.partitionKeyName), |
| 92 | +# } |
| 93 | +# if config.sortKeyName.is_Some: |
| 94 | +# encryption_context_kwargs["sort_key_name"] = _dafny.string_of(config.sortKeyName.value) |
| 95 | +# encryption_context = EncryptionContext(**encryption_context_kwargs) |
| 96 | +# return InternalLegacyOverride.CreateBuildSuccess(encryption_context) |
| 97 | +# except Exception as e: |
| 98 | +# return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e))) |
| 99 | +# |
| 100 | +# @staticmethod |
| 101 | +# def legacyActions(attribute_actions_on_encrypt): |
| 102 | +# try: |
| 103 | +# # Create a new AttributeActions with default DO_NOTHING |
| 104 | +# legacy_actions = AttributeActions(default_action=CryptoAction.DO_NOTHING) |
| 105 | +# |
| 106 | +# # Map the action from the config to legacy actions |
| 107 | +# attribute_actions = {} |
| 108 | +# for key, action in attribute_actions_on_encrypt.items(): |
| 109 | +# # Convert the string key to Python string |
| 110 | +# key_str = _dafny.string_of(key) |
| 111 | +# |
| 112 | +# # Map the action type to the appropriate CryptoAction |
| 113 | +# if action == "ENCRYPT_AND_SIGN": |
| 114 | +# attribute_actions[key_str] = CryptoAction.ENCRYPT_AND_SIGN |
| 115 | +# elif action == "SIGN_ONLY": |
| 116 | +# attribute_actions[key_str] = CryptoAction.SIGN_ONLY |
| 117 | +# elif action == "DO_NOTHING": |
| 118 | +# attribute_actions[key_str] = CryptoAction.DO_NOTHING |
| 119 | +# else: |
| 120 | +# return InternalLegacyOverride.CreateBuildFailure( |
| 121 | +# InternalLegacyOverride.CreateError(f"Unknown action type: {action}") |
| 122 | +# ) |
| 123 | +# |
| 124 | +# # Update the attribute_actions dictionary |
| 125 | +# legacy_actions.attribute_actions = attribute_actions |
| 126 | +# |
| 127 | +# return InternalLegacyOverride.CreateBuildSuccess(legacy_actions) |
| 128 | +# except Exception as e: |
| 129 | +# return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e))) |
| 130 | +# |
| 131 | +# @staticmethod |
| 132 | +# def createError(message): |
| 133 | +# """Create an Error with the given message.""" |
| 134 | +# return InternalLegacyOverride.CreateError(message) |
| 135 | +# |
| 136 | +# def EncryptItem(self, input): |
| 137 | +# try: |
| 138 | +# # Precondition: Policy MUST allow the caller to encrypt. |
| 139 | +# if not self._policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT(): |
| 140 | +# return Wrappers.Result_Failure("Legacy Policy does not support encrypt.") |
| 141 | +# |
| 142 | +# # Extract components from the input |
| 143 | +# item = input.plaintextItem |
| 144 | +# |
| 145 | +# if not isinstance(self.encryptor, EncryptedClient): |
| 146 | +# return Wrappers.Result_Failure("Legacy encryptor is not an instance of EncryptedClient") |
| 147 | +# |
| 148 | +# # Convert item to Python dictionary for legacy client |
| 149 | +# python_item = {} |
| 150 | +# for key, value in item.items(): |
| 151 | +# python_item[_dafny.string_of(key)] = value |
| 152 | +# |
| 153 | +# # Set up the encryption context attributes for the item |
| 154 | +# if hasattr(self.encryption_context, 'attributes'): |
| 155 | +# self.encryption_context.attributes = python_item |
| 156 | +# |
| 157 | +# # Use the legacy EncryptedClient to encrypt the item |
| 158 | +# # The client's _encrypt_item method is used internally by put_item |
| 159 | +# encrypted_item = self.encryptor._encrypt_item( |
| 160 | +# item=python_item, |
| 161 | +# crypto_config=self.encryptor._item_crypto_config( |
| 162 | +# TableName=self.encryption_context.table_name, |
| 163 | +# attribute_actions=self.attribute_actions |
| 164 | +# ) |
| 165 | +# ) |
| 166 | +# |
| 167 | +# # Convert the encrypted item back to the format expected by Dafny |
| 168 | +# result_item = {} |
| 169 | +# for key, value in encrypted_item.items(): |
| 170 | +# result_item[key] = value |
| 171 | +# |
| 172 | +# return Wrappers.Result_Success(result_item) |
| 173 | +# except Exception as e: |
| 174 | +# return Wrappers.Result_Failure(str(e)) |
| 175 | +# |
| 176 | +# def DecryptItem(self, input): |
| 177 | +# try: |
| 178 | +# # Precondition: Policy MUST allow the caller to decrypt. |
| 179 | +# if not (self._policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT() or |
| 180 | +# self._policy.is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT()): |
| 181 | +# return Wrappers.Result_Failure("Legacy Policy does not support decrypt.") |
| 182 | +# |
| 183 | +# # Extract components from the input |
| 184 | +# item = input.encryptedItem |
| 185 | +# |
| 186 | +# if not isinstance(self.encryptor, EncryptedClient): |
| 187 | +# return Wrappers.Result_Failure("Legacy encryptor is not an instance of EncryptedClient") |
| 188 | +# |
| 189 | +# # Convert item to Python dictionary for legacy client |
| 190 | +# python_item = {} |
| 191 | +# for key, value in item.items(): |
| 192 | +# python_item[_dafny.string_of(key)] = value |
| 193 | +# |
| 194 | +# # Use the legacy EncryptedClient to decrypt the item |
| 195 | +# # The client's _decrypt_item method is used internally by get_item |
| 196 | +# decrypted_item = self.encryptor._decrypt_item( |
| 197 | +# item=python_item, |
| 198 | +# crypto_config=self.encryptor._item_crypto_config( |
| 199 | +# TableName=self.encryption_context.table_name, |
| 200 | +# attribute_actions=self.attribute_actions |
| 201 | +# ) |
| 202 | +# ) |
| 203 | +# |
| 204 | +# # Convert the decrypted item back to the format expected by Dafny |
| 205 | +# result_item = {} |
| 206 | +# for key, value in decrypted_item.items(): |
| 207 | +# result_item[key] = value |
| 208 | +# |
| 209 | +# return Wrappers.Result_Success(result_item) |
| 210 | +# except Exception as e: |
| 211 | +# return Wrappers.Result_Failure(str(e)) |
| 212 | +# |
| 213 | +# def IsLegacyInput(self, input): |
| 214 | +# """Determine if the input is from a legacy client. |
| 215 | +# |
| 216 | +# This is used to decide whether to use legacy or new decryption methods. |
| 217 | +# We detect legacy encrypted items by looking for specific attributes or |
| 218 | +# structure patterns that are unique to the legacy encryption client. |
| 219 | +# """ |
| 220 | +# try: |
| 221 | +# if not _HAS_LEGACY_DDBEC: |
| 222 | +# return False |
| 223 | +# |
| 224 | +# item = input.encryptedItem |
| 225 | +# |
| 226 | +# # Check if the item contains material description and signature fields |
| 227 | +# # from the legacy encryptor |
| 228 | +# return ( |
| 229 | +# item.contains(self.material_description_field_name) and |
| 230 | +# item.contains(self.signature_field_name) |
| 231 | +# ) |
| 232 | +# except: |
| 233 | +# # If we encounter any error during detection, default to not using legacy |
| 234 | +# return False |
| 235 | +# |
| 236 | +# |
| 237 | +# aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride = InternalLegacyOverride |
0 commit comments