Skip to content

Commit f106143

Browse files
authored
fix(idempotency): Validate payload for optimistic idempotent writes. (#2261)
* Validate payload for optimistic idempotent writes. * Re-throw IdempotencyValidationException instead of wrapping in a persistence layer exception.
1 parent 71c632b commit f106143

File tree

3 files changed

+84
-19
lines changed

3 files changed

+84
-19
lines changed

powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ private Object processIdempotency() throws Throwable {
109109
}
110110
} catch (IdempotencyKeyException ike) {
111111
throw ike;
112+
} catch (IdempotencyValidationException ive) {
113+
throw ive;
112114
} catch (Exception e) {
113115
throw new IdempotencyPersistenceLayerException(
114116
"Failed to save in progress record to idempotency store. If you believe this is a Powertools for AWS Lambda (Java) bug, please open an issue.",

powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616

1717
import static software.amazon.lambda.powertools.common.internal.LambdaConstants.LAMBDA_FUNCTION_NAME_ENV;
1818

19-
import com.fasterxml.jackson.core.JsonProcessingException;
20-
import com.fasterxml.jackson.databind.JsonNode;
21-
import com.fasterxml.jackson.databind.ObjectWriter;
22-
import io.burt.jmespath.Expression;
2319
import java.math.BigInteger;
2420
import java.nio.charset.StandardCharsets;
2521
import java.security.MessageDigest;
@@ -33,8 +29,15 @@
3329
import java.util.Spliterators;
3430
import java.util.stream.Stream;
3531
import java.util.stream.StreamSupport;
32+
3633
import org.slf4j.Logger;
3734
import org.slf4j.LoggerFactory;
35+
36+
import com.fasterxml.jackson.core.JsonProcessingException;
37+
import com.fasterxml.jackson.databind.JsonNode;
38+
import com.fasterxml.jackson.databind.ObjectWriter;
39+
40+
import io.burt.jmespath.Expression;
3841
import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;
3942
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemAlreadyExistsException;
4043
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemNotFoundException;
@@ -125,8 +128,7 @@ public void saveSuccess(JsonNode data, Object result, Instant now) {
125128
DataRecord.Status.COMPLETED,
126129
getExpiryEpochSecond(now),
127130
responseJson,
128-
getHashedPayload(data)
129-
);
131+
getHashedPayload(data));
130132
LOG.debug("Function successfully executed. Saving record to persistence store with idempotency key: {}",
131133
dataRecord.getIdempotencyKey());
132134
updateRecord(dataRecord);
@@ -157,8 +159,8 @@ public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTime
157159

158160
OptionalLong inProgressExpirationMsTimestamp = OptionalLong.empty();
159161
if (remainingTimeInMs.isPresent()) {
160-
inProgressExpirationMsTimestamp =
161-
OptionalLong.of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli());
162+
inProgressExpirationMsTimestamp = OptionalLong
163+
.of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli());
162164
}
163165

164166
DataRecord dataRecord = new DataRecord(
@@ -167,10 +169,23 @@ public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTime
167169
getExpiryEpochSecond(now),
168170
null,
169171
getHashedPayload(data),
170-
inProgressExpirationMsTimestamp
171-
);
172+
inProgressExpirationMsTimestamp);
172173
LOG.debug("saving in progress record for idempotency key: {}", dataRecord.getIdempotencyKey());
173-
putRecord(dataRecord, now);
174+
175+
try {
176+
putRecord(dataRecord, now);
177+
} catch (IdempotencyItemAlreadyExistsException iaee) {
178+
// Similar to getRecord, we need to call validatePayload before returning a data record.
179+
// PR https://github.com/aws-powertools/powertools-lambda-java/pull/1821 introduced returning a data record
180+
// through IdempotencyItemAlreadyExistsException to save DynamoDB calls when using DDB as store.
181+
Optional<DataRecord> dr = iaee.getDataRecord();
182+
if (dr.isPresent()) {
183+
// throws IdempotencyValidationException if payload validation is enabled and failing
184+
validatePayload(data, dr.get());
185+
}
186+
187+
throw iaee;
188+
}
174189
}
175190

176191
/**
@@ -188,7 +203,7 @@ public void deleteRecord(JsonNode data, Throwable throwable) {
188203

189204
String idemPotencyKey = hashedIdempotencyKey.get();
190205
LOG.debug("Function raised an exception {}. " +
191-
"Clearing in progress record in persistence store for idempotency key: {}",
206+
"Clearing in progress record in persistence store for idempotency key: {}",
192207
throwable.getClass(),
193208
idemPotencyKey);
194209

@@ -255,9 +270,9 @@ private Optional<String> getHashedIdempotencyKey(JsonNode data) {
255270

256271
private boolean isMissingIdemPotencyKey(JsonNode data) {
257272
if (data.isContainerNode()) {
258-
Stream<JsonNode> stream =
259-
StreamSupport.stream(Spliterators.spliteratorUnknownSize(data.elements(), Spliterator.ORDERED),
260-
false);
273+
Stream<JsonNode> stream = StreamSupport.stream(
274+
Spliterators.spliteratorUnknownSize(data.elements(), Spliterator.ORDERED),
275+
false);
261276
return stream.allMatch(JsonNode::isNull);
262277
}
263278
return data.isNull();

powertools-idempotency/powertools-idempotency-core/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
2929
import com.amazonaws.services.lambda.runtime.tests.EventLoader;
3030
import com.fasterxml.jackson.core.JsonProcessingException;
31+
import com.fasterxml.jackson.databind.JsonNode;
3132
import com.fasterxml.jackson.databind.node.DoubleNode;
3233
import com.fasterxml.jackson.databind.node.TextNode;
3334

@@ -145,8 +146,8 @@ void saveInProgress_jmespath_NotFound_shouldThrowException() {
145146
assertThatThrownBy(
146147
() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now,
147148
OptionalInt.empty()))
148-
.isInstanceOf(IdempotencyKeyException.class)
149-
.hasMessageContaining("No data found to create a hashed idempotency key");
149+
.isInstanceOf(IdempotencyKeyException.class)
150+
.hasMessageContaining("No data found to create a hashed idempotency key");
150151
assertThat(status).isEqualTo(-1);
151152
}
152153

@@ -181,7 +182,7 @@ void saveInProgress_withLocalCache_NotExpired_ShouldThrowException() {
181182
assertThatThrownBy(
182183
() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now,
183184
OptionalInt.empty()))
184-
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);
185+
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);
185186
assertThat(status).isEqualTo(-1);
186187
}
187188

@@ -243,7 +244,8 @@ void saveSuccess_withCacheEnabled_shouldSaveInCache() throws JsonProcessingExcep
243244
DataRecord cachedDr = cache.get("testFunction#8d6a8f173b46479eff55e0997864a514");
244245
assertThat(cachedDr.getStatus()).isEqualTo(DataRecord.Status.COMPLETED);
245246
assertThat(cachedDr.getExpiryTimestamp()).isEqualTo(now.plus(3600, ChronoUnit.SECONDS).getEpochSecond());
246-
assertThat(cachedDr.getResponseData()).isEqualTo(JsonConfig.get().getObjectMapper().writeValueAsString(product));
247+
assertThat(cachedDr.getResponseData())
248+
.isEqualTo(JsonConfig.get().getObjectMapper().writeValueAsString(product));
247249
assertThat(cachedDr.getIdempotencyKey()).isEqualTo("testFunction#8d6a8f173b46479eff55e0997864a514");
248250
assertThat(cachedDr.getPayloadHash()).isEmpty();
249251
}
@@ -325,6 +327,52 @@ void getRecord_invalidPayload_shouldThrowValidationException() {
325327

326328
assertThatThrownBy(
327329
() -> persistenceStore.getRecord(JsonConfig.get().getObjectMapper().valueToTree(event), Instant.now()))
330+
.isInstanceOf(IdempotencyValidationException.class);
331+
}
332+
333+
@Test
334+
void saveInProgress_invalidPayload_shouldThrowValidationException() {
335+
APIGatewayProxyRequestEvent event = EventLoader.loadApiGatewayRestEvent("apigw_event.json");
336+
persistenceStore = new BasePersistenceStore() {
337+
@Override
338+
public DataRecord getRecord(String idempotencyKey) throws IdempotencyItemNotFoundException {
339+
return new DataRecord(idempotencyKey, DataRecord.Status.INPROGRESS,
340+
Instant.now().plus(3600, ChronoUnit.SECONDS).getEpochSecond(), "Response", "different hash");
341+
}
342+
343+
@Override
344+
public void putRecord(DataRecord dataRecord, Instant now) throws IdempotencyItemAlreadyExistsException {
345+
DataRecord existingRecord = new DataRecord(
346+
dataRecord.getIdempotencyKey(),
347+
DataRecord.Status.INPROGRESS,
348+
Instant.now().plus(3600, ChronoUnit.SECONDS).getEpochSecond(),
349+
null,
350+
"different hash");
351+
throw new IdempotencyItemAlreadyExistsException("Item already exists", new Exception(), existingRecord);
352+
}
353+
354+
@Override
355+
public void updateRecord(DataRecord dataRecord) {
356+
// Not needed for this test.
357+
}
358+
359+
@Override
360+
public void deleteRecord(String idempotencyKey) {
361+
// Not needed for this test.
362+
363+
}
364+
};
365+
366+
persistenceStore.configure(IdempotencyConfig.builder()
367+
.withEventKeyJMESPath("powertools_json(body).id")
368+
.withPayloadValidationJMESPath("powertools_json(body).message")
369+
.build(),
370+
"myfunc");
371+
372+
Instant now = Instant.now();
373+
OptionalInt remainingTime = OptionalInt.empty();
374+
JsonNode eventJson = JsonConfig.get().getObjectMapper().valueToTree(event);
375+
assertThatThrownBy(() -> persistenceStore.saveInProgress(eventJson, now, remainingTime))
328376
.isInstanceOf(IdempotencyValidationException.class);
329377
}
330378

0 commit comments

Comments
 (0)