Skip to content

Commit ff574fd

Browse files
committed
Add forwarding via kinesis stream
1 parent 4709fd5 commit ff574fd

File tree

20 files changed

+1203
-173
lines changed

20 files changed

+1203
-173
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
module "letter_updates_transformer" {
2+
source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-lambda.zip"
3+
4+
function_name = "letter-stream-handler"
5+
description = "Letter Update Filter/Producer"
6+
7+
aws_account_id = var.aws_account_id
8+
component = var.component
9+
environment = var.environment
10+
project = var.project
11+
region = var.region
12+
group = var.group
13+
14+
log_retention_in_days = var.log_retention_in_days
15+
kms_key_arn = module.kms.key_arn
16+
17+
iam_policy_document = {
18+
body = data.aws_iam_policy_document.letter_updates_transformer_lambda.json
19+
}
20+
21+
function_s3_bucket = local.acct.s3_buckets["lambda_function_artefacts"]["id"]
22+
function_code_base_path = local.aws_lambda_functions_dir_path
23+
function_code_dir = "letter-stream-handler/dist"
24+
function_include_common = true
25+
handler_function_name = "handler"
26+
runtime = "nodejs22.x"
27+
memory = 128
28+
timeout = 5
29+
log_level = var.log_level
30+
31+
force_lambda_code_deploy = var.force_lambda_code_deploy
32+
enable_lambda_insights = false
33+
34+
send_to_firehose = true
35+
log_destination_arn = local.destination_arn
36+
log_subscription_role_arn = local.acct.log_subscription_role_arn
37+
38+
lambda_env_vars = merge(local.common_lambda_env_vars, {
39+
EVENTPUB_SNS_TOPIC_ARN = module.eventpub.sns_topic.arn
40+
})
41+
}
42+
43+
data "aws_iam_policy_document" "letter_updates_transformer_lambda" {
44+
statement {
45+
sid = "Kinesis permissions"
46+
effect = "Allow"
47+
48+
actions = [
49+
"kinesis:GetRecords",
50+
]
51+
52+
resources = [
53+
aws_kinesis_stream.letter_change_stream.arn
54+
]
55+
}
56+
57+
statement {
58+
sid = "Kinesis permissions"
59+
effect = "Allow"
60+
61+
actions = [
62+
"kinesis:PutRecord",
63+
]
64+
65+
resources = [
66+
aws_kinesis_stream.letter_change_stream.arn
67+
]
68+
}
69+
}
70+
71+
resource "aws_lambda_event_source_mapping" "letter_updates_transformer_kinesis" {
72+
event_source_arn = aws_kinesis_stream.letter_change_stream.arn
73+
function_name = module.letter_updates_transformer.lambda_function_name
74+
starting_position = "LATEST"
75+
batch_size = 10
76+
maximum_batching_window_in_seconds = 1
77+
scaling_config { maximum_concurrency = 10 }
78+
}

infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,6 @@ module "letter_updates_transformer" {
4141
}
4242

4343
data "aws_iam_policy_document" "letter_updates_transformer_lambda" {
44-
statement {
45-
sid = "KMSPermissions"
46-
effect = "Allow"
47-
48-
actions = [
49-
"kms:Decrypt",
50-
"kms:GenerateDataKey",
51-
]
52-
53-
resources = [
54-
module.kms.key_arn,
55-
]
56-
}
57-
5844
statement {
5945
sid = "AllowSNSPublish"
6046
effect = "Allow"
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<!-- BEGIN_TF_DOCS -->
2+
<!-- markdownlint-disable -->
3+
<!-- vale off -->
4+
5+
## Requirements
6+
7+
No requirements.
8+
## Inputs
9+
10+
No inputs.
11+
## Modules
12+
13+
No modules.
14+
## Outputs
15+
16+
No outputs.
17+
<!-- vale on -->
18+
<!-- markdownlint-enable -->
19+
<!-- END_TF_DOCS -->
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
resource "aws_kinesis_stream" "letter_change_stream" {
2+
name = "letter-change-stream"
3+
shard_count = 1
4+
retention_period = 24
5+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
dist
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
coverage
2+
node_modules
3+
dist
4+
.reports
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import type { Config } from 'jest';
2+
3+
export const baseJestConfig: Config = {
4+
preset: 'ts-jest',
5+
6+
// Automatically clear mock calls, instances, contexts and results before every test
7+
clearMocks: true,
8+
9+
// Indicates whether the coverage information should be collected while executing the test
10+
collectCoverage: true,
11+
12+
// The directory where Jest should output its coverage files
13+
coverageDirectory: './.reports/unit/coverage',
14+
15+
// Indicates which provider should be used to instrument code for coverage
16+
coverageProvider: 'babel',
17+
18+
coverageThreshold: {
19+
global: {
20+
branches: 100,
21+
functions: 100,
22+
lines: 100,
23+
statements: -10,
24+
},
25+
},
26+
27+
coveragePathIgnorePatterns: ['/__tests__/'],
28+
transform: { '^.+\\.ts$': 'ts-jest' },
29+
testPathIgnorePatterns: ['.build'],
30+
testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'],
31+
32+
// Use this configuration option to add custom reporters to Jest
33+
reporters: [
34+
'default',
35+
[
36+
'jest-html-reporter',
37+
{
38+
pageTitle: 'Test Report',
39+
outputPath: './.reports/unit/test-report.html',
40+
includeFailureMsg: true,
41+
},
42+
],
43+
],
44+
45+
// The test environment that will be used for testing
46+
testEnvironment: 'jsdom',
47+
};
48+
49+
const utilsJestConfig = {
50+
...baseJestConfig,
51+
52+
testEnvironment: 'node',
53+
54+
coveragePathIgnorePatterns: [
55+
...(baseJestConfig.coveragePathIgnorePatterns ?? []),
56+
'zod-validators.ts',
57+
],
58+
};
59+
60+
export default utilsJestConfig;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"dependencies": {
3+
"@aws-sdk/client-kinesis": "^3.0.0",
4+
"aws-lambda": "^1.0.7"
5+
},
6+
"devDependencies": {
7+
"@types/aws-lambda": "^8.10.119",
8+
"typescript": "^5.0.0"
9+
},
10+
"main": "src/index.ts",
11+
"name": "letter-stream-forwarder",
12+
"private": true,
13+
"scripts": {
14+
"lambda-build": "rm -rf dist && npx esbuild --bundle --minify --sourcemap --target=es2020 --platform=node --loader:.node=file --entry-names=[name] --outdir=dist src/index.ts",
15+
"lint": "eslint .",
16+
"lint:fix": "eslint . --fix",
17+
"test:unit": "jest",
18+
"typecheck": "tsc --noEmit"
19+
},
20+
"version": "0.1.0"
21+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { KinesisClient } from '@aws-sdk/client-kinesis';
2+
import { mockDeep } from 'jest-mock-extended';
3+
import { DynamoDBStreamEvent, Context } from 'aws-lambda';
4+
import { Deps } from '../deps';
5+
import { EnvVars } from '../env';
6+
import { createHandler } from '../letter-stream-forwarder';
7+
8+
describe('letter-stream-forwarder Lambda', () => {
9+
10+
const mockedDeps: jest.Mocked<Deps> = {
11+
kinesisClient: { send: jest.fn()} as unknown as KinesisClient,
12+
env: {
13+
LETTER_CHANGE_STREAM_NAME: "test-stream",
14+
} as unknown as EnvVars
15+
} as Deps;
16+
17+
beforeEach(() => {
18+
jest.clearAllMocks();
19+
});
20+
21+
22+
it('forwards status changes to Kinesis', async () => {
23+
const event: DynamoDBStreamEvent = {
24+
Records: [
25+
{
26+
eventName: 'MODIFY',
27+
dynamodb: {
28+
Keys: { id: { S: '123' } },
29+
OldImage: { status: { S: 'PENDING' } },
30+
NewImage: { status: { S: 'ACCEPTED' }, id: { S: '123' } },
31+
},
32+
},
33+
],
34+
};
35+
36+
const handler = createHandler(mockedDeps);
37+
await handler(event, mockDeep<Context>(), jest.fn());
38+
39+
expect(mockedDeps.kinesisClient.send).toHaveBeenCalledWith(
40+
expect.objectContaining({
41+
input: expect.objectContaining({
42+
StreamName: 'test-stream',
43+
PartitionKey: '123',
44+
}),
45+
})
46+
);
47+
});
48+
49+
it('does not forward if status did not change', async () => {
50+
const event: DynamoDBStreamEvent = {
51+
Records: [
52+
{
53+
eventName: 'MODIFY',
54+
dynamodb: {
55+
Keys: { id: { S: '123' } },
56+
OldImage: { status: { S: 'PENDING' } },
57+
NewImage: { status: { S: 'PENDING' }, id: { S: '123' } },
58+
},
59+
},
60+
],
61+
};
62+
63+
const handler = createHandler(mockedDeps);
64+
await handler(event, mockDeep<Context>(), jest.fn());
65+
66+
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
67+
});
68+
69+
it('does not forward non-MODIFY events', async () => {
70+
const event: DynamoDBStreamEvent = {
71+
Records: [
72+
{
73+
eventName: 'INSERT',
74+
dynamodb: {
75+
Keys: { id: { S: '123' } },
76+
NewImage: { status: { S: 'PENDING' }, id: { S: '123' } },
77+
},
78+
},
79+
],
80+
};
81+
82+
const handler = createHandler(mockedDeps);
83+
await handler(event, mockDeep<Context>(), jest.fn());
84+
85+
expect(mockedDeps.kinesisClient.send).not.toHaveBeenCalled();
86+
});
87+
});
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { KinesisClient } from '@aws-sdk/client-kinesis';
2+
import { envVars, EnvVars } from './env';
3+
4+
export type Deps = {
5+
kinesisClient: KinesisClient;
6+
env: EnvVars;
7+
};
8+
9+
export function createDependenciesContainer(): Deps {
10+
return {
11+
kinesisClient: new KinesisClient({}),
12+
env: envVars,
13+
};
14+
}

0 commit comments

Comments
 (0)