diff --git a/typescript/ddb-stream-lambda-sns/.gitignore b/typescript/ddb-stream-lambda-sns/.gitignore new file mode 100644 index 0000000000..61bf891f65 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/.gitignore @@ -0,0 +1,9 @@ +*.js +!jest.config.js +*.d.ts +node_modules + +# CDK asset staging directory +.cdk.staging +cdk.out +.DS_Store diff --git a/typescript/ddb-stream-lambda-sns/.npmignore b/typescript/ddb-stream-lambda-sns/.npmignore new file mode 100644 index 0000000000..c1d6d45dcf --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/typescript/ddb-stream-lambda-sns/README.md b/typescript/ddb-stream-lambda-sns/README.md new file mode 100644 index 0000000000..655cbedae4 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/README.md @@ -0,0 +1,123 @@ +# DynamoDB Stream Integration with Lambda and SNS + + +--- + +![Stability: Developer Preview](https://img.shields.io/badge/stability-Developer--Preview-important.svg?style=for-the-badge) + +> **This is an experimental example. It may not build out of the box** +> +> This example is built on Construct Libraries marked "Developer Preview" and may not be updated for latest breaking changes. +> +> It may additionally requires infrastructure prerequisites that must be created before successful build. +> +> If build is unsuccessful, please create an [issue](https://github.com/aws-samples/aws-cdk-examples/issues/new) so that we may debug the problem +--- + + +## Overview + +This repository provides both L2 and L3 constructs example usage for working with DynamoDB streams [AWS Cloud Development Kit (CDK)](https://aws.amazon.com/cdk/) with TypeScript. It showcases the integration of DynamoDB streams with AWS Lambda and Amazon SNS (Simple Notification Service), providing an example of real-time data processing and notification workflows. + +This solution demonstrates a use case for real-time notifications: alerting users about low inventory of an item in the system. + +## Architecture Diagram + +![Architecture Diagram](images/architecture.jpg) + +## Features + +- L2 (low-level) construct for fine-grained control over DynamoDB streams +- [L3 (high-level)](https://docs.aws.amazon.com/solutions/latest/constructs/aws-dynamodbstreams-lambda.html) construct for simplified, best-practice implementations of DynamoDB streams +- Integration with Lambda functions for stream processing +- Implements an SQS Dead Letter Queue (DLQ) for the Lambda function failure handling +- Shows how to use Amazon SNS to distribute stream processing results or notifications. + + +## Build, Deploy and Testing + +### Prerequisites + +Before you begin, ensure you have met the following requirements: + +* You have installed the latest version of [Node.js and npm](https://nodejs.org/en/download/) +* You have installed the [AWS CLI](https://aws.amazon.com/cli/) and configured it with your credentials +* You have installed the [AWS CDK Toolkit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) globally +* You have an AWS account and have set up your [AWS credentials](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) +* You have [bootstrapped your AWS account](https://docs.aws.amazon.com/cdk/latest/guide/bootstrapping.html) for CDK + + + +### Build +To build this app, you need to be in this example's root folder. Then run the following: + +```bash +npm install +npm run build +``` + +This will install the dependencies for this example. + +### Deploy + +Run `cdk deploy`. This will deploy the Stack to your AWS Account. + +Post deployment, you should see the table arn, lambda function arn and sns topic arn on the output of your CLI. + +## Testing +```bash +npm run test +``` + +## Usage + +### Configuring SNS Notification Subscription + +1. After deploying the stack, locate the SNS topic Amazon Resource Name (ARN) from the CLI output. + +2. To subscribe an email address to the SNS topic: + + ```bash + aws sns subscribe --topic-arn --protocol email --notification-endpoint your-email@example.com + ``` +Replace with the actual ARN of your SNS topic, and your-email@example.com with the email address you want to subscribe. + +3. Check your email inbox for a confirmation message from AWS. Click the link in the email to confirm your subscription. + +### Creating an Item in DynamoDB with id, itemName, and count +To trigger the stream processing and email notification, you need to create an item in your DynamoDB table with the fields id, itemName, and count. You can do this using the AWS CLI or AWS Management Console. + +Example item.json provided in this repo: +```bash +{ + "id": { + "S": "1" + }, + "count": { + "N": "10" + }, + "itemName": { + "S": "Coffee Beans" + } +} +``` + +1. Use the following command to put the item into your DynamoDB table: + +```bash +aws dynamodb put-item --table-name --item file://item.json +``` + +Replace with the actual name of your DynamoDB table. + +2. Whenever you update the count field of the item to 0, the DynamoDB stream will trigger the Lambda function, which will process the data and send a notification to the subscribed email address via SNS. + + + +## Cleanup + +To avoid incurring future charges, please destroy the resources when they are no longer needed: + +```bash +cdk destroy +``` diff --git a/typescript/ddb-stream-lambda-sns/bin/ddb-stream.ts b/typescript/ddb-stream-lambda-sns/bin/ddb-stream.ts new file mode 100644 index 0000000000..2e47901d00 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/bin/ddb-stream.ts @@ -0,0 +1,6 @@ +#!/usr/bin/env node +import * as cdk from 'aws-cdk-lib'; +import { DdbStreamStack } from '../lib/ddb-stream-stack'; + +const app = new cdk.App(); +new DdbStreamStack(app, 'DdbStreamStack', {}); \ No newline at end of file diff --git a/typescript/ddb-stream-lambda-sns/cdk.json b/typescript/ddb-stream-lambda-sns/cdk.json new file mode 100644 index 0000000000..a39d7d4fb7 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/cdk.json @@ -0,0 +1,72 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/ddb-stream.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false + } +} diff --git a/typescript/ddb-stream-lambda-sns/images/architecture.jpg b/typescript/ddb-stream-lambda-sns/images/architecture.jpg new file mode 100644 index 0000000000..6f1f8652b1 Binary files /dev/null and b/typescript/ddb-stream-lambda-sns/images/architecture.jpg differ diff --git a/typescript/ddb-stream-lambda-sns/item.json b/typescript/ddb-stream-lambda-sns/item.json new file mode 100644 index 0000000000..0eb8dba396 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/item.json @@ -0,0 +1,11 @@ +{ + "id": { + "S": "1" + }, + "count": { + "N": "10" + }, + "itemName": { + "S": "Coffee Beans" + } +} \ No newline at end of file diff --git a/typescript/ddb-stream-lambda-sns/jest.config.js b/typescript/ddb-stream-lambda-sns/jest.config.js new file mode 100644 index 0000000000..08263b8954 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + testEnvironment: 'node', + roots: ['/test'], + testMatch: ['**/*.test.ts'], + transform: { + '^.+\\.tsx?$': 'ts-jest' + } +}; diff --git a/typescript/ddb-stream-lambda-sns/lib/ddb-stream-stack.ts b/typescript/ddb-stream-lambda-sns/lib/ddb-stream-stack.ts new file mode 100644 index 0000000000..e4995afa4a --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/lib/ddb-stream-stack.ts @@ -0,0 +1,93 @@ +import * as cdk from 'aws-cdk-lib'; +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources'; +import { DynamoDBStreamsToLambda } from '@aws-solutions-constructs/aws-dynamodbstreams-lambda'; +import * as sns from 'aws-cdk-lib/aws-sns'; +import * as kms from 'aws-cdk-lib/aws-kms'; +import * as sqs from 'aws-cdk-lib/aws-sqs'; + +export class DdbStreamStack extends cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, props); + const aws_sns_kms_key = kms.Alias.fromAliasName( + this, + "aws-managed-sns-kms-key", + "alias/aws/sns", + ) + + const snsTopic = new sns.Topic(this, 'ddb-stream-topic', { + topicName: 'ddb-stream-topic', + displayName: 'SNS Topic for DDB streams', + enforceSSL: true, + masterKey: aws_sns_kms_key, + }); + + //L2 CDK Construct + const deadLetterQueueL2 = new sqs.Queue(this, 'ddb-stream-l2-dlq', { + queueName: 'ddb-stream-l2-dlq', + encryption: sqs.QueueEncryption.KMS_MANAGED, + retentionPeriod: cdk.Duration.days(4), // Adjust retention period as needed + }); + + const itemL2Table = new dynamodb.Table(this, 'itemL2Table', { + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, + stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, + encryption: dynamodb.TableEncryption.AWS_MANAGED, + //If you wish to retain the table after running cdk destroy, comment out the line below + removalPolicy: cdk.RemovalPolicy.DESTROY + }); + + const itemL2TableLambdaFunction = new lambda.Function(this, 'itemL2TableLambdaFunction', { + runtime: lambda.Runtime.NODEJS_20_X, + handler: 'index.handler', + tracing: lambda.Tracing.ACTIVE, + code: lambda.Code.fromAsset('resources/lambda'), + environment: { + SNS_TOPIC_ARN: snsTopic.topicArn, + AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1' + }, + }); + itemL2TableLambdaFunction.addEventSource(new lambdaEventSources.DynamoEventSource(itemL2Table, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: new lambdaEventSources.SqsDlq(deadLetterQueueL2), + bisectBatchOnError: true, + maxRecordAge: cdk.Duration.hours(24), + retryAttempts: 500, + })); + + deadLetterQueueL2.grantSendMessages(itemL2TableLambdaFunction); + + itemL2Table.grantStreamRead(itemL2TableLambdaFunction); + + //L3 CDK Construct + const itemL3Table = new DynamoDBStreamsToLambda(this, 'itemL3Table', { + dynamoTableProps: { + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, + stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, + //If you wish to retain the table after running cdk destroy, comment out the line below + removalPolicy: cdk.RemovalPolicy.DESTROY + }, + lambdaFunctionProps: { + code: lambda.Code.fromAsset('resources/lambda'), + runtime: lambda.Runtime.NODEJS_20_X, + handler: 'index.handler', + environment: { + SNS_TOPIC_ARN: snsTopic.topicArn, + }, + }, + }); + + snsTopic.grantPublish(itemL2TableLambdaFunction); + snsTopic.grantPublish(itemL3Table.lambdaFunction); + + new cdk.CfnOutput(this, 'itemL2TableLambdaFunctionArn', { value: itemL2TableLambdaFunction.functionArn }); + new cdk.CfnOutput(this, 'itemL3TableLambdaFunctionArn', { value: itemL3Table.lambdaFunction.functionArn }); + new cdk.CfnOutput(this, 'l3TableArn', { value: itemL3Table.dynamoTableInterface.tableArn }); + new cdk.CfnOutput(this, 'l2TableArn', { value: itemL2Table.tableArn }); + new cdk.CfnOutput(this, 'topicArn', { value: snsTopic.topicArn }); + new cdk.CfnOutput(this, 'l2DLQArn', { value: deadLetterQueueL2.queueArn }) + } +} diff --git a/typescript/ddb-stream-lambda-sns/package.json b/typescript/ddb-stream-lambda-sns/package.json new file mode 100644 index 0000000000..8ef8796e63 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/package.json @@ -0,0 +1,27 @@ +{ + "name": "ddb-stream-lambda-sns", + "version": "0.1.0", + "bin": { + "ddb-stream": "bin/ddb-stream.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "jest", + "cdk": "cdk" + }, + "devDependencies": { + "@types/jest": "^29.5.14", + "@types/node": "22.9.0", + "aws-cdk": "2.166.0", + "jest": "^29.7.0", + "ts-jest": "^29.2.5", + "ts-node": "^10.9.2", + "typescript": "~5.6.3" + }, + "dependencies": { + "@aws-solutions-constructs/aws-dynamodbstreams-lambda": "^2.74.0", + "aws-cdk-lib": "2.167.0", + "constructs": "^10.4.2" + } +} \ No newline at end of file diff --git a/typescript/ddb-stream-lambda-sns/resources/lambda/index.mjs b/typescript/ddb-stream-lambda-sns/resources/lambda/index.mjs new file mode 100644 index 0000000000..f11513da70 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/resources/lambda/index.mjs @@ -0,0 +1,44 @@ +import { SNSClient, PublishCommand } from "@aws-sdk/client-sns"; + +const snsClient = new SNSClient(); + +/** + * Lambda function handler to monitor DynamoDB stream events for inventory changes + * Sends email notifications when an item's count reaches zero + * @param {Object} event - DynamoDB Stream event + * @returns {Object} - Status of the execution + */ + +export const handler = async (event) => { + try { + for (const record of event.Records) { + // Only process MODIFY events + if (record.eventName === "MODIFY") { + const newImage = record.dynamodb.NewImage; + const oldImage = record.dynamodb.OldImage; + + const newCount = newImage.count ? parseInt(newImage.count.N) : null; + const oldCount = oldImage?.count ? parseInt(oldImage.count.N) : null; + + // Check if count changed to 0 from a non-zero value + if (newCount === 0 && oldCount > 0) { + const itemName = newImage.itemName ? newImage.itemName.S : "Unknown item"; + + const params = { + Message: `Alert: ${itemName} has reached zero inventory! Previous count was ${oldCount}.`, + Subject: `Stock Alert - ${itemName} Out of Stock`, + TopicArn: process.env.SNS_TOPIC_ARN + }; + + await snsClient.send(new PublishCommand(params)); + + console.log(`Notification sent for ${itemName} - count dropped to 0 from ${oldCount}`); + } + } + } + return { statusCode: 200, body: 'Processing complete.' }; + } catch (error) { + console.error('Error processing records:', error); + throw error; + } +}; diff --git a/typescript/ddb-stream-lambda-sns/test/ddb-stream.test.ts b/typescript/ddb-stream-lambda-sns/test/ddb-stream.test.ts new file mode 100644 index 0000000000..5c8f9a2041 --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/test/ddb-stream.test.ts @@ -0,0 +1,77 @@ +import * as cdk from 'aws-cdk-lib'; +import { Template, Match } from 'aws-cdk-lib/assertions'; +import * as ddbStream from '../lib/ddb-stream-stack'; + +const app = new cdk.App(); +const stack = new ddbStream.DdbStreamStack(app, 'MyTestStack'); +const template = Template.fromStack(stack); + +test('DynamoDB Table, Lambda Function, and SNS Topic Created', () => { + template.hasResourceProperties('AWS::DynamoDB::Table', { + StreamSpecification: { + StreamViewType: 'NEW_AND_OLD_IMAGES', + } + }); + template.hasResourceProperties('AWS::Lambda::Function', { + Runtime: 'nodejs20.x', + Handler: 'index.handler', + + }); + template.hasResourceProperties('AWS::SNS::Topic', { + KmsMasterKeyId: Match.anyValue(), + }); +}); + +test('Lambda Function has permission to publish to SNS', () => { + template.hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: Match.arrayWith([ + Match.objectLike({ + Effect: 'Allow', + Action: 'sns:Publish', + Resource: Match.anyValue() + }) + ]) + } + }); +}); + +test('Lambda Function has correct environment variables', () => { + template.hasResourceProperties('AWS::Lambda::Function', { + Environment: { + Variables: { + SNS_TOPIC_ARN: Match.anyValue() + } + } + }); +}); + + +test('Lambda Functions have DynamoDB Stream Event Sources', () => { + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + BatchSize: 100, + EventSourceArn: { + 'Fn::GetAtt': [Match.anyValue(), 'StreamArn'] + }, + FunctionName: { + Ref: Match.anyValue() + }, + StartingPosition: 'TRIM_HORIZON' + }); +}); + +test('DLQ is created for Lambda functions', () => { + template.hasResourceProperties('AWS::SQS::Queue', { + KmsMasterKeyId: Match.anyValue() + }); +}); + + +test('Stack has the correct number of resources', () => { + template.resourceCountIs('AWS::DynamoDB::Table', 2); + template.resourceCountIs('AWS::Lambda::Function', 2); + template.resourceCountIs('AWS::SNS::Topic', 1); + template.resourceCountIs('AWS::SQS::Queue', 2); + template.resourceCountIs('AWS::Lambda::EventSourceMapping', 2); +}); + diff --git a/typescript/ddb-stream-lambda-sns/tsconfig.json b/typescript/ddb-stream-lambda-sns/tsconfig.json new file mode 100644 index 0000000000..aaa7dc510f --- /dev/null +++ b/typescript/ddb-stream-lambda-sns/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": [ + "es2020", + "dom" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +}