Skip to content

Commit b79cc0e

Browse files
committed
Log warning when ignoring state transition
1 parent d3d06ac commit b79cc0e

File tree

4 files changed

+139
-48
lines changed

4 files changed

+139
-48
lines changed
Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { KinesisClient } from '@aws-sdk/client-kinesis';
1+
import { KinesisClient } from "@aws-sdk/client-kinesis";
22
import * as pino from "pino";
3-
import { mockDeep } from 'jest-mock-extended';
4-
import { DynamoDBStreamEvent, Context } from 'aws-lambda';
5-
import { Deps } from '../deps';
6-
import { EnvVars } from '../env';
7-
import { createHandler } from '../letter-stream-forwarder';
3+
import { mockDeep } from "jest-mock-extended";
4+
import { DynamoDBStreamEvent, Context } from "aws-lambda";
5+
import { Deps } from "../deps";
6+
import { EnvVars } from "../env";
7+
import { createHandler } from "../letter-stream-forwarder";
88

9-
describe('letter-stream-forwarder Lambda', () => {
9+
describe("letter-stream-forwarder Lambda", () => {
1010

1111
const mockedDeps: jest.Mocked<Deps> = {
1212
kinesisClient: { send: jest.fn()} as unknown as KinesisClient,
@@ -21,15 +21,15 @@ describe('letter-stream-forwarder Lambda', () => {
2121
});
2222

2323

24-
it('forwards status changes to Kinesis', async () => {
24+
it("forwards status changes to Kinesis", async () => {
2525
const event: DynamoDBStreamEvent = {
2626
Records: [
2727
{
28-
eventName: 'MODIFY',
28+
eventName: "MODIFY",
2929
dynamodb: {
30-
Keys: { id: { S: '123' } },
31-
OldImage: { status: { S: 'PENDING' }, id: { S: '123' } },
32-
NewImage: { status: { S: 'ACCEPTED' }, id: { S: '123' } },
30+
Keys: { id: { S: "123" } },
31+
OldImage: buildValidLetter(),
32+
NewImage: {...buildValidLetter(), status: { S: "ACCEPTED" } },
3333
},
3434
},
3535
],
@@ -41,23 +41,43 @@ describe('letter-stream-forwarder Lambda', () => {
4141
expect(mockedDeps.kinesisClient.send).toHaveBeenCalledWith(
4242
expect.objectContaining({
4343
input: expect.objectContaining({
44-
StreamARN: 'test-stream.arn',
45-
PartitionKey: '123',
44+
StreamARN: "test-stream.arn",
45+
PartitionKey: "123",
4646
}),
4747
})
4848
);
4949
});
5050

5151

52-
it('forwards to Kinesis if a reason code is added', async () => {
52+
it("does not forward invalid status changes", async () => {
5353
const event: DynamoDBStreamEvent = {
5454
Records: [
5555
{
56-
eventName: 'MODIFY',
56+
eventName: "MODIFY",
5757
dynamodb: {
58-
Keys: { id: { S: '123' } },
59-
OldImage: { status: { S: 'PENDING' }, id: { S: '123' } },
60-
NewImage: { status: { S: 'PENDING' }, id: { S: '123' }, reasonCode: {S: 'r1'} },
58+
Keys: { id: { S: "123" } },
59+
OldImage: {...buildValidLetter(), status: { S: "CANCELLED" } },
60+
NewImage: {...buildValidLetter(), status: { S: "PRINTED" } },
61+
},
62+
},
63+
],
64+
};
65+
66+
const handler = createHandler(mockedDeps);
67+
await handler(event, mockDeep<Context>(), jest.fn());
68+
69+
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
70+
});
71+
72+
it("forwards to Kinesis if a reason code is added", async () => {
73+
const event: DynamoDBStreamEvent = {
74+
Records: [
75+
{
76+
eventName: "MODIFY",
77+
dynamodb: {
78+
Keys: { id: { S: "123" } },
79+
OldImage: buildValidLetter(),
80+
NewImage: {...buildValidLetter(), reasonCode: {S: "r1"} },
6181
},
6282
},
6383
],
@@ -69,23 +89,23 @@ describe('letter-stream-forwarder Lambda', () => {
6989
expect(mockedDeps.kinesisClient.send).toHaveBeenCalledWith(
7090
expect.objectContaining({
7191
input: expect.objectContaining({
72-
StreamARN: 'test-stream.arn',
73-
PartitionKey: '123',
92+
StreamARN: "test-stream.arn",
93+
PartitionKey: "123",
7494
}),
7595
})
7696
);
7797
});
7898

7999

80-
it('forwards to Kinesis if a reason code is changed', async () => {
100+
it("forwards to Kinesis if a reason code is changed", async () => {
81101
const event: DynamoDBStreamEvent = {
82102
Records: [
83103
{
84-
eventName: 'MODIFY',
104+
eventName: "MODIFY",
85105
dynamodb: {
86-
Keys: { id: { S: '123' } },
87-
OldImage: { status: { S: 'PENDING' }, id: { S: '123' }, reasonCode: {S: 'r1'} },
88-
NewImage: { status: { S: 'PENDING' }, id: { S: '123' }, reasonCode: {S: 'r2'} },
106+
Keys: { id: { S: "123" } },
107+
OldImage: {...buildValidLetter(), reasonCode: {S: "r1"} },
108+
NewImage: {...buildValidLetter(), reasonCode: {S: "r2"} },
89109
},
90110
},
91111
],
@@ -97,22 +117,22 @@ describe('letter-stream-forwarder Lambda', () => {
97117
expect(mockedDeps.kinesisClient.send).toHaveBeenCalledWith(
98118
expect.objectContaining({
99119
input: expect.objectContaining({
100-
StreamARN: 'test-stream.arn',
101-
PartitionKey: '123',
120+
StreamARN: "test-stream.arn",
121+
PartitionKey: "123",
102122
}),
103123
})
104124
);
105125
});
106126

107-
it('does not forward if neither status nor reason code changed', async () => {
127+
it("does not forward if neither status nor reason code changed", async () => {
108128
const event: DynamoDBStreamEvent = {
109129
Records: [
110130
{
111-
eventName: 'MODIFY',
131+
eventName: "MODIFY",
112132
dynamodb: {
113-
Keys: { id: { S: '123' } },
114-
OldImage: { status: { S: 'PENDING' }, id: { S: '123' } },
115-
NewImage: { status: { S: 'PENDING' }, id: { S: '123' } },
133+
Keys: { id: { S: "123" } },
134+
OldImage: buildValidLetter(),
135+
NewImage: buildValidLetter(),
116136
},
117137
},
118138
],
@@ -124,14 +144,14 @@ describe('letter-stream-forwarder Lambda', () => {
124144
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
125145
});
126146

127-
it('does not forward non-MODIFY events', async () => {
147+
it("does not forward non-MODIFY events", async () => {
128148
const event: DynamoDBStreamEvent = {
129149
Records: [
130150
{
131-
eventName: 'INSERT',
151+
eventName: "INSERT",
132152
dynamodb: {
133-
Keys: { id: { S: '123' } },
134-
NewImage: { status: { S: 'PENDING' }, id: { S: '123' } },
153+
Keys: { id: { S: "123" } },
154+
NewImage: buildValidLetter(),
135155
},
136156
},
137157
],
@@ -142,4 +162,41 @@ describe('letter-stream-forwarder Lambda', () => {
142162

143163
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
144164
});
165+
166+
167+
it("does not forward invalid letter data", async () => {
168+
const event: DynamoDBStreamEvent = {
169+
Records: [
170+
{
171+
eventName: "MODIFY",
172+
dynamodb: {
173+
Keys: { id: { S: "123" } },
174+
OldImage: buildInvalidLetter(),
175+
NewImage: {...buildInvalidLetter(), status: { S: "ACCEPTED" } },
176+
},
177+
}
178+
],
179+
};
180+
181+
const handler = createHandler(mockedDeps);
182+
await expect(handler(event, mockDeep<Context>(), jest.fn())).rejects.toThrow();
183+
184+
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
185+
});
186+
187+
function buildValidLetter() {
188+
return {
189+
id: {S: "123"},
190+
status: {S: "PENDING"},
191+
specificationId: {S: "spec1"},
192+
groupId: {S: "group1"},
193+
};
194+
}
195+
196+
function buildInvalidLetter() {
197+
return {
198+
id: {S: "123"},
199+
status: {S: "PENDING"},
200+
};
201+
}
145202
});

lambdas/letter-stream-forwarder/src/letter-stream-forwarder.ts

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,37 @@
1-
import { LetterBase } from "@internal/datastore";
2-
import { DynamoDBStreamEvent, Handler, DynamoDBRecord } from "aws-lambda";
1+
import { LetterSchemaBase } from "@internal/datastore";
2+
import { DynamoDBStreamEvent, DynamoDBRecord, DynamoDBStreamHandler } from "aws-lambda";
33
import { PutRecordCommand } from "@aws-sdk/client-kinesis";
44
import { Deps } from "./deps";
55
import { unmarshall } from "@aws-sdk/util-dynamodb";
6+
import { LetterStatus } from "../../api-handler/src/contracts/letters";
7+
import { Logger } from "pino";
68

7-
export function createHandler(deps: Deps): Handler<DynamoDBStreamEvent> {
9+
const VALID_STATE_TRANSITIONS: Record<LetterStatus, Set<LetterStatus>> = {
10+
"PENDING": new Set(["ACCEPTED", "REJECTED"]),
11+
"REJECTED": new Set(["FAILED"]),
12+
"ACCEPTED": new Set(["FORWARDED", "PRINTED", "ENCLOSED", "DISPATCHED", "CANCELLED", "FAILED"]),
13+
"PRINTED": new Set(["ENCLOSED", "DISPATCHED", "CANCELLED", "FAILED"]),
14+
"ENCLOSED": new Set(["DISPATCHED", "CANCELLED", "FAILED"]),
15+
"FORWARDED": new Set(["DELIVERED", "RETURNED"]),
16+
"DISPATCHED": new Set(["DELIVERED", "RETURNED"]),
17+
"CANCELLED": new Set([]),
18+
"FAILED": new Set([]),
19+
"DELIVERED": new Set(["RETURNED"]),
20+
"RETURNED": new Set([]),
21+
}
22+
23+
export function createHandler(deps: Deps): DynamoDBStreamHandler {
824
return async (event: DynamoDBStreamEvent): Promise<void> => {
9-
deps.logger.info({description: 'Received event', event});
25+
deps.logger.info({description: "Received event", event});
1026
const statusChanges = event.Records
1127
.filter(record => record.eventName === "MODIFY")
12-
.filter(record => isChanged(record, 'status') || isChanged(record, 'reasonCode'));
28+
.filter(record =>
29+
(isChanged(record, "status") && isValidStateTransition(record, deps.logger)) ||
30+
isChanged(record, "reasonCode"));
1331

1432
for (const record of statusChanges) {
1533
const newImage = record.dynamodb?.NewImage!;
16-
const letter = unmarshall(newImage as any) as LetterBase;
34+
const letter = LetterSchemaBase.parse(unmarshall(newImage as any));
1735
await deps.kinesisClient.send(new PutRecordCommand({
1836
StreamARN: deps.env.LETTER_CHANGE_STREAM_ARN,
1937
PartitionKey: letter.id,
@@ -22,6 +40,16 @@ export function createHandler(deps: Deps): Handler<DynamoDBStreamEvent> {
2240
}
2341
};
2442

43+
function isValidStateTransition(record: DynamoDBRecord, logger: Logger): boolean {
44+
const oldStatus = record.dynamodb?.OldImage?.status?.S! as LetterStatus;
45+
const newStatus = record.dynamodb?.NewImage?.status?.S! as LetterStatus;
46+
const valid = VALID_STATE_TRANSITIONS[oldStatus].has(newStatus);
47+
if (!valid) {
48+
logger.warn({description: "Ignoring invalid state transition", oldStatus, newStatus});
49+
}
50+
return valid;
51+
}
52+
2553
function isChanged(record: DynamoDBRecord, property: string): boolean {
2654
const oldValue = record.dynamodb?.OldImage![property];
2755
const newValue = record.dynamodb?.NewImage![property];

lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,17 @@ describe("letter-updates-transformer Lambda", () => {
7373
const letters = generateLetters(21);
7474
const expectedEntries = [
7575
letters.slice(0, 10).map(
76-
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
76+
(letter, index) => expect.objectContaining({
77+
Id: expect.stringMatching(new RegExp(`-${index}$`)),
78+
Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
7779
letters.slice(10, 20).map(
78-
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
80+
(letter, index) => expect.objectContaining({
81+
Id: expect.stringMatching(new RegExp(`-${index}$`)),
82+
Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
7983
letters.slice(20).map(
80-
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
84+
(letter, index) => expect.objectContaining({
85+
Id: expect.stringMatching(new RegExp(`-${index}$`)),
86+
Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
8187
];
8288

8389
await handler(generateKinesisEvent(letters), mockDeep<Context>(), jest.fn());

lambdas/letter-updates-transformer/src/letter-updates-transformer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ export function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
3535
}
3636
}
3737

38-
function buildMessage(event: LetterEvent): PublishBatchRequestEntry {
38+
function buildMessage(event: LetterEvent, index: number): PublishBatchRequestEntry {
3939
return {
40-
Id: event.id,
40+
Id: event.id + '-' + index,
4141
Message: JSON.stringify(event),
4242
}
4343
}

0 commit comments

Comments
 (0)