2
2
# SPDX-License-Identifier: Apache-2.0
3
3
from aws_dbesdk_dynamodb .internaldafny .generated .AwsCryptographyDbEncryptionSdkDynamoDbItemEncryptorTypes import (
4
4
DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig ,
5
+ Error_DynamoDbItemEncryptorException ,
6
+ EncryptItemOutput_EncryptItemOutput ,
7
+ DecryptItemOutput_DecryptItemOutput ,
8
+ DecryptItemInput_DecryptItemInput ,
9
+ EncryptItemInput_EncryptItemInput ,
10
+ )
11
+ from aws_dbesdk_dynamodb .internaldafny .generated .AwsCryptographyDbEncryptionSdkStructuredEncryptionTypes import (
12
+ CryptoAction_ENCRYPT__AND__SIGN ,
13
+ CryptoAction_SIGN__ONLY ,
14
+ CryptoAction_DO__NOTHING ,
15
+ )
16
+ from aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb .references import (
17
+ ILegacyDynamoDbEncryptor ,
5
18
)
6
19
import smithy_dafny_standard_library .internaldafny .generated .Wrappers as Wrappers
7
20
import _dafny
8
21
9
22
import aws_dbesdk_dynamodb .internaldafny .generated .InternalLegacyOverride
23
+ from aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .models import (
24
+ EncryptItemInput ,
25
+ EncryptItemOutput ,
26
+ DecryptItemOutput ,
27
+ DecryptItemInput ,
28
+ )
29
+
10
30
11
31
try :
12
32
from dynamodb_encryption_sdk .encrypted .client import EncryptedClient
13
- from dynamodb_encryption_sdk .structures import EncryptionContext
33
+ from dynamodb_encryption_sdk .structures import EncryptionContext , AttributeActions
34
+ from dynamodb_encryption_sdk .identifiers import CryptoAction
35
+ from dynamodb_encryption_sdk .encrypted import CryptoConfig
36
+ from dynamodb_encryption_sdk .internal .identifiers import ReservedAttributes
14
37
15
38
_HAS_LEGACY_DDBEC = True
16
39
except ImportError :
@@ -25,65 +48,220 @@ def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
25
48
26
49
legacy_override = config .legacyOverride .value
27
50
28
- maybe_encryptor = aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb .dafny_to_smithy .aws_cryptography_dbencryptionsdk_dynamodb_LegacyDynamoDbEncryptorReference (
29
- legacy_override .encryptor
30
- )
31
-
32
51
# Precondition: The encryptor MUST be a DynamoDBEncryptor
33
52
if not _HAS_LEGACY_DDBEC :
34
53
return InternalLegacyOverride .CreateBuildFailure (
35
54
InternalLegacyOverride .CreateError ("Could not find aws-dynamodb-encryption-python installation" )
36
55
)
37
- if not isinstance (maybe_encryptor , EncryptedClient ):
56
+ if not isinstance (legacy_override . encryptor , EncryptedClient ):
38
57
return InternalLegacyOverride .CreateBuildFailure (
39
58
InternalLegacyOverride .CreateError ("Legacy encryptor is not supported" )
40
59
)
41
60
42
61
# Preconditions: MUST be able to create valid encryption context
43
62
maybe_encryption_context = InternalLegacyOverride .legacyEncryptionContext (config )
44
63
if maybe_encryption_context .is_Failure :
45
- return InternalLegacyOverride . CreateBuildFailure ( maybe_encryption_context . error ())
64
+ return maybe_encryption_context
46
65
47
66
# Precondition: All actions MUST be supported types
48
67
maybe_actions = InternalLegacyOverride .legacyActions (legacy_override .attributeActionsOnEncrypt )
49
68
if maybe_actions .is_Failure :
50
- return InternalLegacyOverride . CreateBuildFailure ( maybe_actions . error ())
69
+ return maybe_actions
51
70
52
- # TODO: Implement this
71
+ # Create and return the legacy override instance
72
+ legacy_instance = InternalLegacyOverride ()
73
+ legacy_instance .encryptor = legacy_override .encryptor
74
+ legacy_instance .policy = legacy_override .policy
75
+ # # Access the value property, not calling it as a function
76
+ # legacy_instance.encryption_context = maybe_encryption_context.value
77
+ # # Access the value property, not calling it as a function
78
+ # legacy_instance.attribute_actions = maybe_actions.value
79
+ legacy_instance .crypto_config = CryptoConfig (
80
+ materials_provider = legacy_override .encryptor ._materials_provider ,
81
+ encryption_context = maybe_encryption_context .value ,
82
+ attribute_actions = maybe_actions .value ,
83
+ )
84
+ return InternalLegacyOverride .CreateBuildSuccess (
85
+ InternalLegacyOverride .CreateInternalLegacyOverrideSome (legacy_instance )
86
+ )
87
+
88
+ def __init__ (self ):
89
+ super ().__init__ ()
90
+ self .encryptor = None
91
+ self .crypto_config = None
92
+ self .policy = None
53
93
54
94
@staticmethod
55
95
def legacyEncryptionContext (config : DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig ):
96
+ """Create the legacy encryption context from the config."""
56
97
try :
57
- encryption_context_kwargs = {
58
- "table_name" : _dafny .string_of (config .logicalTableName ),
59
- "hash_key_name" : _dafny .string_of (config .partitionKeyName ),
60
- }
61
- if config .sortKeyName .is_Some :
62
- encryption_context_kwargs ["sort_key_name" ] = _dafny .string_of (config .sortKeyName .value )
63
- encryption_context = EncryptionContext (** encryption_context_kwargs )
98
+ # Convert Dafny types to Python strings for the encryption context
99
+ table_name = _dafny .string_of (config .logicalTableName )
100
+ partition_key_name = _dafny .string_of (config .partitionKeyName )
101
+ sort_key_name = _dafny .string_of (config .sortKeyName .value ) if config .sortKeyName .is_Some else None
102
+
103
+ # Create the legacy encryption context with the extracted values
104
+ encryption_context = EncryptionContext (
105
+ table_name = table_name ,
106
+ partition_key_name = partition_key_name ,
107
+ sort_key_name = sort_key_name ,
108
+ )
109
+
64
110
return InternalLegacyOverride .CreateBuildSuccess (encryption_context )
65
111
except Exception as e :
112
+ # Return a failure with the error message if any exception occurs
66
113
return InternalLegacyOverride .CreateBuildFailure (InternalLegacyOverride .CreateError (str (e )))
67
114
68
115
@staticmethod
69
116
def legacyActions (attribute_actions_on_encrypt ):
70
- # TODO: Implement this
71
- pass
117
+ """Create the legacy attribute actions from the config."""
118
+ try :
119
+ # Create a new AttributeActions with default ENCRYPT_AND_SIGN
120
+ legacy_actions = AttributeActions (default_action = CryptoAction .ENCRYPT_AND_SIGN )
72
121
73
- @staticmethod
74
- def EncryptItem (input ):
75
- # TODO: Implement this
76
- return Wrappers .Result_Failure ("TODO-legacy-Encryptitem" )
122
+ # Map the action from the config to legacy actions
123
+ attribute_actions = {}
124
+ for key , action in attribute_actions_on_encrypt .items :
125
+ # Convert the string key to Python string
126
+ key_str = _dafny .string_of (key )
77
127
78
- @staticmethod
79
- def DecryptItem (input ):
80
- # TODO: Implement this
81
- return Wrappers .Result_Failure ("TODO-legacy-Decryptitem" )
128
+ # Map the action type to the appropriate CryptoAction
129
+ if action == CryptoAction_ENCRYPT__AND__SIGN ():
130
+ attribute_actions [key_str ] = CryptoAction .ENCRYPT_AND_SIGN
131
+ elif action == CryptoAction_SIGN__ONLY ():
132
+ attribute_actions [key_str ] = CryptoAction .SIGN_ONLY
133
+ elif action == CryptoAction_DO__NOTHING ():
134
+ attribute_actions [key_str ] = CryptoAction .DO_NOTHING
135
+ else :
136
+ return InternalLegacyOverride .CreateBuildFailure (
137
+ InternalLegacyOverride .CreateError (f"Unknown action type: { action } " )
138
+ )
139
+
140
+ # Update the attribute_actions dictionary
141
+ legacy_actions .attribute_actions = attribute_actions
142
+ return InternalLegacyOverride .CreateBuildSuccess (legacy_actions )
143
+ except Exception as e :
144
+ return InternalLegacyOverride .CreateBuildFailure (InternalLegacyOverride .CreateError (str (e )))
145
+
146
+ def EncryptItem (self , input : EncryptItemInput_EncryptItemInput ):
147
+ """Encrypt an item using the legacy DynamoDB encryptor.
148
+
149
+ Args:
150
+ input: EncryptItemInput containing the plaintext item to encrypt
151
+
152
+ Returns:
153
+ Result containing the encrypted item or an error
154
+ """
155
+ try :
156
+ # Check policy
157
+ if not self .policy .is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT :
158
+ return Wrappers .Result_Failure (
159
+ InternalLegacyOverride .CreateError ("Legacy policy does not support encrypt" )
160
+ )
161
+
162
+ # Get the Native Plaintext Item
163
+ native_input = aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .dafny_to_smithy .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemInput (
164
+ input
165
+ )
166
+
167
+ # Use the encryptor to encrypt the item using the instance attributes
168
+ encrypted_item = self .encryptor ._encrypt_item (
169
+ item = native_input .plaintext_item ,
170
+ crypto_config = self .crypto_config .with_item (native_input .plaintext_item ),
171
+ )
172
+
173
+ # Return the encrypted item
174
+ # The legacy encryption client returns items in the format that Dafny expects,
175
+ # so no additional conversion is needed here
176
+ native_output = EncryptItemOutput (encrypted_item = encrypted_item , parsed_header = None )
177
+ dafny_output = aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .smithy_to_dafny .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemOutput (
178
+ native_output
179
+ )
180
+ return Wrappers .Result_Success (dafny_output )
181
+
182
+ except Exception as e :
183
+ # Return an appropriate error result with the exception details
184
+ return Wrappers .Result_Failure (InternalLegacyOverride .CreateError (f"Error during encryption: { str (e )} " ))
185
+
186
+ def DecryptItem (self , input : DecryptItemInput_DecryptItemInput ):
187
+ """Decrypt an item using the legacy DynamoDB encryptor.
188
+
189
+ Args:
190
+ input: DecryptItemInput containing the encrypted item to decrypt
191
+
192
+ Returns:
193
+ Result containing the decrypted item or an error
194
+ """
195
+ try :
196
+ # Check policy
197
+ if not (
198
+ self .policy .is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
199
+ or self .policy .is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
200
+ ):
201
+ return Wrappers .Result_Failure (
202
+ InternalLegacyOverride .CreateError ("Legacy policy does not support decrypt" )
203
+ )
204
+
205
+ # Get the Native DecryptItemInput
206
+ native_input : DecryptItemInput = (
207
+ aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .dafny_to_smithy .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput (
208
+ input
209
+ )
210
+ )
211
+ # Use the encryptor to decrypt the item using the instance attributes
212
+ decrypted_item = self .encryptor ._decrypt_item (
213
+ item = native_input .encrypted_item ,
214
+ crypto_config = self .crypto_config .with_item (native_input .encrypted_item ),
215
+ )
216
+
217
+ native_output = DecryptItemOutput (plaintext_item = decrypted_item , parsed_header = None )
218
+ dafny_output = aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .smithy_to_dafny .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemOutput (
219
+ native_output
220
+ )
221
+ return Wrappers .Result_Success (dafny_output )
222
+ except Exception as e :
223
+ # Return an appropriate error result with the exception details
224
+ return Wrappers .Result_Failure (InternalLegacyOverride .CreateError (f"Error during decryption: { str (e )} " ))
225
+
226
+ def IsLegacyInput (self , input : DecryptItemInput_DecryptItemInput ):
227
+ """
228
+ Determine if the input is from a legacy client.
229
+
230
+ Args:
231
+ input: The decrypt item input to check
232
+
233
+ Returns:
234
+ Boolean indicating if the input is from a legacy client
235
+ """
236
+ try :
237
+ if not _HAS_LEGACY_DDBEC :
238
+ return False
239
+
240
+ if not input .is_DecryptItemInput :
241
+ return False
242
+
243
+ # Get the Native DecryptItemInput
244
+ native_input : DecryptItemInput = (
245
+ aws_dbesdk_dynamodb .smithygenerated .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor .dafny_to_smithy .aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput (
246
+ input
247
+ )
248
+ )
249
+ # = specification/dynamodb-encryption-client/decrypt-item.md#determining-legacy-items
250
+ ## An item MUST be determined to be encrypted under the legacy format if it contains
251
+ ## attributes for the material description and the signature.
252
+ return (
253
+ "*amzn-ddb-map-desc*" in native_input .encrypted_item
254
+ and "*amzn-ddb-map-sig*" in native_input .encrypted_item
255
+ )
256
+
257
+ except Exception as e :
258
+ # If we encounter any error during detection, default to not using legacy
259
+ return Wrappers .Result_Failure (InternalLegacyOverride .CreateError (f"Error in IsLegacyInput: { e } " ))
82
260
83
261
@staticmethod
84
- def IsLegacyinput ( input ):
85
- # TODO: Implement this
86
- return False
262
+ def CreateError ( message ):
263
+ """Create an Error with the given message."""
264
+ return Error_DynamoDbItemEncryptorException ( message )
87
265
88
266
89
267
aws_dbesdk_dynamodb .internaldafny .generated .InternalLegacyOverride .InternalLegacyOverride = InternalLegacyOverride
0 commit comments