Skip to content

Commit 4dc0798

Browse files
authored
feat(idempotency): Allow durable function to replay (#4834)
1 parent 5e728e1 commit 4dc0798

File tree

4 files changed

+115
-6
lines changed

4 files changed

+115
-6
lines changed

packages/idempotency/src/IdempotencyHandler.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,11 @@ export class IdempotencyHandler<Func extends AnyFunction> {
177177
* window, we might get an `IdempotencyInconsistentStateError`. In such
178178
* cases we can safely retry the handling a few times.
179179
*/
180-
public async handle(): Promise<ReturnType<Func>> {
180+
public async handle({
181+
isReplay,
182+
}: {
183+
isReplay?: boolean;
184+
} = {}): Promise<ReturnType<Func>> {
181185
// early return if we should skip idempotency completely
182186
if (this.shouldSkipIdempotency()) {
183187
return await this.#functionToMakeIdempotent.apply(
@@ -190,7 +194,7 @@ export class IdempotencyHandler<Func extends AnyFunction> {
190194
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
191195
try {
192196
const { isIdempotent, result } =
193-
await this.#saveInProgressOrReturnExistingResult();
197+
await this.#saveInProgressOrReturnExistingResult({ isReplay });
194198
if (isIdempotent) return result as ReturnType<Func>;
195199

196200
return await this.getFunctionResult();
@@ -356,7 +360,11 @@ export class IdempotencyHandler<Func extends AnyFunction> {
356360
* Before returning a result, we might neede to look up the idempotency record
357361
* and validate it to ensure that it is consistent with the payload to be hashed.
358362
*/
359-
readonly #saveInProgressOrReturnExistingResult = async (): Promise<{
363+
readonly #saveInProgressOrReturnExistingResult = async ({
364+
isReplay,
365+
}: {
366+
isReplay?: boolean;
367+
} = {}): Promise<{
360368
isIdempotent: boolean;
361369
result: JSONValue;
362370
}> => {
@@ -381,6 +389,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
381389
{ cause: error }
382390
);
383391
if (error.name === 'IdempotencyItemAlreadyExistsError') {
392+
if (isReplay) {
393+
return returnValue;
394+
}
395+
384396
let idempotencyRecord = (error as IdempotencyItemAlreadyExistsError)
385397
.existingRecord;
386398
if (idempotencyRecord === undefined) {

packages/idempotency/src/makeIdempotent.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ const isContext = (arg: unknown): arg is Context => {
1717
);
1818
};
1919

20+
const isDurableContext = (arg: unknown): boolean => {
21+
return (
22+
arg !== undefined &&
23+
arg !== null &&
24+
typeof arg === 'object' &&
25+
'step' in arg
26+
);
27+
}
28+
2029
const isFnHandler = (
2130
fn: AnyFunction,
2231
args: Parameters<AnyFunction>
@@ -26,7 +35,7 @@ const isFnHandler = (
2635
fn !== undefined &&
2736
fn !== null &&
2837
typeof fn === 'function' &&
29-
isContext(args[1])
38+
(isContext(args[1])|| isDurableContext(args[1]))
3039
);
3140
};
3241

@@ -125,7 +134,9 @@ function makeIdempotent<Func extends AnyFunction>(
125134
let functionPayloadToBeHashed: JSONValue;
126135

127136
if (isFnHandler(fn, args)) {
128-
idempotencyConfig.registerLambdaContext(args[1]);
137+
// If it's a durable context, retrieve the lambdaContext property
138+
// Otherwise use the context
139+
idempotencyConfig.registerLambdaContext(args[1]?.lambdaContext || args[1]);
129140
functionPayloadToBeHashed = args[0];
130141
} else {
131142
if (isOptionsWithDataIndexArgument(options)) {
@@ -135,6 +146,8 @@ function makeIdempotent<Func extends AnyFunction>(
135146
}
136147
}
137148

149+
const isReplay = args[1]?.durableExecutionMode === "REPLAY_MODE"
150+
138151
return new IdempotencyHandler({
139152
functionToMakeIdempotent: fn,
140153
idempotencyConfig: idempotencyConfig,
@@ -143,7 +156,7 @@ function makeIdempotent<Func extends AnyFunction>(
143156
functionArguments: args,
144157
functionPayloadToBeHashed,
145158
thisArg: this,
146-
}).handle() as ReturnType<Func>;
159+
}).handle({ isReplay }) as ReturnType<Func>
147160
};
148161
}
149162

packages/idempotency/tests/unit/IdempotencyHandler.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,76 @@ describe('Class IdempotencyHandler', () => {
239239
);
240240
expect(mockProcessIdempotency).toHaveBeenCalledTimes(MAX_RETRIES + 1);
241241
});
242+
243+
it("allows execution when isReplay is true and there is IN PROGRESS record", async ()=> {
244+
// Prepare
245+
// Mock saveInProgress to simulate an existing IN_PROGRESS record
246+
vi.spyOn(persistenceStore, 'saveInProgress')
247+
.mockRejectedValueOnce(
248+
new IdempotencyItemAlreadyExistsError(
249+
'Record exists',
250+
new IdempotencyRecord({
251+
idempotencyKey: 'test-key',
252+
status: IdempotencyRecordStatus.INPROGRESS,
253+
expiryTimestamp: Date.now() + 10000,
254+
})
255+
)
256+
);
257+
258+
// Act
259+
await idempotentHandler.handle({isReplay: true})
260+
261+
// Assess
262+
expect(mockFunctionToMakeIdempotent).toBeCalled()
263+
})
264+
265+
it("raises an IdempotencyAlreadyInProgressError error when isReplay is false and there is an IN PROGRESS record", async ()=> {
266+
// Prepare
267+
// Mock saveInProgress to simulate an existing IN_PROGRESS record
268+
vi.spyOn(persistenceStore, 'saveInProgress')
269+
.mockRejectedValueOnce(
270+
new IdempotencyItemAlreadyExistsError(
271+
'Record exists',
272+
new IdempotencyRecord({
273+
idempotencyKey: 'test-key',
274+
status: IdempotencyRecordStatus.INPROGRESS,
275+
expiryTimestamp: Date.now() + 10000,
276+
})
277+
)
278+
);
279+
280+
// Act & Assess
281+
await expect(idempotentHandler.handle({ isReplay: false })).rejects.toThrow(IdempotencyAlreadyInProgressError);
282+
})
283+
284+
it("returns the result of the original durable execution when another durable execution with the same payload is invoked", async () => {
285+
286+
// Prepare
287+
vi.spyOn(
288+
persistenceStore,
289+
'saveInProgress'
290+
).mockRejectedValue(new IdempotencyItemAlreadyExistsError());
291+
292+
const stubRecord = new IdempotencyRecord({
293+
idempotencyKey: 'idempotencyKey',
294+
expiryTimestamp: Date.now() + 10000,
295+
inProgressExpiryTimestamp: 0,
296+
responseData: { response: false },
297+
payloadHash: 'payloadHash',
298+
status: IdempotencyRecordStatus.COMPLETED,
299+
});
300+
const getRecordSpy = vi
301+
.spyOn(persistenceStore, 'getRecord')
302+
.mockResolvedValue(stubRecord);
303+
304+
// Act
305+
const result = await idempotentHandler.handle({isReplay: false})
306+
307+
// Assess
308+
expect(result).toStrictEqual({ response: false });
309+
expect(getRecordSpy).toHaveBeenCalledTimes(1);
310+
expect(getRecordSpy).toHaveBeenCalledWith(mockFunctionPayloadToBeHashed);
311+
})
242312
});
243313

244314
describe('Method: getFunctionResult', () => {

packages/idempotency/tests/unit/makeIdempotent.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,4 +597,18 @@ describe('Function: makeIdempotent', () => {
597597
);
598598
expect(getRecordSpy).toHaveBeenCalledTimes(0);
599599
});
600+
601+
it("registers the LambdaContext when provided a durable context", async ()=> {
602+
// Prepare
603+
const registerLambdaContextSpy = vi.spyOn(IdempotencyConfig.prototype, "registerLambdaContext")
604+
const fn = async (_event: any, _context: any) => { }
605+
const handler = makeIdempotent(fn, mockIdempotencyOptions)
606+
const mockDurableContext = {step: vi.fn(), lambdaContext: context}
607+
608+
// Act
609+
await handler(event, mockDurableContext)
610+
611+
// Assess
612+
expect(registerLambdaContextSpy).toHaveBeenCalledOnce()
613+
})
600614
});

0 commit comments

Comments
 (0)