Skip to content

Commit 47da73c

Browse files
committed
mogration 3
1 parent fd21367 commit 47da73c

File tree

3 files changed

+415
-88
lines changed

3 files changed

+415
-88
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkDynamoDbItemEncryptorTypes import (
4+
DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig,
5+
Error_DynamoDbItemEncryptorException,
6+
)
7+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkStructuredEncryptionTypes import (
8+
CryptoAction_ENCRYPT__AND__SIGN,
9+
CryptoAction_SIGN__ONLY,
10+
CryptoAction_DO__NOTHING,
11+
)
12+
import smithy_dafny_standard_library.internaldafny.generated.Wrappers as Wrappers
13+
import _dafny
14+
15+
import aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride
16+
17+
try:
18+
from dynamodb_encryption_sdk.encrypted.client import EncryptedClient
19+
from dynamodb_encryption_sdk.structures import EncryptionContext, AttributeActions
20+
from dynamodb_encryption_sdk.identifiers import CryptoAction
21+
22+
_HAS_LEGACY_DDBEC = True
23+
except ImportError:
24+
_HAS_LEGACY_DDBEC = False
25+
26+
27+
class InternalLegacyOverride(aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride):
28+
@staticmethod
29+
def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
30+
if config.legacyOverride.is_None:
31+
return InternalLegacyOverride.CreateBuildSuccess(InternalLegacyOverride.CreateInternalLegacyOverrideNone())
32+
33+
legacy_override = config.legacyOverride.value
34+
35+
# Precondition: The encryptor MUST be a DynamoDBEncryptor
36+
if not _HAS_LEGACY_DDBEC:
37+
return InternalLegacyOverride.CreateBuildFailure(
38+
InternalLegacyOverride.CreateError("Could not find aws-dynamodb-encryption-python installation")
39+
)
40+
if not isinstance(legacy_override.encryptor, EncryptedClient):
41+
return InternalLegacyOverride.CreateBuildFailure(
42+
InternalLegacyOverride.CreateError("Legacy encryptor is not supported")
43+
)
44+
maybe_encryptor = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_LegacyDynamoDbEncryptorReference(
45+
legacy_override.encryptor
46+
)
47+
48+
# Preconditions: MUST be able to create valid encryption context
49+
maybe_encryption_context = InternalLegacyOverride.legacyEncryptionContext(config)
50+
if maybe_encryption_context.is_Failure:
51+
return InternalLegacyOverride.CreateBuildFailure(maybe_encryption_context.error())
52+
53+
# Precondition: All actions MUST be supported types
54+
maybe_actions = InternalLegacyOverride.legacyActions(legacy_override.attributeActionsOnEncrypt)
55+
if maybe_actions.is_Failure:
56+
return InternalLegacyOverride.CreateBuildFailure(maybe_actions.error())
57+
58+
# Create and return the legacy override instance
59+
legacy_instance = InternalLegacyOverride()
60+
legacy_instance.encryptor = maybe_encryptor
61+
legacy_instance.policy = legacy_override.policy
62+
# Access the value property, not calling it as a function
63+
legacy_instance.encryption_context = maybe_encryption_context.value
64+
# Access the value property, not calling it as a function
65+
legacy_instance.attribute_actions = maybe_actions.value
66+
67+
# Set the material description field name and signature field name
68+
# These values might be customized by the customer
69+
if isinstance(legacy_override.encryptor, EncryptedClient) and hasattr(legacy_override.encryptor, "_crypto_config"):
70+
# Get field names from the encryptor's crypto config
71+
crypto_config = legacy_override.encryptor._crypto_config
72+
if hasattr(crypto_config, "material_description_field_name"):
73+
legacy_instance.materialDescriptionFieldName = _dafny.seq_of_chars(
74+
crypto_config.material_description_field_name
75+
)
76+
else:
77+
# Use default value if not explicitly set
78+
legacy_instance.materialDescriptionFieldName = _dafny.seq_of_chars("*amzn-ddb-map-desc*")
79+
80+
if hasattr(crypto_config, "signature_field_name"):
81+
legacy_instance.signatureFieldName = _dafny.seq_of_chars(
82+
crypto_config.signature_field_name
83+
)
84+
else:
85+
# Use default value if not explicitly set
86+
legacy_instance.signatureFieldName = _dafny.seq_of_chars("*amzn-ddb-map-sig*")
87+
88+
return InternalLegacyOverride.CreateBuildSuccess(
89+
InternalLegacyOverride.CreateInternalLegacyOverrideSome(legacy_instance)
90+
)
91+
92+
# TODO: Should resolve using the Encryption Context.
93+
@staticmethod
94+
def legacyEncryptionContext(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
95+
try:
96+
encryption_context_kwargs = {
97+
"table_name": _dafny.string_of(config.logicalTableName),
98+
"partition_key_name": _dafny.string_of(config.partitionKeyName),
99+
}
100+
if config.sortKeyName.is_Some:
101+
encryption_context_kwargs["sort_key_name"] = _dafny.string_of(config.sortKeyName.value)
102+
encryption_context = EncryptionContext(**encryption_context_kwargs)
103+
return InternalLegacyOverride.CreateBuildSuccess(encryption_context)
104+
except Exception as e:
105+
return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e)))
106+
107+
@staticmethod
108+
def legacyActions(attribute_actions_on_encrypt):
109+
try:
110+
# Create a new AttributeActions with default DO_NOTHING
111+
legacy_actions = AttributeActions(default_action=CryptoAction.DO_NOTHING)
112+
113+
# Map the action from the config to legacy actions
114+
attribute_actions = {}
115+
for key, action in attribute_actions_on_encrypt.items:
116+
# Convert the string key to Python string
117+
key_str = _dafny.string_of(key)
118+
119+
# Map the action type to the appropriate CryptoAction
120+
if action == CryptoAction_ENCRYPT__AND__SIGN():
121+
attribute_actions[key_str] = CryptoAction.ENCRYPT_AND_SIGN
122+
elif action == CryptoAction_SIGN__ONLY():
123+
attribute_actions[key_str] = CryptoAction.SIGN_ONLY
124+
elif action == CryptoAction_DO__NOTHING():
125+
attribute_actions[key_str] = CryptoAction.DO_NOTHING
126+
else:
127+
return InternalLegacyOverride.CreateBuildFailure(
128+
InternalLegacyOverride.CreateError(f"Unknown action type: {action}")
129+
)
130+
131+
# Update the attribute_actions dictionary
132+
legacy_actions.attribute_actions = attribute_actions
133+
134+
return InternalLegacyOverride.CreateBuildSuccess(legacy_actions)
135+
except Exception as e:
136+
return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e)))
137+
138+
@staticmethod
139+
def EncryptItem(input):
140+
print(f'Encrypting {input}')
141+
try:
142+
# Extract components from the input
143+
item = input.plaintextItem
144+
145+
if not isinstance(legacy_override.encryptor, EncryptedClient):
146+
return Wrappers.Result_Failure("Legacy encryptor is not an instance of EncryptedClient")
147+
148+
# Convert item to Python dictionary for legacy client
149+
python_item = {}
150+
for key, value in item.items():
151+
python_item[_dafny.string_of(key)] = value
152+
153+
# Set up the encryption context attributes for the item
154+
if hasattr(legacy_override.encryption_context, "attributes"):
155+
legacy_override.encryption_context.attributes = python_item
156+
157+
# Use the legacy EncryptedClient to encrypt the item
158+
# The client's _encrypt_item method is used internally by put_item
159+
encrypted_item = legacy_override.encryptor._encrypt_item(
160+
item=python_item,
161+
crypto_config=encryptor._item_crypto_config(
162+
TableName=encryption_context.table_name,
163+
attribute_actions=attribute_actions,
164+
),
165+
)
166+
167+
# Convert the encrypted item back to the format expected by Dafny
168+
result_item = {}
169+
for key, value in encrypted_item.items():
170+
result_item[key] = value
171+
172+
return Wrappers.Result_Success(result_item)
173+
except Exception as e:
174+
return Wrappers.Result_Failure(str(e))
175+
176+
# @staticmethod
177+
def DecryptItem(input):
178+
try:
179+
# Extract components from the input
180+
legacy_override = input.legacyOverride
181+
item = input.encryptedItem
182+
183+
if not isinstance(legacy_override.encryptor, EncryptedClient):
184+
return Wrappers.Result_Failure("Legacy encryptor is not an instance of EncryptedClient")
185+
186+
# Convert item to Python dictionary for legacy client
187+
python_item = {}
188+
for key, value in item.items():
189+
python_item[_dafny.string_of(key)] = value
190+
191+
# Use the legacy EncryptedClient to decrypt the item
192+
# The client's _decrypt_item method is used internally by get_item
193+
decrypted_item = legacy_override.encryptor._decrypt_item(
194+
item=python_item,
195+
crypto_config=legacy_override.encryptor._item_crypto_config(
196+
TableName=legacy_override.encryption_context.table_name,
197+
attribute_actions=legacy_override.attribute_actions,
198+
),
199+
)
200+
201+
# Convert the decrypted item back to the format expected by Dafny
202+
result_item = {}
203+
for key, value in decrypted_item.items():
204+
result_item[key] = value
205+
206+
return Wrappers.Result_Success(result_item)
207+
except Exception as e:
208+
return Wrappers.Result_Failure(str(e))
209+
210+
def __init__(self):
211+
super().__init__()
212+
self.encryptor = None
213+
self.attribute_actions = None
214+
self.encryption_context = None
215+
self.policy = None
216+
self.materialDescriptionFieldName = None
217+
self.signatureFieldName = None
218+
219+
@staticmethod
220+
def IsLegacyInput(input):
221+
"""Determine if the input is from a legacy client.
222+
223+
This is used to decide whether to use legacy or new decryption methods.
224+
As specified in specification/dynamodb-encryption-client/decrypt-item.md:
225+
An item MUST be determined to be encrypted under the legacy format if it contains
226+
attributes for the material description and the signature.
227+
"""
228+
try:
229+
if not _HAS_LEGACY_DDBEC:
230+
return False
231+
232+
# Check if we're dealing with DecryptItemInput
233+
if not hasattr(input, 'encryptedItem'):
234+
return False
235+
236+
# We need the instance with materialDescriptionFieldName and signatureFieldName
237+
if not hasattr(input, 'legacyOverride') or not input.legacyOverride:
238+
return False
239+
240+
legacy_override = input.legacyOverride
241+
if not hasattr(legacy_override, 'materialDescriptionFieldName') or not legacy_override.materialDescriptionFieldName:
242+
return False
243+
if not hasattr(legacy_override, 'signatureFieldName') or not legacy_override.signatureFieldName:
244+
return False
245+
246+
# Check if the item contains both required markers
247+
return (input.encryptedItem.contains(legacy_override.materialDescriptionFieldName) and
248+
input.encryptedItem.contains(legacy_override.signatureFieldName))
249+
250+
except:
251+
# If we encounter any error during detection, default to not using legacy
252+
return False
253+
254+
@staticmethod
255+
def CreateError(message):
256+
"""Create an Error with the given message."""
257+
return Error_DynamoDbItemEncryptorException(message)
258+
259+
260+
261+
aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride = InternalLegacyOverride

0 commit comments

Comments
 (0)