4
4
DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig ,
5
5
Error_DynamoDbItemEncryptorException ,
6
6
EncryptItemOutput_EncryptItemOutput ,
7
- DecryptItemOutput_DecryptItemOutput
7
+ DecryptItemOutput_DecryptItemOutput ,
8
8
)
9
9
from aws_dbesdk_dynamodb .internaldafny .generated .AwsCryptographyDbEncryptionSdkStructuredEncryptionTypes import (
10
10
CryptoAction_ENCRYPT__AND__SIGN ,
@@ -65,10 +65,12 @@ def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
65
65
legacy_instance .encryption_context = maybe_encryption_context .value
66
66
# Access the value property, not calling it as a function
67
67
legacy_instance .attribute_actions = maybe_actions .value
68
-
68
+
69
69
# Set the material description field name and signature field name
70
70
# These values might be customized by the customer
71
- if isinstance (legacy_override .encryptor , EncryptedClient ) and hasattr (legacy_override .encryptor , "_crypto_config" ):
71
+ if isinstance (legacy_override .encryptor , EncryptedClient ) and hasattr (
72
+ legacy_override .encryptor , "_crypto_config"
73
+ ):
72
74
# Get field names from the encryptor's crypto config
73
75
crypto_config = legacy_override .encryptor ._crypto_config
74
76
if hasattr (crypto_config , "material_description_field_name" ):
@@ -78,11 +80,9 @@ def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
78
80
else :
79
81
# Use default value if not explicitly set
80
82
legacy_instance .materialDescriptionFieldName = _dafny .seq_of_chars ("*amzn-ddb-map-desc*" )
81
-
83
+
82
84
if hasattr (crypto_config , "signature_field_name" ):
83
- legacy_instance .signatureFieldName = _dafny .seq_of_chars (
84
- crypto_config .signature_field_name
85
- )
85
+ legacy_instance .signatureFieldName = _dafny .seq_of_chars (crypto_config .signature_field_name )
86
86
else :
87
87
# Use default value if not explicitly set
88
88
legacy_instance .signatureFieldName = _dafny .seq_of_chars ("*amzn-ddb-map-sig*" )
@@ -139,83 +139,79 @@ def legacyActions(attribute_actions_on_encrypt):
139
139
140
140
def EncryptItem (self , input ):
141
141
"""Encrypt an item using the legacy DynamoDB encryptor.
142
-
142
+
143
143
Args:
144
144
input: EncryptItemInput containing the plaintext item to encrypt
145
-
145
+
146
146
Returns:
147
147
Result containing the encrypted item or an error
148
148
"""
149
149
try :
150
150
# Get the plaintext item from the input
151
151
plaintext_item = input .plaintextItem
152
-
152
+
153
153
# Check policy
154
154
if not self .policy .is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT :
155
155
return Wrappers .Result_Failure (
156
156
InternalLegacyOverride .CreateError ("Legacy policy does not support encrypt" )
157
157
)
158
-
158
+
159
159
# Use the encryptor to encrypt the item using the instance attributes
160
160
encrypted_item = self .encryptor .encrypt_item (
161
- plaintext_item ,
162
- actions = self .attribute_actions ,
163
- encryption_context = self .encryption_context
161
+ plaintext_item , actions = self .attribute_actions , encryption_context = self .encryption_context
164
162
)
165
-
163
+
166
164
# Create the output with the encrypted item
167
165
output = EncryptItemOutput_EncryptItemOutput (encrypted_item , Wrappers .Option_None ())
168
166
return Wrappers .Result_Success (output )
169
-
167
+
170
168
except Exception as e :
171
169
# Return an appropriate error result with the exception details
172
- return Wrappers .Result_Failure (
173
- InternalLegacyOverride .CreateError (f"Error during encryption: { str (e )} " )
174
- )
170
+ return Wrappers .Result_Failure (InternalLegacyOverride .CreateError (f"Error during encryption: { str (e )} " ))
175
171
176
172
def DecryptItem (self , input ):
177
173
"""Decrypt an item using the legacy DynamoDB encryptor.
178
174
179
175
Args:
180
176
input: DecryptItemInput containing the encrypted item to decrypt
181
-
177
+
182
178
Returns:
183
179
Result containing the decrypted item or an error
184
180
"""
185
181
try :
186
182
# Get the encrypted item from the input
187
183
encrypted_item = input .encryptedItem
188
-
184
+
189
185
# Check policy
190
- if not (self .policy .is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT or
191
- self .policy .is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT ):
186
+ if not (
187
+ self .policy .is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
188
+ or self .policy .is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
189
+ ):
192
190
return Wrappers .Result_Failure (
193
191
InternalLegacyOverride .CreateError ("Legacy policy does not support decrypt" )
194
192
)
195
-
193
+
196
194
# Validate that this is indeed a legacy item with the required fields
197
- if not (encrypted_item .contains (self .materialDescriptionFieldName ) and
198
- encrypted_item .contains (self .signatureFieldName )):
195
+ if not (
196
+ encrypted_item .contains (self .materialDescriptionFieldName )
197
+ and encrypted_item .contains (self .signatureFieldName )
198
+ ):
199
199
return Wrappers .Result_Failure (
200
200
InternalLegacyOverride .CreateError ("Item does not contain required legacy fields" )
201
201
)
202
-
202
+
203
203
# Use the encryptor to decrypt the item using the instance attributes
204
204
decrypted_item = self .encryptor .decrypt_item (
205
- encrypted_item ,
206
- actions = self .attribute_actions ,
207
- encryption_context = self .encryption_context
205
+ encrypted_item , actions = self .attribute_actions , encryption_context = self .encryption_context
208
206
)
209
-
207
+
210
208
# Create the output with the decrypted item
211
209
output = DecryptItemOutput_DecryptItemOutput (decrypted_item , Wrappers .Option_None ())
212
210
return Wrappers .Result_Success (output )
213
-
211
+
214
212
except Exception as e :
215
213
# Return an appropriate error result with the exception details
216
- return Wrappers .Result_Failure (
217
- InternalLegacyOverride .CreateError (f"Error during decryption: { str (e )} " )
218
- )
214
+ return Wrappers .Result_Failure (InternalLegacyOverride .CreateError (f"Error during decryption: { str (e )} " ))
219
215
220
216
def __init__ (self ):
221
217
super ().__init__ ()
@@ -240,23 +236,27 @@ def IsLegacyInput(input):
240
236
return False
241
237
242
238
# Check if we're dealing with DecryptItemInput
243
- if not hasattr (input , ' encryptedItem' ):
239
+ if not hasattr (input , " encryptedItem" ):
244
240
return False
245
241
246
242
# We need the instance with materialDescriptionFieldName and signatureFieldName
247
- if not hasattr (input , ' legacyOverride' ) or not input .legacyOverride :
243
+ if not hasattr (input , " legacyOverride" ) or not input .legacyOverride :
248
244
return False
249
245
250
246
legacy_override = input .legacyOverride
251
- if not hasattr (legacy_override , 'materialDescriptionFieldName' ) or not legacy_override .materialDescriptionFieldName :
247
+ if (
248
+ not hasattr (legacy_override , "materialDescriptionFieldName" )
249
+ or not legacy_override .materialDescriptionFieldName
250
+ ):
252
251
return False
253
- if not hasattr (legacy_override , ' signatureFieldName' ) or not legacy_override .signatureFieldName :
252
+ if not hasattr (legacy_override , " signatureFieldName" ) or not legacy_override .signatureFieldName :
254
253
return False
255
254
256
255
# Check if the item contains both required markers
257
- return (input .encryptedItem .contains (legacy_override .materialDescriptionFieldName ) and
258
- input .encryptedItem .contains (legacy_override .signatureFieldName ))
259
-
256
+ return input .encryptedItem .contains (
257
+ legacy_override .materialDescriptionFieldName
258
+ ) and input .encryptedItem .contains (legacy_override .signatureFieldName )
259
+
260
260
except :
261
261
# If we encounter any error during detection, default to not using legacy
262
262
return False
@@ -267,5 +267,4 @@ def CreateError(message):
267
267
return Error_DynamoDbItemEncryptorException (message )
268
268
269
269
270
-
271
270
aws_dbesdk_dynamodb .internaldafny .generated .InternalLegacyOverride .InternalLegacyOverride = InternalLegacyOverride
0 commit comments