|
| 1 | +import * as cdk from 'aws-cdk-lib'; |
| 2 | +import { Construct } from 'constructs'; |
| 3 | +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; |
| 4 | +import * as lambda from 'aws-cdk-lib/aws-lambda'; |
| 5 | +import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources'; |
| 6 | +import { DynamoDBStreamsToLambda } from '@aws-solutions-constructs/aws-dynamodbstreams-lambda'; |
| 7 | +import * as sns from 'aws-cdk-lib/aws-sns'; |
| 8 | +import * as kms from 'aws-cdk-lib/aws-kms'; |
| 9 | +import * as sqs from 'aws-cdk-lib/aws-sqs'; |
| 10 | + |
| 11 | +export class DdbStreamStack extends cdk.Stack { |
| 12 | + constructor(scope: Construct, id: string, props?: cdk.StackProps) { |
| 13 | + super(scope, id, props); |
| 14 | + const aws_sns_kms_key = kms.Alias.fromAliasName( |
| 15 | + this, |
| 16 | + "aws-managed-sns-kms-key", |
| 17 | + "alias/aws/sns", |
| 18 | + ) |
| 19 | + |
| 20 | + const snsTopic = new sns.Topic(this, 'ddb-stream-topic', { |
| 21 | + topicName: 'ddb-stream-topic', |
| 22 | + displayName: 'SNS Topic for DDB streams', |
| 23 | + enforceSSL: true, |
| 24 | + masterKey: aws_sns_kms_key, |
| 25 | + }); |
| 26 | + |
| 27 | + //L2 CDK Construct |
| 28 | + const deadLetterQueueL2 = new sqs.Queue(this, 'ddb-stream-l2-dlq', { |
| 29 | + queueName: 'ddb-stream-l2-dlq', |
| 30 | + encryption: sqs.QueueEncryption.KMS_MANAGED, |
| 31 | + retentionPeriod: cdk.Duration.days(4), // Adjust retention period as needed |
| 32 | + }); |
| 33 | + |
| 34 | + const itemL2Table = new dynamodb.Table(this, 'itemL2Table', { |
| 35 | + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, |
| 36 | + stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, |
| 37 | + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, |
| 38 | + encryption: dynamodb.TableEncryption.AWS_MANAGED, |
| 39 | + //If you wish to retain the table after running cdk destroy, comment out the line below |
| 40 | + removalPolicy: cdk.RemovalPolicy.DESTROY |
| 41 | + }); |
| 42 | + |
| 43 | + const itemL2TableLambdaFunction = new lambda.Function(this, 'itemL2TableLambdaFunction', { |
| 44 | + runtime: lambda.Runtime.NODEJS_20_X, |
| 45 | + handler: 'index.handler', |
| 46 | + tracing: lambda.Tracing.ACTIVE, |
| 47 | + code: lambda.Code.fromAsset('resources/lambda'), |
| 48 | + environment: { |
| 49 | + SNS_TOPIC_ARN: snsTopic.topicArn, |
| 50 | + AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1' |
| 51 | + }, |
| 52 | + }); |
| 53 | + itemL2TableLambdaFunction.addEventSource(new lambdaEventSources.DynamoEventSource(itemL2Table, { |
| 54 | + startingPosition: lambda.StartingPosition.TRIM_HORIZON, |
| 55 | + onFailure: new lambdaEventSources.SqsDlq(deadLetterQueueL2), |
| 56 | + bisectBatchOnError: true, |
| 57 | + maxRecordAge: cdk.Duration.hours(24), |
| 58 | + retryAttempts: 500, |
| 59 | + })); |
| 60 | + |
| 61 | + deadLetterQueueL2.grantSendMessages(itemL2TableLambdaFunction); |
| 62 | + |
| 63 | + itemL2Table.grantStreamRead(itemL2TableLambdaFunction); |
| 64 | + |
| 65 | + //L3 CDK Construct |
| 66 | + const itemL3Table = new DynamoDBStreamsToLambda(this, 'itemL3Table', { |
| 67 | + dynamoTableProps: { |
| 68 | + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, |
| 69 | + stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, |
| 70 | + //If you wish to retain the table after running cdk destroy, comment out the line below |
| 71 | + removalPolicy: cdk.RemovalPolicy.DESTROY |
| 72 | + }, |
| 73 | + lambdaFunctionProps: { |
| 74 | + code: lambda.Code.fromAsset('resources/lambda'), |
| 75 | + runtime: lambda.Runtime.NODEJS_20_X, |
| 76 | + handler: 'index.handler', |
| 77 | + environment: { |
| 78 | + SNS_TOPIC_ARN: snsTopic.topicArn, |
| 79 | + }, |
| 80 | + }, |
| 81 | + }); |
| 82 | + |
| 83 | + snsTopic.grantPublish(itemL2TableLambdaFunction); |
| 84 | + snsTopic.grantPublish(itemL3Table.lambdaFunction); |
| 85 | + |
| 86 | + new cdk.CfnOutput(this, 'itemL2TableLambdaFunctionArn', { value: itemL2TableLambdaFunction.functionArn }); |
| 87 | + new cdk.CfnOutput(this, 'itemL3TableLambdaFunctionArn', { value: itemL3Table.lambdaFunction.functionArn }); |
| 88 | + new cdk.CfnOutput(this, 'l3TableArn', { value: itemL3Table.dynamoTableInterface.tableArn }); |
| 89 | + new cdk.CfnOutput(this, 'l2TableArn', { value: itemL2Table.tableArn }); |
| 90 | + new cdk.CfnOutput(this, 'topicArn', { value: snsTopic.topicArn }); |
| 91 | + new cdk.CfnOutput(this, 'l2DLQArn', { value: deadLetterQueueL2.queueArn }) |
| 92 | + } |
| 93 | +} |
0 commit comments