Skip to content

Commit 4709fd5

Browse files
committed
CCM-12937 Letter updates transformer lambda
1 parent 2362ef7 commit 4709fd5

File tree

14 files changed

+1042
-54
lines changed

14 files changed

+1042
-54
lines changed

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"**/Thumbs.db": true,
1111
".github": false,
1212
".vscode": false
13-
}
13+
},
14+
"typescript.tsdk": "node_modules/typescript/lib"
1415
}

infrastructure/terraform/components/api/ddb_table_letters.tf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
resource "aws_dynamodb_table" "letters" {
22
name = "${local.csi}-letters"
33
billing_mode = "PAY_PER_REQUEST"
4+
stream_enabled = true
5+
stream_view_type = "NEW_IMAGE"
46

57
hash_key = "supplierId"
68
range_key = "id"

internal/datastore/src/mi-repository.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
} from '@aws-sdk/lib-dynamodb';
55
import { MI, MISchema } from './types';
66
import { Logger } from 'pino';
7-
import { v4 as uuidv4 } from 'uuid';
7+
import { randomUUID } from 'crypto';
88

99
export type MIRepositoryConfig = {
1010
miTableName: string,
@@ -22,7 +22,7 @@ export class MIRepository {
2222
const now = new Date().toISOString();
2323
const miDb = {
2424
...mi,
25-
id: uuidv4(),
25+
id: randomUUID(),
2626
createdAt: now,
2727
updatedAt: now,
2828
ttl: Math.floor(Date.now() / 1000 + 60 * 60 * this.config.miTtlHours)

lambdas/letter-updates-transformer/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"dependencies": {
3+
"@nhsdigital/nhs-notify-event-schemas-supplier-api": "*",
34
"esbuild": "^0.24.0"
45
},
56
"devDependencies": {

lambdas/letter-updates-transformer/src/__tests__/index.test.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { SNSClient } from '@aws-sdk/client-sns';
2+
import * as pino from 'pino';
3+
import { createHandler } from '../letter-updates-transformer';
4+
import { KinesisStreamEvent, Context, KinesisStreamRecordPayload } from 'aws-lambda';
5+
import { mockDeep } from 'jest-mock-extended';
6+
import { Deps } from '../deps';
7+
import { EnvVars } from '../env';
8+
import { LetterBase } from '@internal/datastore';
9+
import { mapLetterToCloudEvent } from '../mappers/letter-mapper';
10+
11+
// Make crypto return consistent values, since we're calling it in both prod and test code and comparing the values
12+
const realCrypto = jest.requireActual('crypto');
13+
const randomBytes: Record<string, any> = {'8': realCrypto.randomBytes(8), '16': realCrypto.randomBytes(16)}
14+
jest.mock('crypto', () => ({
15+
randomUUID: () => '4616b2d9-b7a5-45aa-8523-fa7419626b69',
16+
randomBytes: (size: number) => randomBytes[String(size)]
17+
}));
18+
19+
describe('letter-updates-transformer Lambda', () => {
20+
21+
const mockedDeps: jest.Mocked<Deps> = {
22+
snsClient: { send: jest.fn()} as unknown as SNSClient,
23+
logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
24+
env: {
25+
EVENT_PUB_SNS_TOPIC_ARN: 'arn:aws:sns:region:account:topic',
26+
} as unknown as EnvVars
27+
} as Deps;
28+
29+
beforeEach(() => {
30+
jest.useFakeTimers();
31+
});
32+
33+
afterEach(() => {
34+
jest.useRealTimers();
35+
})
36+
37+
it('processes Kinesis events and publishes them to SNS', async () => {
38+
39+
const handler = createHandler(mockedDeps);
40+
const letters = generateLetters(1);
41+
const expectedEntries = [expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letters[0]))})];
42+
43+
await handler(generateKinesisEvent(letters), mockDeep<Context>(), jest.fn());
44+
45+
expect(mockedDeps.snsClient.send).toHaveBeenCalledWith(expect.objectContaining({
46+
input: expect.objectContaining({
47+
TopicArn: 'arn:aws:sns:region:account:topic',
48+
PublishBatchRequestEntries: expectedEntries
49+
})
50+
}));
51+
});
52+
53+
it ('batches mutiple records into a single call to SNS', async () => {
54+
55+
const handler = createHandler(mockedDeps);
56+
const letters = generateLetters(10);
57+
const expectedEntries = letters.map(letter =>
58+
expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))}));
59+
60+
await handler(generateKinesisEvent(letters), mockDeep<Context>(), jest.fn());
61+
62+
expect(mockedDeps.snsClient.send).toHaveBeenCalledWith(expect.objectContaining({
63+
input: expect.objectContaining({
64+
TopicArn: 'arn:aws:sns:region:account:topic',
65+
PublishBatchRequestEntries: expectedEntries
66+
})
67+
}));
68+
});
69+
70+
it("respects SNS's maximumum batch size of 10", async () => {
71+
72+
const handler = createHandler(mockedDeps);
73+
const letters = generateLetters(21);
74+
const expectedEntries = [
75+
letters.slice(0, 10).map(
76+
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
77+
letters.slice(10, 20).map(
78+
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
79+
letters.slice(20).map(
80+
letter => expect.objectContaining({Message: JSON.stringify(mapLetterToCloudEvent(letter))})),
81+
];
82+
83+
await handler(generateKinesisEvent(letters), mockDeep<Context>(), jest.fn());
84+
85+
expect(mockedDeps.snsClient.send).toHaveBeenNthCalledWith(1,
86+
expect.objectContaining({
87+
input: expect.objectContaining({
88+
TopicArn: 'arn:aws:sns:region:account:topic',
89+
PublishBatchRequestEntries: expectedEntries[0]
90+
})}));
91+
expect(mockedDeps.snsClient.send).toHaveBeenNthCalledWith(2,
92+
expect.objectContaining({
93+
input: expect.objectContaining({
94+
TopicArn: 'arn:aws:sns:region:account:topic',
95+
PublishBatchRequestEntries: expectedEntries[1]
96+
})}));
97+
expect(mockedDeps.snsClient.send).toHaveBeenNthCalledWith(3,
98+
expect.objectContaining({
99+
input: expect.objectContaining({
100+
TopicArn: 'arn:aws:sns:region:account:topic',
101+
PublishBatchRequestEntries: expectedEntries[2]
102+
})}));
103+
});
104+
105+
106+
function generateLetters(numLetters: number): LetterBase[] {
107+
return Array.from(Array(numLetters).keys())
108+
.map(i => ({ id: String(i + 1), status: 'PRINTED', specificationId: 'spec1', groupId: 'group1' }));
109+
}
110+
111+
function generateKinesisEvent(letterEvents: Object[]): KinesisStreamEvent {
112+
const records = letterEvents
113+
.map(letter => Buffer.from(JSON.stringify(letter)).toString('base64'))
114+
.map(data => ({kinesis: {data}} as unknown as KinesisStreamRecordPayload));
115+
116+
return {Records: records} as unknown as KinesisStreamEvent;
117+
}
118+
});
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import pino from 'pino';
2+
import { envVars, EnvVars } from "./env";
3+
import { SNSClient } from "@aws-sdk/client-sns";
4+
5+
export type Deps = {
6+
snsClient: SNSClient;
7+
logger: pino.Logger;
8+
env: EnvVars;
9+
};
10+
11+
function createSNSClient(): SNSClient {
12+
return new SNSClient({});
13+
}
14+
15+
16+
export function createDependenciesContainer(): Deps {
17+
const log = pino();
18+
19+
return {
20+
snsClient: createSNSClient(),
21+
logger: log,
22+
env: envVars
23+
};
24+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import {z} from 'zod';
2+
3+
const EnvVarsSchema = z.object({
4+
EVENT_PUB_SNS_TOPIC_ARN: z.string(),
5+
});
6+
7+
export type EnvVars = z.infer<typeof EnvVarsSchema>;
8+
9+
export const envVars = EnvVarsSchema.parse(process.env);
Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
// Replace me with the actual code for your Lambda function
2-
import { Handler } from 'aws-lambda';
1+
import { createHandler } from "./letter-updates-transformer";
2+
import { createDependenciesContainer } from "./deps";
33

4-
export const handler: Handler = async (event) => {
5-
console.log('Received event:', event);
6-
return {
7-
statusCode: 200,
8-
body: 'Event logged',
9-
};
10-
};
4+
const container = createDependenciesContainer();
5+
6+
export const handler = createHandler(container);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
2+
import { Handler, KinesisStreamEvent } from 'aws-lambda';
3+
import { mapLetterToCloudEvent } from './mappers/letter-mapper';
4+
import { PublishBatchCommand, PublishBatchRequestEntry } from '@aws-sdk/client-sns';
5+
import { LetterEvent } from '@nhsdigital/nhs-notify-event-schemas-supplier-api/src';
6+
import { Deps } from './deps';
7+
8+
// SNS PublishBatchCommand supports up to 10 messages per batch
9+
const BATCH_SIZE = 10;
10+
11+
export function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
12+
return async(streamEvent: KinesisStreamEvent) => {
13+
deps.logger.info({description: 'Received event', streamEvent});
14+
15+
const cloudEvents: LetterEvent[] = streamEvent.Records
16+
.map((record) => {
17+
// Kinesis data is base64 encoded
18+
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
19+
return JSON.parse(payload);
20+
})
21+
.map(mapLetterToCloudEvent);
22+
23+
24+
for (let batch of generateBatches(cloudEvents)) {
25+
await deps.snsClient.send(new PublishBatchCommand({
26+
TopicArn: deps.env.EVENT_PUB_SNS_TOPIC_ARN,
27+
PublishBatchRequestEntries: batch.map(buildMessage),
28+
}));
29+
}
30+
}
31+
32+
function* generateBatches(events: LetterEvent[]) {
33+
for (let i = 0; i < events.length; i += BATCH_SIZE) {
34+
yield events.slice(i, i + BATCH_SIZE);
35+
}
36+
}
37+
38+
function buildMessage(event: LetterEvent): PublishBatchRequestEntry {
39+
return {
40+
Id: event.id,
41+
Message: JSON.stringify(event),
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)